Source code for polaris.io

import grp
import os
import stat
import tempfile
from urllib.parse import urlparse

import progressbar
import requests


def download(url, dest_path, config, exceptions=True):  # noqa: C901
    """
    Download a file from a URL to the given path or path name

    The ``download`` section of ``config`` controls the behavior:
    ``download`` (whether downloading is allowed at all), ``check_size``
    (whether an existing file is re-checked against the server's
    ``content-length``) and ``verify`` (whether TLS certificates are
    verified).

    Parameters
    ----------
    url : str
        The URL (including file name) to download

    dest_path : str
        The path (including file name) where the downloaded file should be
        saved

    config : polaris.config.PolarisConfigParser
        Configuration options used to find custom paths if ``dest_path`` is
        a config option

    exceptions : bool, optional
        Whether to raise exceptions when the download fails

    Returns
    -------
    dest_path : str
        The resulting file name if the download was successful, or None if
        not

    Raises
    ------
    OSError
        If downloading is disabled and ``dest_path`` does not exist

    requests.exceptions.RequestException
        If the download fails and ``exceptions`` is True
    """
    in_file_name = os.path.basename(urlparse(url).path)
    dest_path = os.path.abspath(dest_path)

    do_download = config.getboolean('download', 'download')
    check_size = config.getboolean('download', 'check_size')
    verify = config.getboolean('download', 'verify')

    if not do_download:
        if not os.path.exists(dest_path):
            raise OSError(f'File not found and downloading is disabled: '
                          f'{dest_path}')
        return dest_path

    if not check_size and os.path.exists(dest_path):
        return dest_path

    # dest_path contains full path, so we need to make the relevant
    # subdirectories if they do not exist already
    os.makedirs(os.path.dirname(dest_path), exist_ok=True)

    with requests.Session() as session:
        if not verify:
            session.verify = False

        try:
            response = session.get(url, stream=True)
        except requests.exceptions.RequestException:
            if exceptions:
                raise
            print(f' {url} could not be reached!')
            return None

        # with stream=True the connection stays checked out until the body
        # has been consumed, so release it explicitly on every exit path
        try:
            try:
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                if exceptions:
                    raise
                print(f'ERROR while downloading {in_file_name}:')
                print(e)
                return None

            total_size = response.headers.get('content-length')
            if total_size is None:
                succeeded = _save_unsized_response(
                    response, dest_path, in_file_name, exceptions)
            else:
                succeeded = _save_sized_response(
                    response, dest_path, in_file_name, int(total_size),
                    exceptions)
        finally:
            response.close()

    return dest_path if succeeded else None


def _save_unsized_response(response, dest_path, in_file_name, exceptions):
    """
    Save a response that has no content-length header in a single write

    Returns True if ``dest_path`` is usable afterwards and False if the
    download failed while ``exceptions`` is False.
    """
    if os.path.exists(dest_path):
        # there is no size to compare against, so an existing file is kept
        return True

    dest_dir = os.path.dirname(dest_path)
    print(f'Downloading {in_file_name}\n'
          f' to {dest_dir}...')
    try:
        # read the body before creating the file so that a failed transfer
        # cannot leave behind an empty file that later calls would accept
        content = response.content
    except requests.exceptions.RequestException:
        if exceptions:
            raise
        print(f' {in_file_name} failed!')
        return False

    with open(dest_path, 'wb') as f:
        f.write(content)
    print(f' {in_file_name} done.')
    return True


def _save_sized_response(response, dest_path, in_file_name, total_size,
                         exceptions):
    """
    Save a response of known size in chunks while showing a progress bar

    Returns True if ``dest_path`` is usable afterwards and False if the
    download failed while ``exceptions`` is False.
    """
    if os.path.exists(dest_path) and \
            total_size == os.path.getsize(dest_path):
        # we already have the file, so there is nothing to do
        return True

    out_file_name = os.path.basename(dest_path)
    if out_file_name == in_file_name:
        file_names = in_file_name
    else:
        file_names = f'{in_file_name} as {out_file_name}'
    dest_dir = os.path.dirname(dest_path)
    print(f'Downloading {file_names} ({_sizeof_fmt(total_size)})\n'
          f' to {dest_dir}')

    widgets = [progressbar.Percentage(), ' ', progressbar.Bar(),
               ' ', progressbar.ETA()]
    bar = progressbar.ProgressBar(widgets=widgets,
                                  max_value=total_size).start()
    size = 0
    try:
        with open(dest_path, 'wb') as f:
            for data in response.iter_content(chunk_size=4096):
                size += len(data)
                f.write(data)
                bar.update(size)
        bar.finish()
    except requests.exceptions.RequestException:
        # do not leave a truncated file behind: with size checking disabled
        # it would be mistaken for a complete download on the next call
        try:
            os.remove(dest_path)
        except OSError:
            pass
        if exceptions:
            raise
        print(f' {in_file_name} failed!')
        return False

    print(f' {in_file_name} done.')
    return True
