#!/usr/bin/python3 -u
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
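
"""Fetch and verify remote content for automation tasks.

This script downloads static URLs or Taskcluster task artifacts,
optionally verifies their size, SHA-256 digest and GPG signature, and
extracts recognized archives (zip, and tar compressed with gzip, bzip2,
xz or zstandard) into a destination directory.
"""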

import argparse
import bz2
import concurrent.futures
import gzip
import hashlib
import json
import lzma
import multiprocessing
import os
import pathlib
import random
import subprocess
import sys
import tempfile
import time
import urllib.request

try:
    import zstandard
except ImportError:
    zstandard = None

try:
    import certifi
except ImportError:
    certifi = None
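

# Upper bound on concurrent downloads performed by fetch_urls().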
CONCURRENCY = multiprocessing.cpu_count()


def log(msg):
    print(msg, file=sys.stderr)
    sys.stderr.flush()


class IntegrityError(Exception):
    """Represents an integrity error when downloading a URL."""


# The following is copied from
# https://github.com/mozilla-releng/redo/blob/6d07678a014e0c525e54a860381a165d34db10ff/redo/__init__.py#L15-L85
def retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1):
    """
    A generator function that sleeps between retries and handles exponential
    backoff and jitter. The action you are retrying is meant to run after
    retrier yields.

    At each iteration, we sleep for sleeptime + random.randint(-jitter, jitter).
    Afterwards sleeptime is multiplied by sleepscale for the next iteration.

    Args:
        attempts (int): maximum number of times to try; defaults to 5
        sleeptime (float): how many seconds to sleep between tries; defaults to
                           10s
        max_sleeptime (float): the longest we'll sleep, in seconds; defaults to
                               300s (five minutes)
        sleepscale (float): how much to multiply the sleep time by each
                            iteration; defaults to 1.5
        jitter (int): random jitter to introduce to sleep time each iteration.
                      The amount is chosen at random between [-jitter, +jitter];
                      defaults to 1

    Yields:
        The current sleep time, a maximum of `attempts` number of times

    Example:
        >>> n = 0
        >>> for _ in retrier(sleeptime=0, jitter=0):
        ...     if n == 3:
        ...         # We did the thing!
        ...         break
        ...     n += 1
        >>> n
        3

        >>> n = 0
        >>> for _ in retrier(sleeptime=0, jitter=0):
        ...     if n == 6:
        ...         # We did the thing!
        ...         break
        ...     n += 1
        ... else:
        ...     print("max tries hit")
        max tries hit
    """
    jitter = jitter or 0  # py35 barfs on the next line if jitter is None
    if jitter > sleeptime:
        # To prevent negative sleep times
        raise Exception('jitter ({}) must be less than sleep time ({})'.format(
            jitter, sleeptime))

    sleeptime_real = sleeptime
    for _ in range(attempts):
        log("attempt %i/%i" % (_ + 1, attempts))

        yield sleeptime_real

        if jitter:
            sleeptime_real = sleeptime + random.randint(-jitter, jitter)
            # our jitter should scale along with the sleeptime
            jitter = int(jitter * sleepscale)
        else:
            sleeptime_real = sleeptime

        sleeptime *= sleepscale

        if sleeptime_real > max_sleeptime:
            sleeptime_real = max_sleeptime

        # Don't need to sleep the last time
        if _ < attempts - 1:
            log("sleeping for %.2fs (attempt %i/%i)" % (
                sleeptime_real, _ + 1, attempts))
            time.sleep(sleeptime_real)


def stream_download(url, sha256=None, size=None):
    """Download a URL to a generator, optionally with content verification.

    If ``sha256`` or ``size`` are defined, the downloaded URL will be
    validated against those requirements and ``IntegrityError`` will be
    raised if expectations do not match.

    Because verification cannot occur until the file is completely downloaded,
    consumers should not do anything meaningful with the data while content
    verification is in use. To securely handle retrieved content, it should be
    streamed to a file or memory and only operated on after the generator is
    exhausted without raising.
    """
    log('Downloading %s' % url)

    h = hashlib.sha256()
    length = 0

    t0 = time.time()
    with urllib.request.urlopen(url, cafile=certifi.where()) if certifi else urllib.request.urlopen(url) as fh:
        if not url.endswith('.gz') and fh.info().get('Content-Encoding') == 'gzip':
            fh = gzip.GzipFile(fileobj=fh)

        while True:
            chunk = fh.read(65536)
            if not chunk:
                break

            h.update(chunk)
            length += len(chunk)

            yield chunk

    duration = time.time() - t0
    digest = h.hexdigest()

    log('%s resolved to %d bytes with sha256 %s in %.3fs' % (
        url, length, digest, duration))

    if size:
        if size == length:
            log('Verified size of %s' % url)
        else:
            raise IntegrityError('size mismatch on %s: wanted %d; got %d' % (
                url, size, length))

    if sha256:
        if digest == sha256:
            log('Verified sha256 integrity of %s' % url)
        else:
            raise IntegrityError('sha256 mismatch on %s: wanted %s; got %s' % (
                url, sha256, digest))
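

# A minimal usage sketch (illustrative; `url`, `expected_digest` and
# `expected_size` are placeholders): stream the chunks to a file and only
# trust the result once the generator finishes without raising.
#
#     with open('out.bin', 'wb') as fh:
#         for chunk in stream_download(url, sha256=expected_digest,
#                                      size=expected_size):
#             fh.write(chunk)
#
# download_to_path() below wraps this pattern with retries and an atomic
# rename into place.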
def download_to_path(url, path, sha256=None, size=None):
    """Download a URL to a filesystem path, possibly with verification."""

    # We download to a temporary file and rename at the end so there's
    # no chance of the final file being partially written or containing
    # bad data.
    try:
        path.unlink()
    except FileNotFoundError:
        pass

    for _ in retrier(attempts=5, sleeptime=60):
        try:
            tmp = path.with_name('%s.tmp' % path.name)

            log('Downloading %s to %s' % (url, tmp))

            try:
                with tmp.open('wb') as fh:
                    for chunk in stream_download(url, sha256=sha256, size=size):
                        fh.write(chunk)
            except IntegrityError:
                tmp.unlink()
                raise

            log('Renaming to %s' % path)
            tmp.rename(path)
            return
        except IntegrityError:
            raise
        except Exception as e:
            log("Download failed: {}".format(e))
            continue

    raise Exception("Download failed, no more retries!")


def gpg_verify_path(path: pathlib.Path, public_key_data: bytes,
                    signature_data: bytes):
    """Verify a file on the filesystem using GPG.

    Takes a Path defining a file to verify. ``public_key_data`` contains
    bytes with GPG public key data. ``signature_data`` contains a signed
    GPG document to use with ``gpg --verify``.
    """
    log('Validating GPG signature of %s' % path)
    log('GPG key data:\n%s' % public_key_data.decode('ascii'))

    with tempfile.TemporaryDirectory() as td:
        try:
            # --batch since we're running unattended.
            gpg_args = ['gpg', '--homedir', td, '--batch']

            log('Importing GPG key...')
            subprocess.run(gpg_args + ['--import'],
                           input=public_key_data,
                           check=True)

            log('Verifying GPG signature...')
            subprocess.run(gpg_args + ['--verify', '-', '%s' % path],
                           input=signature_data,
                           check=True)

            log('GPG signature verified!')
        finally:
            # There is a race between the agent self-terminating and
            # shutil.rmtree() from the temporary directory cleanup that can
            # lead to exceptions. Kill the agent before cleanup to prevent this.
            env = dict(os.environ)
            env['GNUPGHOME'] = td
            subprocess.run(['gpgconf', '--kill', 'gpg-agent'], env=env)


def archive_type(path: pathlib.Path):
    """Attempt to identify a path as an extractable archive."""
    if path.suffixes[-2:-1] == ['.tar']:
        return 'tar'
    elif path.suffix == '.zip':
        return 'zip'
    else:
        return None


def extract_archive(path, dest_dir, typ):
    """Extract an archive to a destination directory."""

    # Resolve paths to absolute variants.
    path = path.resolve()
    dest_dir = dest_dir.resolve()

    # We pipe input to the decompressor program so that we can apply
    # custom decompressors that the program may not know about.
    if typ == 'tar':
        if path.suffix == '.bz2':
            ifh = bz2.open(str(path), 'rb')
        elif path.suffix == '.gz':
            ifh = gzip.open(str(path), 'rb')
        elif path.suffix == '.xz':
            ifh = lzma.open(str(path), 'rb')
        elif path.suffix == '.zst':
            if not zstandard:
                raise ValueError('zstandard Python package not available')
            dctx = zstandard.ZstdDecompressor()
            ifh = dctx.stream_reader(path.open('rb'))
        elif path.suffix == '.tar':
            ifh = path.open('rb')
        else:
            raise ValueError('unknown archive format for tar file: %s' % path)

        args = ['tar', 'xf', '-']
        pipe_stdin = True
    elif typ == 'zip':
        # unzip from stdin has wonky behavior. We don't use a pipe for it.
        ifh = open(os.devnull, 'rb')
        args = ['unzip', '-o', str(path)]
        pipe_stdin = False
    else:
        raise ValueError('unknown archive format: %s' % path)

    log('Extracting %s to %s using %r' % (path, dest_dir, args))
    t0 = time.time()

    with ifh, subprocess.Popen(args, cwd=str(dest_dir), bufsize=0,
                               stdin=subprocess.PIPE) as p:
        while True:
            if not pipe_stdin:
                break

            chunk = ifh.read(131072)
            if not chunk:
                break

            p.stdin.write(chunk)
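
    # Exiting the "with" block closes the pipe to the child process and waits
    # for it to exit, so p.returncode below reflects the finished process.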
    if p.returncode:
        raise Exception('%r exited %d' % (args, p.returncode))

    log('%s extracted in %.3fs' % (path, time.time() - t0))


def fetch_and_extract(url, dest_dir, extract=True, sha256=None, size=None):
    """Fetch a URL and extract it to a destination path.

    If the downloaded URL is an archive, it is extracted automatically
    and the archive is deleted. Otherwise the file remains in place in
    the destination directory.
    """

    basename = url.split('/')[-1]
    dest_path = dest_dir / basename

    download_to_path(url, dest_path, sha256=sha256, size=size)

    if not extract:
        return

    typ = archive_type(dest_path)
    if typ:
        extract_archive(dest_path, dest_dir, typ)
        log('Removing %s' % dest_path)
        dest_path.unlink()


def fetch_urls(downloads):
    """Fetch multiple URLs to destination paths concurrently.

    Each entry in ``downloads`` is a tuple of arguments for
    ``fetch_and_extract()``.
    """
    with concurrent.futures.ThreadPoolExecutor(CONCURRENCY) as e:
        fs = []

        for download in downloads:
            fs.append(e.submit(fetch_and_extract, *download))

        for f in fs:
            f.result()


def command_static_url(args):
    gpg_sig_url = args.gpg_sig_url
    gpg_env_key = args.gpg_key_env

    if bool(gpg_sig_url) != bool(gpg_env_key):
        print('--gpg-sig-url and --gpg-key-env must both be defined')
        return 1

    if gpg_sig_url:
        gpg_signature = b''.join(stream_download(gpg_sig_url))
        gpg_key = os.environb[gpg_env_key.encode('ascii')]

    dest = pathlib.Path(args.dest)
    dest.parent.mkdir(parents=True, exist_ok=True)

    try:
        download_to_path(args.url, dest, sha256=args.sha256, size=args.size)

        if gpg_sig_url:
            gpg_verify_path(dest, gpg_key, gpg_signature)

    except Exception:
        try:
            dest.unlink()
        except FileNotFoundError:
            pass

        raise


def api(root_url, service, version, path):
    # taskcluster-lib-urls is not available when this script runs, so
    # simulate its behavior:
    if root_url == 'https://taskcluster.net':
        return 'https://{service}.taskcluster.net/{version}/{path}'.format(
            service=service, version=version, path=path)
    return '{root_url}/api/{service}/{version}/{path}'.format(
        root_url=root_url, service=service, version=version, path=path)


def command_task_artifacts(args):
    fetches = json.loads(os.environ['MOZ_FETCHES'])
    downloads = []
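
    # Each MOZ_FETCHES entry describes one artifact to fetch. An illustrative
    # example (the task ID and artifact path are made up):
    #
    #     [{"task": "abc123", "artifact": "public/build/target.tar.xz",
    #       "extract": true, "dest": "toolchain"}]
    #
    # 'dest' is optional; 'extract' controls whether archives are unpacked.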
    for fetch in fetches:
        extdir = pathlib.Path(args.dest)
        if 'dest' in fetch:
            extdir = extdir.joinpath(fetch['dest'])
        extdir.mkdir(parents=True, exist_ok=True)
        root_url = os.environ['TASKCLUSTER_ROOT_URL']
        if fetch['artifact'].startswith('public/'):
            path = 'task/{task}/artifacts/{artifact}'.format(
                task=fetch['task'], artifact=fetch['artifact'])
            url = api(root_url, 'queue', 'v1', path)
        else:
            url = ('{proxy_url}/api/queue/v1/task/{task}/artifacts/{artifact}').format(
                proxy_url=os.environ['TASKCLUSTER_PROXY_URL'],
                task=fetch['task'],
                artifact=fetch['artifact'])
        downloads.append((url, extdir, fetch['extract']))

    fetch_urls(downloads)
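

# Example invocations (illustrative; "<script>" stands for this file):
#
#     <script> static-url --sha256 <hex digest> --size <bytes> \
#         https://example.com/archive.tar.xz /path/to/dest/archive.tar.xz
#
#     MOZ_FETCHES='[{...}]' MOZ_FETCHES_DIR=/path/to/fetches \
#         TASKCLUSTER_ROOT_URL=<root url> <script> task-artifacts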
def main():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(title='sub commands')

    url = subparsers.add_parser('static-url', help='Download a static URL')
    url.set_defaults(func=command_static_url)
    url.add_argument('--sha256', required=True,
                     help='SHA-256 of downloaded content')
    url.add_argument('--size', required=True, type=int,
                     help='Size of downloaded content, in bytes')
    url.add_argument('--gpg-sig-url',
                     help='URL containing signed GPG document validating '
                          'URL to fetch')
    url.add_argument('--gpg-key-env',
                     help='Environment variable containing GPG key to validate')
    url.add_argument('url', help='URL to fetch')
    url.add_argument('dest', help='Destination path')

    artifacts = subparsers.add_parser('task-artifacts',
                                      help='Fetch task artifacts')
    artifacts.set_defaults(func=command_task_artifacts)
    artifacts.add_argument('-d', '--dest', default=os.environ.get('MOZ_FETCHES_DIR'),
                           help='Destination directory which will contain all '
                                'artifacts (defaults to $MOZ_FETCHES_DIR)')

    args = parser.parse_args()

    if not args.dest:
        parser.error('no destination directory specified, either pass in --dest '
                     'or set $MOZ_FETCHES_DIR')

    return args.func(args)


if __name__ == '__main__':
    sys.exit(main())