Source code for core.download

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Background download management."""

import os
import shutil
import tempfile
from threading import Lock, Thread
from urllib.error import URLError
from urllib.request import Request, urlopen

from . import log
from .lnp import VERSION

# Registry of download queues, keyed by queue name. Entries are added by
# get_queue() and consulted by queue_empty().
__download_queues = {}


def download_str(url, **kwargs):
    """Instantly download a file from <url> and return its contents.

    Failed downloads are logged and return None; no exception is raised.

    NOTE: This is a blocking method. Use a download queue for non-blocking
    downloads.

    Args:
        url: the URL to download
        encoding: used to decode the data to text. Defaults to UTF-8.
        timeout: timeout used for the URL request, in seconds. Defaults to 3.

    Returns:
        The contents of the downloaded file as text, or None.
    """
    timeout = kwargs.get('timeout', 3)
    encoding = kwargs.get('encoding', 'utf-8')
    try:
        req = Request(url, headers={'User-Agent': 'PyLNP/' + VERSION})
        # The context manager closes the connection even if reading or
        # decoding fails.
        with urlopen(req, timeout=timeout) as response:
            return response.read().decode(encoding)
    except URLError as ex:
        # ex.reason is frequently an exception object (OSError, timeout, ...)
        # rather than a string, so it must be converted before concatenation;
        # otherwise the handler itself raises TypeError.
        log.e('Error downloading ' + url + ': ' + str(ex.reason))
    except Exception:
        log.e('Error downloading ' + url)
    return None
def download(queue, url, destination, end_callback=None, **kwargs):
    """Adds a download to the specified queue.

    The queue object is obtained through get_queue(). The URL, destination,
    end callback and any extra keyword arguments are passed on unchanged to
    that queue's add() method, and its result is returned.
    """
    target_queue = get_queue(queue)
    return target_queue.add(url, destination, end_callback, **kwargs)
def queue_empty(queue):
    """Reports whether the named queue has nothing left to download.

    Returns True when no queue with that name has been created, or when the
    existing queue reports itself as empty; otherwise False.
    """
    if queue in __download_queues:
        return __download_queues[queue].empty()
    return True
def get_queue(queue):
    """Returns the specified queue object, creating a new queue if necessary.

    Args:
        queue: the name of the queue.

    Returns:
        The DownloadQueue registered under that name.
    """
    existing = __download_queues.get(queue)
    if existing is None:
        # Only build a DownloadQueue when the name is not registered yet;
        # setdefault keeps the insert-or-fetch step a single dict operation.
        existing = __download_queues.setdefault(queue, DownloadQueue(queue))
    return existing
class DownloadQueue(object):
    """Queue used for downloading files.

    Downloads are processed one at a time, in the order they were added.
    A queue named 'immediate' processes its downloads synchronously inside
    add() and prints progress to standard output; any other queue processes
    them on a daemon thread that is started on demand.
    """
    # pylint: disable=too-many-instance-attributes

    def __init__(self, name):
        """Creates an empty queue called <name>."""
        self.name = name
        # Pending downloads as (url, target, end_callback) tuples.
        self.queue = []
        self.on_start_queue = []
        self.on_begin_download = []
        self.on_progress = []
        self.on_end_download = []
        self.on_end_queue = []
        # Worker thread, or None when no worker is running.
        self.thread = None
        self.lock = Lock()
        if name == 'immediate':
            def _immediate_progress(_, url, progress, total):
                # This class reports an unknown size as None; -1 is also
                # treated as unknown because the previous check used it.
                if total is not None and total != -1:
                    msg = "Downloading %s... (%s/%s)" % (
                        os.path.basename(url), progress, total)
                else:
                    msg = ("Downloading %s... (%s bytes downloaded)" % (
                        os.path.basename(url), progress))
                print("\r%s" % msg, end='')
            self.register_progress(_immediate_progress)

    def add(self, url, target, end_callback):
        """Adds a download to the queue.

        A URL that is already waiting in the queue is not added a second
        time. For the 'immediate' queue this call blocks until the queue has
        been processed.

        Args:
            url: the URL to download.
            target: the target path for the download.
            end_callback: a function(url, filename, success) which is called
                when the download finishes.
        """
        with self.lock:
            if any(url == queued[0] for queued in self.queue):
                log.d(self.name + ': skipping add of ' + url +
                      ', already in queue')
            else:
                self.queue.append((url, target, end_callback))
                log.d(self.name + ': queueing ' + url + ' for download to ' +
                      target)
            # Checked while holding the lock so that two concurrent add()
            # calls cannot both decide to start a worker.
            if self.thread is None and self.name != 'immediate':
                log.d('Download queue ' + self.name +
                      ' not running, starting it')
                self.thread = worker = Thread(target=self.__process_queue)
                worker.daemon = True
                worker.start()
        if self.name == 'immediate':
            # Must run outside the lock: processing acquires it again.
            log.i('Downloading immediately...')
            self.__process_queue()

    def empty(self):
        """Returns True if the queue is empty, otherwise False."""
        return not self.queue

    def register_start_queue(self, func):
        """Registers a function func(queue_name) to be called when the queue is
        started. If False is returned by any function, the queue is cleared."""
        self.on_start_queue.append(func)

    def unregister_start_queue(self, func):
        """Unregisters a function func from being called when the queue is
        started."""
        self.on_start_queue.remove(func)

    def register_begin_download(self, func):
        """Registers a function func(queue_name, url, target) to be called when
        a download is started."""
        self.on_begin_download.append(func)

    def unregister_begin_download(self, func):
        """Unregisters a function func from being called when a download is
        started."""
        self.on_begin_download.remove(func)

    def register_progress(self, func):
        """Registers a function func(queue_name, url, downloaded, total_size)
        to be called for download progress reports.

        total_size is the value of the response's Content-Length header as
        received (not converted to a number). If total size is unknown, None
        will be sent."""
        self.on_progress.append(func)

    def unregister_progress(self, func):
        """Unregisters a function from being called for download progress
        reports."""
        self.on_progress.remove(func)

    def register_end_download(self, func):
        """Registers a function func(queue_name, url, filename, success) to be
        called when a download is finished."""
        self.on_end_download.append(func)

    def unregister_end_download(self, func):
        """Unregisters a function func from being called when a download is
        finished."""
        self.on_end_download.remove(func)

    def register_end_queue(self, func):
        """Registers a function func(queue_name) to be called when the
        queue is emptied."""
        self.on_end_queue.append(func)

    def unregister_end_queue(self, func):
        """Unregisters a function func from being called when the queue is
        emptied."""
        self.on_end_queue.remove(func)

    def __process_callbacks(self, callbacks, *args):
        """Calls the provided set of callback functions with <args>.

        Each callback receives the queue name followed by <args>. Returns a
        list with one result per callback; a callback that raises contributes
        None instead of interrupting the others.
        """
        results = []
        for callback in callbacks:
            try:
                results.append(callback(self.name, *args))
            except Exception:
                results.append(None)
        return results

    def __fetch(self, url, target):
        """Downloads <url> to <target>; returns True on success, else False.

        The data is written to a temporary file in the target's directory and
        only moved to <target> once the download is complete. Any failure is
        logged and the temporary file is removed.
        """
        outpath = None
        try:
            # A bare file name has no directory part; use the current
            # directory instead of passing '' to os.makedirs.
            dirname = os.path.dirname(target) or os.curdir
            os.makedirs(dirname, exist_ok=True)
            outhandle, outpath = tempfile.mkstemp(dir=dirname)
            with os.fdopen(outhandle, 'wb') as outfile:
                req = Request(url, headers={'User-Agent': 'PyLNP/' + VERSION})
                with urlopen(req, timeout=5) as response:
                    # The header does not change while reading, so it is
                    # looked up once. It is None when the size is unknown.
                    total = response.info().get('Content-Length')
                    data = 0
                    while True:
                        chunk = response.read(8192)
                        if not chunk:
                            break
                        data += len(chunk)
                        outfile.write(chunk)
                        self.__process_callbacks(
                            self.on_progress, url, data, total)
            # The file is closed at this point, so it can be moved safely.
            shutil.move(outpath, target)
        except Exception:
            log.e(self.name + ': Error downloading ' + url, stack=True)
            if outpath is not None and os.path.exists(outpath):
                try:
                    os.remove(outpath)
                except OSError:
                    log.d(self.name + ': could not remove temporary file ' +
                          outpath)
            return False
        return True

    def __process_queue(self):
        """Processes the download queue.

        Runs the start-queue callbacks (clearing the queue if any of them
        returns False), then downloads the queued items one by one, and
        finally runs the end-queue callbacks once the queue is empty.
        """
        if False in self.__process_callbacks(self.on_start_queue):
            with self.lock:
                self.queue = []
                self.thread = None
            return

        while True:
            with self.lock:
                if self.empty():
                    self.thread = None
                    break
                # The item stays at the head of the queue while it is being
                # downloaded, so add() keeps rejecting duplicates of it.
                url, target, end_callback = self.queue[0]
            log.d(self.name + ': About to download ' + url + ' to ' + target)
            self.__process_callbacks(self.on_begin_download, url, target)

            success = self.__fetch(url, target)
            if success:
                log.d(self.name + ': Finished downloading ' + url)
            self.__process_callbacks(
                self.on_end_download, url, target, success)
            if end_callback:
                end_callback(url, target, success)

            with self.lock:
                self.queue.pop(0)

        self.__process_callbacks(self.on_end_queue)