From 3e33bf972585ad99fbcbdd2797ed0046904a9f2f Mon Sep 17 00:00:00 2001 From: Frank Ch. Eigler Date: Mar 12 2020 14:40:22 +0000 Subject: [PATCH 1/3] issues/1880: send Range: header to accelerate duplicate/partial downloads --- diff --git a/cli/koji_cli/lib.py b/cli/koji_cli/lib.py index 7b000d4..49fd6ab 100644 --- a/cli/koji_cli/lib.py +++ b/cli/koji_cli/lib.py @@ -498,20 +498,30 @@ def download_file(url, relpath, quiet=False, noprogress=False, size=None, num=No else: print(_("Downloading: %s") % relpath) + f = open(relpath, 'ab') + headers = {} + pos = f.tell() + if pos: + headers['Range'] = f'bytes={pos}-' + # closing needs to be used for requests < 2.18.0 - with closing(requests.get(url, stream=True)) as response: - # raise error if occured - response.raise_for_status() + with closing(requests.get(url, headers=headers, stream=True)) as response: + if (response.status_code == 200): # full content provided? + f.close() + f = open(relpath, 'wb') + elif not (response.status_code == 416 and pos): # error? + response.raise_for_status() length = int(response.headers.get('content-length') or 0) - with open(relpath, 'wb') as f: + pos = 0 - for chunk in response.iter_content(chunk_size=65536): + for chunk in response.iter_content(chunk_size=65536): pos += len(chunk) - f.write(chunk) - if not (quiet or noprogress): + f.write(chunk) + if not (quiet or noprogress): _download_progress(length, pos) - if not length and not (quiet or noprogress): + if not length and not (quiet or noprogress): _download_progress(pos, pos) + f.close() if not (quiet or noprogress): print('') From 9b84b2ed979417805f789b785df30fcbc8622496 Mon Sep 17 00:00:00 2001 From: Frank Ch. Eigler Date: Mar 12 2020 14:40:22 +0000 Subject: [PATCH 2/3] Range: header: use old school fmt % operator This should be more compatible with old pythons. --- diff --git a/cli/koji_cli/lib.py b/cli/koji_cli/lib.py index 49fd6ab..0e670d0 100644 --- a/cli/koji_cli/lib.py +++ b/cli/koji_cli/lib.py @@ -502,7 +502,7 @@ def download_file(url, relpath, quiet=False, noprogress=False, size=None, num=No headers = {} pos = f.tell() if pos: - headers['Range'] = f'bytes={pos}-' + headers['Range'] = ('bytes=%d-' % pos) # closing needs to be used for requests < 2.18.0 with closing(requests.get(url, headers=headers, stream=True)) as response: From c703d50ca6bbfce4d781d52152ff59a6339ba209 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Mar 12 2020 14:40:23 +0000 Subject: [PATCH 3/3] download_archive / download_rpm methods for CLI Fixes: https://pagure.io/koji/issue/1880 --- diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index c083734..cf7d4df 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -30,7 +30,9 @@ from koji_cli.lib import ( _running_in_bg, activate_session, arg_filter, + download_archive, download_file, + download_rpm, error, format_inheritance_flags, get_usage_str, @@ -6687,14 +6689,12 @@ def anon_handle_download_build(options, session, args): if build.isdigit(): if suboptions.latestfrom: - print("--latestfrom not compatible with build IDs, specify a package name.") - return 1 + error("--latestfrom not compatible with build IDs, specify a package name.") build = int(build) if suboptions.task_id: builds = session.listBuilds(taskID=build) if not builds: - print("No associated builds for task %s" % build) - return 1 + error("No associated builds for task %s" % build) build = builds[0]['build_id'] if suboptions.latestfrom: @@ -6703,17 +6703,14 @@ def anon_handle_download_build(options, session, args): builds = session.listTagged(suboptions.latestfrom, latest=True, package=build, type=suboptions.type) except koji.GenericError as data: - print("Error finding latest build: %s" % data) - return 1 + error("Error finding latest build: %s" % data) if not builds: - print("%s has no builds of %s" % (suboptions.latestfrom, build)) - return 1 + error("%s has no builds of %s" % (suboptions.latestfrom, build)) info = builds[0] elif suboptions.rpm: rpminfo = session.getRPM(build) if rpminfo is None: - print("No such rpm: %s" % build) - return 1 + error("No such rpm: %s" % build) info = session.getBuild(rpminfo['build_id']) else: # if we're given an rpm name without --rpm, download the containing build @@ -6726,66 +6723,43 @@ def anon_handle_download_build(options, session, args): info = session.getBuild(build) if info is None: - print("No such build: %s" % build) - return 1 + error("No such build: %s" % build) if not suboptions.topurl: - print("You must specify --topurl to download files") - return 1 - pathinfo = koji.PathInfo(topdir=suboptions.topurl) + error("You must specify --topurl to download files") - urls = [] + archives = [] + rpms = [] if suboptions.type: archives = session.listArchives(buildID=info['id'], type=suboptions.type) if not archives: - print("No %s archives available for %s" % (suboptions.type, koji.buildLabel(info))) - return 1 - if suboptions.type == 'maven': - for archive in archives: - url = pathinfo.mavenbuild(info) + '/' + pathinfo.mavenfile(archive) - urls.append((url, pathinfo.mavenfile(archive))) - elif suboptions.type == 'win': - for archive in archives: - url = pathinfo.winbuild(info) + '/' + pathinfo.winfile(archive) - urls.append((url, pathinfo.winfile(archive))) - elif suboptions.type == 'image': - if not suboptions.topurl: - print("You must specify --topurl to download images") - return 1 - pi = koji.PathInfo(topdir=suboptions.topurl) - for archive in archives: - url = '%s/%s' % (pi.imagebuild(info), archive['filename']) - urls.append((url, archive['filename'])) - else: - # can't happen - assert False # pragma: no cover + error("No %s archives available for %s" % (suboptions.type, koji.buildLabel(info))) else: arches = suboptions.arches if len(arches) == 0: arches = None if suboptions.rpm: - rpms = [rpminfo] + all_rpms = [rpminfo] else: - rpms = session.listRPMs(buildID=info['id'], arches=arches) - if not rpms: + all_rpms = session.listRPMs(buildID=info['id'], arches=arches) + if not all_rpms: if arches: - print("No %s packages available for %s" % + error("No %s packages available for %s" % (" or ".join(arches), koji.buildLabel(info))) else: - print("No packages available for %s" % koji.buildLabel(info)) - return 1 - for rpm in rpms: + error("No packages available for %s" % koji.buildLabel(info)) + for rpm in all_rpms: if not suboptions.debuginfo and koji.is_debuginfo(rpm['name']): continue - if suboptions.key: - fname = pathinfo.signed(rpm, suboptions.key) - else: - fname = pathinfo.rpm(rpm) - url = pathinfo.build(info) + '/' + fname - urls.append((url, os.path.basename(fname))) + rpms.append(rpm) - for url, relpath in urls: - download_file(url, relpath, suboptions.quiet, suboptions.noprogress) + # run the download + for rpm in rpms: + download_rpm(info, rpm, suboptions.topurl, sigkey=suboptions.key, + quiet=suboptions.quiet, noprogress=suboptions.noprogress) + for archive in archives: + download_archive(info, archive, suboptions.topurl, + quiet=suboptions.quiet, noprogress=suboptions.noprogress) def anon_handle_download_logs(options, session, args): @@ -6996,8 +6970,8 @@ def anon_handle_download_task(options, session, args): if '..' in filename: error(_('Invalid file name: %s') % filename) url = '%s/%s/%s' % (pathinfo.work(volume), pathinfo.taskrelpath(task["id"]), filename) - download_file(url, new_filename, suboptions.quiet, suboptions.noprogress, len(downloads), - number) + download_file(url, new_filename, quiet=suboptions.quiet, noprogress=suboptions.noprogress, + size=len(downloads), num=number) def anon_handle_wait_repo(options, session, args): diff --git a/cli/koji_cli/lib.py b/cli/koji_cli/lib.py index 0e670d0..967f6e8 100644 --- a/cli/koji_cli/lib.py +++ b/cli/koji_cli/lib.py @@ -1,6 +1,7 @@ # coding=utf-8 from __future__ import absolute_import, division +import hashlib import optparse import os import random @@ -487,8 +488,20 @@ def linked_upload(localfile, path, name=None): os.umask(old_umask) -def download_file(url, relpath, quiet=False, noprogress=False, size=None, num=None): - """Download files from remote""" +def download_file(url, relpath, quiet=False, noprogress=False, size=None, + num=None, filesize=None): + """Download files from remote + + :param str url: URL to be downloaded + :param str relpath: where to save it + :param bool quiet: no/verbose + :param bool noprogress: print progress bar + :param int size: total number of files being downloaded (printed in verbose + mode) + :param int num: download index (printed in verbose mode) + :param int filesize: expected file size, used for appending to file, no + other checks are performed, caller is responsible for + checking, that resulting file is valid.""" if '/' in relpath: koji.ensuredir(os.path.dirname(relpath)) @@ -498,46 +511,142 @@ def download_file(url, relpath, quiet=False, noprogress=False, size=None, num=No else: print(_("Downloading: %s") % relpath) - f = open(relpath, 'ab') + pos = 0 headers = {} - pos = f.tell() - if pos: - headers['Range'] = ('bytes=%d-' % pos) + if filesize: + # append the file + f = open(relpath, 'ab') + pos = f.tell() + if pos: + if filesize == pos: + if not quiet: + print(_("File %s already downloaded, skipping" % relpath)) + return + if not quiet: + print(_("Appending to existing file %s" % relpath)) + headers['Range'] = ('bytes=%d-' % pos) + else: + # rewrite + f = open(relpath, 'wb') # closing needs to be used for requests < 2.18.0 with closing(requests.get(url, headers=headers, stream=True)) as response: - if (response.status_code == 200): # full content provided? + if response.status_code == 200: # full content provided? + # rewrite in such case f.close() f = open(relpath, 'wb') - elif not (response.status_code == 416 and pos): # error? - response.raise_for_status() - length = int(response.headers.get('content-length') or 0) - - pos = 0 - for chunk in response.iter_content(chunk_size=65536): - pos += len(chunk) + response.raise_for_status() + length = filesize or int(response.headers.get('content-length') or 0) + for chunk in response.iter_content(chunk_size=1024**2): + pos += len(chunk) f.write(chunk) if not (quiet or noprogress): - _download_progress(length, pos) + _download_progress(length, pos, filesize) if not length and not (quiet or noprogress): - _download_progress(pos, pos) + _download_progress(pos, pos, filesize) f.close() if not (quiet or noprogress): print('') -def _download_progress(download_t, download_d): +def download_rpm(build, rpm, topurl, sigkey=None, quiet=False, noprogress=False): + "Wrapper around download_file, do additional checks for rpm files" + pi = koji.PathInfo(topdir=topurl) + if sigkey: + fname = pi.signed(rpm, sigkey) + else: + fname = pi.rpm(rpm) + url = os.path.join(pi.build(build), fname) + path = os.path.basename(fname) + + download_file(url, path, quiet=quiet, noprogress=noprogress, filesize=rpm['size']) + + # size + size = os.path.getsize(path) + if size != rpm['size']: + os.unlink(path) + error("Downloaded rpm %s size %d does not match db size %d, deleting" % + (path, size, rpm['size'])) + + # basic sanity + try: + koji.check_rpm_file(path) + except koji.GenericError as ex: + os.unlink(path) + warn(str(ex)) + error("Downloaded rpm %s is not valid rpm file, deleting" % path) + + # payload hash + sigmd5 = koji.get_header_fields(path, ['sigmd5'])['sigmd5'] + if rpm['payloadhash'] != koji.hex_string(sigmd5): + os.unlink(path) + error("Downloaded rpm %s doesn't match db, deleting" % path) + + +def download_archive(build, archive, topurl, quiet=False, noprogress=False): + "Wrapper around download_file, do additional checks for archive files" + + pi = koji.PathInfo(topdir=topurl) + if archive['btype'] == 'maven': + url = os.path.join(pi.mavenbuild(build), pi.mavenfile(archive)) + path = pi.mavenfile(archive) + elif archive['btype'] == 'win': + url = os.path.join(pi.winbuild(build), pi.winfile(archive)) + path = pi.winfile(archive) + elif archive['btype'] == 'image': + url = os.path.join(pi.imagebuild(build), archive['filename']) + path = archive['filename'] + else: + # TODO: cover module/operator-manifests/remote-sources + # can't happen + assert False # pragma: no cover + + download_file(url, path, quiet=quiet, noprogress=noprogress, filesize=archive['size']) + + # check size + if os.path.getsize(path) != archive['size']: + os.unlink(path) + error("Downloaded rpm %s size does not match db size, deleting" % path) + + # check checksum/checksum_type + if archive['checksum_type'] == koji.CHECKSUM_TYPES['md5']: + hash = hashlib.md5() + elif archive['checksum_type'] == koji.CHECKSUM_TYPES['sha1']: + hash = hashlib.sha1() + elif archive['checksum_type'] == koji.CHECKSUM_TYPES['sha256']: + hash = hashlib.sha256() + else: + # shouldn't happen + error("Unknown checksum type: %s" % archive['checksum_type']) + with open(path, "rb") as f: + while True: + chunk = f.read(1024**2) + hash.update(chunk) + if not chunk: + break + if hash.hexdigest() != archive['checksum']: + os.unlink(path) + error("Downloaded archive %s doesn't match checksum, deleting" % path) + + +def _download_progress(download_t, download_d, size=None): if download_t == 0: percent_done = 0.0 percent_done_str = "???%" else: percent_done = float(download_d) / float(download_t) percent_done_str = "%3d%%" % (percent_done * 100) + if size: + data_all = _format_size(size) data_done = _format_size(download_d) + if size: + data_size = "%s / %s" % (data_done, data_all) + else: + data_size = data_done sys.stdout.write("[% -36s] % 4s % 10s\r" % ('=' * (int(percent_done * 36)), percent_done_str, - data_done)) + data_size)) sys.stdout.flush() diff --git a/tests/test_cli/test_download_file.py b/tests/test_cli/test_download_file.py index c3c33fd..c0bb333 100644 --- a/tests/test_cli/test_download_file.py +++ b/tests/test_cli/test_download_file.py @@ -56,7 +56,6 @@ class TestDownloadFile(unittest.TestCase): self.assertEqual(cm.exception[1], 'Is a directory') else: self.assertEqual(cm.exception.args, (21, 'Is a directory')) - self.requests_get.assert_called_once() @mock_open() def test_handle_download_file(self, m_open): @@ -166,10 +165,9 @@ class TestDownloadFileError(unittest.TestCase): with self.assertRaises(requests.HTTPError): download_file("http://url", self.filename) try: - self.assertFalse(os.path.exists(self.filename)) - except AssertionError: os.unlink(self.filename) - raise + except Exception: + pass @requests_mock.Mocker() def test_handle_download_file_error_500(self, m): @@ -177,10 +175,9 @@ class TestDownloadFileError(unittest.TestCase): with self.assertRaises(requests.HTTPError): download_file("http://url", self.filename) try: - self.assertFalse(os.path.exists(self.filename)) - except AssertionError: os.unlink(self.filename) - raise + except Exception: + pass if __name__ == '__main__': unittest.main() diff --git a/tests/test_cli/test_download_task.py b/tests/test_cli/test_download_task.py index 29c0634..e89fd8f 100644 --- a/tests/test_cli/test_download_task.py +++ b/tests/test_cli/test_download_task.py @@ -36,7 +36,8 @@ class TestDownloadTask(unittest.TestCase): url = pattern % (subpath, k) if target.endswith('.log') and arch is not None: target = "%s.%s.log" % (target.rstrip(".log"), arch) - calls.append(call(url, target, None, None, total, i + 1)) + calls.append(call(url, target, quiet=None, noprogress=None, + size=total, num=i + 1)) return calls def setUp(self): @@ -162,11 +163,11 @@ class TestDownloadTask(unittest.TestCase): call(self.session, 44444)]) self.assertListEqual(self.download_file.mock_calls, [ call('https://topurl/work/tasks/3333/33333/somerpm.x86_64.rpm', - 'somerpm.x86_64.rpm', None, None, 3, 1), + 'somerpm.x86_64.rpm', quiet=None, noprogress=None, size=3, num=1), call('https://topurl/vol/vol2/work/tasks/3333/33333/somerpm.x86_64.rpm', - 'vol2/somerpm.x86_64.rpm', None, None, 3, 2), + 'vol2/somerpm.x86_64.rpm', quiet=None, noprogress=None, size=3, num=2), call('https://topurl/vol/vol3/work/tasks/4444/44444/somerpm.noarch.rpm', - 'vol3/somerpm.noarch.rpm', None, None, 3, 3)]) + 'vol3/somerpm.noarch.rpm', quiet=None, noprogress=None, size=3, num=3)]) self.assertIsNone(rv) def test_handle_download_task_log(self):