from .. import utils
from .._lazyload import requests
import os
import tempfile
import urllib.request
import zipfile
_CHUNK_SIZE = 32768
_GOOGLE_DRIVE_URL = "https://docs.google.com/uc?export=download"
_FAKE_HEADERS = [("User-Agent", "Mozilla/5.0")]
def _save_response_content(response, destination):
global _CHUNK_SIZE
if isinstance(destination, str):
with open(destination, "wb") as handle:
_save_response_content(response, handle)
else:
for chunk in response.iter_content(_CHUNK_SIZE):
if chunk: # filter out keep-alive new chunks
destination.write(chunk)
def _google_drive_confirm_token(response):
for key, value in response.cookies.items():
if key.startswith("download_warning"):
return value
return None
@utils._with_pkg(pkg="requests")
def _GET_google_drive(id):
"""Post a GET request to Google Drive."""
global _GOOGLE_DRIVE_URL
with requests.Session() as session:
response = session.get(_GOOGLE_DRIVE_URL, params={"id": id}, stream=True)
token = _google_drive_confirm_token(response)
if token:
params = {"id": id, "confirm": token}
response = session.get(_GOOGLE_DRIVE_URL, params=params, stream=True)
return response
[docs]def download_google_drive(id, destination):
"""Download a file from Google Drive.
Requires the file to be available to view by anyone with the URL.
Parameters
----------
id : string
Google Drive ID string. You can access this by clicking 'Get Shareable Link',
which will give a URL of the form
<https://drive.google.com/file/d/**your_file_id**/view?usp=sharing>
destination : string or file
File to which to save the downloaded data
"""
response = _GET_google_drive(id)
_save_response_content(response, destination)
[docs]def download_url(url, destination):
"""Download a file from a URL.
Parameters
----------
url : string
URL of file to be downloaded
destination : string or file
File to which to save the downloaded data
"""
if isinstance(destination, str):
with open(destination, "wb") as handle:
download_url(url, handle)
else:
# destination is File
opener = urllib.request.build_opener()
opener.addheaders = _FAKE_HEADERS
urllib.request.install_opener(opener)
with urllib.request.urlopen(url) as handle:
destination.write(handle.read())
[docs]def unzip(filename, destination=None, delete=True):
"""Extract a .zip file and optionally remove the archived version.
Parameters
----------
filename : string
Path to the zip file
destination : string, optional (default: None)
Path to the folder in which to extract the zip.
If None, extracts to the same directory the archive is in.
delete : boolean, optional (default: True)
If True, deletes the zip file after extraction
"""
filename = os.path.expanduser(filename)
if destination is None:
destination = os.path.dirname(filename)
elif not os.path.isdir(destination):
os.mkdir(destination)
with zipfile.ZipFile(filename, "r") as handle:
handle.extractall(destination)
if delete:
os.unlink(filename)
[docs]def download_and_extract_zip(url, destination):
"""Download a .zip file from a URL and extract it.
Parameters
----------
url : string
URL of file to be downloaded
destination : string
Directory in which to extract the downloaded zip
"""
if not os.path.isdir(destination):
os.mkdir(destination)
zip_handle = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
download_url(url, zip_handle)
zip_handle.close()
unzip(zip_handle.name, destination)