def update_permissions(directories, group, show_progressbar=True):
    """
    Fix permissions on the databases where files were downloaded so everyone
    in the group can read/write to them

    Directories and executable files are set to ``rwxrwxr-x``; all other
    files are set to ``rw-rw-r--``.  Only entries owned by the current user
    are modified.

    Parameters
    ----------
    directories : list
        directories to change permissions on

    group : str
        The name of the group to set permissions to

    show_progressbar : bool, optional
        Whether to show a progress bar as permissions are updated
    """
    new_uid = os.getuid()
    new_gid = grp.getgrnam(group).gr_gid

    write_perm = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP |
                  stat.S_IWGRP | stat.S_IROTH)
    exec_perm = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
                 stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP |
                 stat.S_IROTH | stat.S_IXOTH)

    mask = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    if show_progressbar:
        print('changing permissions on downloaded files')

    # first the base directories, which os.walk() never lists as entries
    for directory in directories:
        _set_dir_perms(None, directory, mask, exec_perm, new_uid, new_gid)

    if show_progressbar:
        # the extra walk is only needed to size the progress bar
        total = len(_walk_dirs(directories))
        widgets = [progressbar.Percentage(), ' ', progressbar.Bar(),
                   ' ', progressbar.ETA()]
        bar = progressbar.ProgressBar(widgets=widgets,
                                      max_value=total).start()
    else:
        total = 0
        bar = None

    progress = 0
    for base in directories:
        for root, dirs, files in os.walk(base):
            for directory in dirs:
                progress += 1
                if bar is not None:
                    # entries may have appeared since the count was taken,
                    # so never report more than the bar's maximum
                    bar.update(min(progress, total))
                _set_dir_perms(root, directory, mask, exec_perm, new_uid,
                               new_gid)

            for file_name in files:
                progress += 1
                if bar is not None:
                    bar.update(min(progress, total))
                # pass the bare name: _set_file_perms() joins it with root
                # itself, and joining here as well would produce
                # root/root/name whenever the directories are relative
                _set_file_perms(root, file_name, mask, exec_perm,
                                write_perm, new_uid, new_gid)

    if bar is not None:
        bar.finish()
        print(' done.')
def _walk_dirs(directories): """ Walk through directories to find files to change """ files_and_dirs = [] for base in directories: for root, dirs, files in os.walk(base): files_and_dirs.extend(dirs) files_and_dirs.extend(files) return files_and_dirs def _set_dir_perms(root, directory, mask, exec_perm, new_uid, new_gid): """ Set permissions for a directory """ if root is not None: directory = os.path.join(root, directory) try: dir_stat = os.stat(directory) except OSError: return if dir_stat.st_uid != new_uid: # current user doesn't own this dir so let's move on return perm = dir_stat.st_mode & mask if perm == exec_perm and dir_stat.st_gid == new_gid: return try: os.chown(directory, new_uid, new_gid) os.chmod(directory, exec_perm) except OSError: return def _set_file_perms(root, file_name, mask, exec_perm, write_perm, new_uid, new_gid): """ Set permissions for a directory """ file_name = os.path.join(root, file_name) try: file_stat = os.stat(file_name) except OSError: return if file_stat.st_uid != new_uid: # current user doesn't own this file so let's move on return perm = file_stat.st_mode & mask if perm & stat.S_IXUSR: # executable, so make sure others can execute it new_perm = exec_perm else: new_perm = write_perm if perm == new_perm and file_stat.st_gid == new_gid: return try: os.chown(file_name, new_uid, new_gid) os.chmod(file_name, new_perm) except OSError: return # From https://stackoverflow.com/a/1094933/7728169 def _sizeof_fmt(num, suffix='B'): """ Covert a number of bytes to a human-readable file size """ for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: if abs(num) < 1024.0: return f"{num:3.1f}{unit}{suffix}" num /= 1024.0 return f"{num:.1f}{'Yi'}{suffix}"