Source code for bioflow.utils.general_utils.internet_io

"""
Module responsible for retrieval of files stored on the internet.
Requires some refactoring before it can be considered a library in its own right.
"""
from os.path import abspath, join, isdir
import os
import requests
import shutil
import zipfile
import gzip
# import tarfile
import io
import requests_ftp
import hashlib

from bioflow.utils.log_behavior import get_logger


log = get_logger(__name__)


def url_to_local_path(url, path, rename=None):
    """
    Copies a file from an http or ftp url to the local destination provided in path.
    Performs file-to-folder conversion: if path is a directory, the file name is
    derived from the url, or taken from rename if it is provided.

    :param url: url of the file to retrieve
    :param path: local destination (a file path or a directory)
    :param rename: optional file name to use when path is a directory
    :raise Exception: something is wrong with the url
    :return:
    """
    new_path = path  # fallback: path already points to the destination file
    if isdir(path) and '.zip' not in url and '.tar' not in url:
        if rename is not None:
            new_path = join(path, rename)
        else:
            new_path = join(path, url.split('/')[-1])
    if not url[:3] == 'ftp':
        r = requests.get(url, stream=True)
    else:
        requests_ftp.monkeypatch_session()
        s = requests.Session()
        r = s.get(url)
    # requests_ftp may report the FTP completion code 226, possibly as a string
    if r.status_code in [200, 226, '200', '226']:
        if not url[:3] == 'ftp':
            with open(new_path, 'wb') as f:
                r.raw.decode_content = True
                shutil.copyfileobj(r.raw, f)
        else:
            with open(new_path, 'wb') as f:
                f.write(r.content)
    else:
        log.critical('url %s failed, return code: %s' % (url, r.status_code))
        raise Exception(
            "Something is wrong with the url provided: %s.\n"
            "Please attempt downloading the file manually" % url)

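# Illustrative usage of the file-to-folder conversion; the url and paths below
# are hypothetical, not part of the module:
#   url_to_local_path('http://example.com/data.txt', '/tmp/downloads')
#       -> saves to /tmp/downloads/data.txt
#   url_to_local_path('http://example.com/data.txt', '/tmp/downloads', rename='other.txt')
#       -> saves to /tmp/downloads/other.txt
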
def url_to_local_p_zip(url, path):
    """
    Copies a zip archive from an http url and extracts it into the local folder
    provided in path.

    :param url: url of the zip archive to retrieve
    :param path: local directory into which the archive is extracted
    :raise Exception: something is wrong with the path
    :raise Exception: something is wrong with the url
    :return:
    """
    if not isdir(path):
        raise Exception("path provided %s is not a directory" % path)
    r = requests.get(url)
    if r.status_code == 200:
        z = zipfile.ZipFile(io.BytesIO(r.content))
        z.extractall(path)  # This is unsafe as hell: members can escape path
    else:
        raise Exception(
            "Something is wrong with the url provided: %s.\n"
            "Please attempt downloading the file manually" % url)

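# Given the unsafe extractall above, a caller may want to guard against zip
# path traversal ("zip slip"). A minimal sketch using only the standard
# library; the helper name _safe_extractall is hypothetical, not part of the
# original module:
def _safe_extractall(zip_file, destination):
    """Extracts zip_file into destination, refusing members that escape it."""
    destination = abspath(destination)
    for member in zip_file.namelist():
        # resolve each member path and check it stays inside destination
        target = abspath(join(destination, member))
        if not target.startswith(destination + os.sep):
            raise Exception("unsafe member path in archive: %s" % member)
    zip_file.extractall(destination)
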
def url_to_local_p_gz(url, path):
    """
    Copies a gzipped file from an http or ftp url, decompresses it and writes it
    to the local destination provided in path.

    :param url: url of the gzipped file to retrieve
    :param path: local destination file path
    :raise Exception: something is wrong with the url
    :return:
    """
    if url[:3] == 'ftp':
        requests_ftp.monkeypatch_session()
        s = requests.Session()
        r = s.retr(url)
    else:
        r = requests.get(url)
    # requests_ftp may report the FTP completion code 226, possibly as a string
    if r.status_code in [200, 226, '200', '226']:
        with gzip.GzipFile(fileobj=io.BytesIO(r.content)) as f_in, \
                open(path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    else:
        raise Exception(
            "Something is wrong with the url provided: %s.\n"
            "Please attempt downloading the file manually" % url)

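# The implementation above buffers the entire download in memory (r.content)
# before decompressing. For very large files a streaming variant is possible;
# a minimal sketch for the http case under Python 3, with the same url/path
# semantics. The helper name _stream_gunzip_to_file is hypothetical, not part
# of the original module:
def _stream_gunzip_to_file(url, path):
    """Downloads a gzipped http url and decompresses it to path on the fly."""
    r = requests.get(url, stream=True)
    r.raise_for_status()
    # decode_content stays False, so r.raw yields the raw gzip byte stream
    with gzip.GzipFile(fileobj=r.raw) as f_in, open(path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
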
def url_to_local(url, path, rename=None):
    """
    Copies a file from an http or ftp url to the local destination provided in
    path, choosing a suitable decompression routine from the url suffix. So far,
    only gunzipped ftp downloads are supported, and path autocompletion works
    only for non-compressed files.

    :param url: url of the file to retrieve
    :param path: local destination (a directory is allowed only for non-compressed files)
    :param rename: optional file name; only supported for non-compressed files
    :raise Exception: renaming for gunzipped and zipped files is not supported
    :return:
    """
    if url[-2:] == 'gz':
        if rename is not None:
            raise Exception('rename unsupported for gunzipped files')
        url_to_local_p_gz(url, path)
    elif url[-3:] == 'zip':
        if rename is not None:
            raise Exception('rename unsupported for zipped files')
        url_to_local_p_zip(url, path)
    else:
        url_to_local_path(url, path, rename)

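# Dispatch behaviour at a glance (urls are illustrative):
#   url_to_local('ftp://example.com/data.dat.gz', '/tmp/data.dat')  # -> url_to_local_p_gz
#   url_to_local('http://example.com/archive.zip', '/tmp/out/')     # -> url_to_local_p_zip
#   url_to_local('http://example.com/file.txt', '/tmp/out/')        # -> url_to_local_path
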
def marbach_post_proc(local_directory):
    """
    Post-processes the Marbach/FANTOM5 tissue-specific regulatory network
    compendium, an online database whose layout is rather unfriendly: gunzips
    the high-level networks into local_directory, then deletes the rest of the
    compendium.

    :param local_directory: directory into which the compendium was downloaded
    :return:
    """
    relevant_path = join(
        local_directory,
        'Network_compendium/Tissue-specific_regulatory_networks_FANTOM5-v1/'
        '32_high-level_networks')
    for fle in os.listdir(relevant_path):
        if fle[-3:] == '.gz':
            f_in = join(relevant_path, fle)
            f_out = join(local_directory, fle[:-3])
            with gzip.open(f_in, 'rb') as _in, open(f_out, 'wb') as _out:
                shutil.copyfileobj(_in, _out)
    shutil.rmtree(join(local_directory, 'Network_compendium'))

def check_hash(file_path, expected_hash, hasher):
    """
    Checks the hash of a file against an expected hex digest.

    :param file_path: path to the file to check
    :param expected_hash: expected hex digest
    :param hasher: a hashlib hasher instance, e.g. hashlib.md5()
    :return: True if the computed digest matches expected_hash
    """
    block_size = 65536
    with open(file_path, 'rb') as a_file:
        buf = a_file.read(block_size)
        while len(buf) > 0:
            hasher.update(buf)
            buf = a_file.read(block_size)
    return hasher.hexdigest() == expected_hash

if __name__ == "__main__":
    pull_url = "http://3.bp.blogspot.com/-0N45KxrABQU/VOegbIex3BI/AAAAAAAAPzg/imoUFV-_fu0/s1600/BestPicture.jpg"
    pth = join(abspath("../../testdir/"), 'testfile.jpg')
    url_to_local_path(pull_url, pth)
    pull_url = r'ftp://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/complete/uniprot_sprot.dat.gz'
    pth = abspath('../../../dumps/uniprot_sprot.dat')
    url_to_local(pull_url, pth)
    print(check_hash(pth, '439f9bf72102af7184d631d0476997d3', hashlib.md5()))