#!/usr/bin/python3.9
|
#
|
# CDDL HEADER START
|
#
|
# The contents of this file are subject to the terms of the
|
# Common Development and Distribution License (the "License").
|
# You may not use this file except in compliance with the License.
|
#
|
# You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
|
# or http://www.opensolaris.org/os/licensing.
|
# See the License for the specific language governing permissions
|
# and limitations under the License.
|
#
|
# When distributing Covered Code, include this CDDL HEADER in each
|
# file and include the License file at usr/src/OPENSOLARIS.LICENSE.
|
# If applicable, add the following below this CDDL HEADER, with the
|
# fields enclosed by brackets "[]" replaced with your own identifying
|
# information: Portions Copyright [yyyy] [name of copyright owner]
|
#
|
# CDDL HEADER END
|
#
|
# Copyright (c) 2010, 2016, Oracle and/or its affiliates. All rights reserved.
|
#
|
#
|
# userland-fetch - a file download utility
|
#
|
# A simple program similiar to wget(1), but handles local file copy, ignores
|
# directories, and verifies file hashes.
|
#
|
|
import errno
|
import os
|
import sys
|
import shutil
|
import json
|
import subprocess
|
import re
|
import gzip
|
import bz2
|
from urllib.parse import urlparse
|
from urllib.request import urlopen
|
from urllib.error import HTTPError,URLError
|
from urllib.request import Request
|
from pathlib import Path
|
import hashlib
|
from http.client import BadStatusLine
|
|
# EXIT CODES:
|
# 1 - unspecified error
|
# 2 - download uses insecure protocol
|
# 3 - was unable to find a suitable download
|
# 4 - need-hash specified but no hash was found
|
|
# NEW PARAMETERS:
|
# -n/--need-hash: Set this to tell userland-fetch to fail if it cannot find a
|
# correct hash. This also causes userland-fetch to search for
|
# and download hash files, if they are not already present in
|
# HASH_DIR. If --hash is provided this effectively does nothing.
|
#
|
# -N/--need-sig: Set this to tell userland-fetch to require a signature. This
|
# also causes userland-fetch to search for signature files. If
|
# the signature fails then the download is considered corrupted,
|
# and will be deleted unless --keep is set.
|
# This means that if the signature can't be checked, the file
|
# WILL be deleted!
|
#
|
# -c/--clobber-hash: Set this to tell userland-fetch to clobber old hash files.
|
# userland-fetch will replace hash files in HASH_DIR with their
|
# remote counterparts.
|
#
|
|
# convert environment variables to global python variables
|
def prep_envvars():
|
# This algorithm is set if it cannot be found in the filename
|
global DEFAULT_HASH_ALGO
|
DEFAULT_HASH_ALGO=os.getenv("DEFAULT_HASH_ALGO","sha256")
|
|
global DEFAULT_HASH_FILES
|
try:
|
DEFAULT_HASH_FILES=[ x for x in os.environ["DEFAULT_HASH_FILES"].split(" ") if x ]
|
except KeyError:
|
DEFAULT_HASH_FILES=["SHA256SUMS","sha256sums.txt"]
|
|
global HASH_DIR
|
try:
|
HASH_DIR = os.path.realpath(os.environ["HASH_DIR"])
|
except KeyError:
|
# set after getting cmdline args
|
HASH_DIR = None
|
|
global SECURE_PROTOCOLS
|
try:
|
SECURE_PROTOCOLS=["UNCHECKED"]+[ x for x in os.environ["SECURE_PROTOCOLS"].split(" ") if x ]
|
except KeyError:
|
SECURE_PROTOCOLS=["UNCHECKED","https"]
|
|
global SIGNATURE_EXTENSIONS
|
try:
|
SIGNATURE_EXTENSIONS=[ x for x in os.environ["SIGNATURE_EXTENSIONS"].split(" ") if x ]
|
except KeyError:
|
SIGNATURE_EXTENSIONS=["sig","asc"]
|
|
global ALLOW_UNVERIFIED_DOWNLOADS
|
try:
|
ALLOW_UNVERIFIED_DOWNLOADS = os.environ["ALLOW_UNVERIFIED_DOWNLOADS"] == 'yes'
|
except KeyError:
|
ALLOW_UNVERIFIED_DOWNLOADS = False
|
|
LOCAL_SCHEMES = [None, 'file','']
|
REMOTE_SCHEMES = ['https','http','ftp']
|
|
def printIOError(e, txt):
|
""" Function to decode and print IOError type exception """
|
print("I/O Error: " + txt + ": ")
|
try:
|
(code, message) = e
|
print(str(message) + " (" + str(code) + ")")
|
except:
|
print(str(e))
|
|
# TODO: refactor this so there aren't any global variables
|
VALIDATE_ERROR=""
|
VALIDATE_CODE=-1
|
def validate_signature(path, signature):
|
"""Given paths to a file and a detached PGP signature, verify that
|
the signature is valid for the file. Current configuration allows for
|
unrecognized keys to be downloaded as necessary."""
|
|
# Find the root of the repo so that we can point GnuPG at the right
|
# configuration and keyring.
|
proc = subprocess.Popen(["git", "rev-parse", "--show-toplevel"], stdout=subprocess.PIPE,
|
universal_newlines=True)
|
proc.wait()
|
if proc.returncode != 0:
|
return False
|
out, err = proc.communicate()
|
gpgdir = os.path.join(out.strip(), "tools", ".gnupg")
|
|
# Skip the permissions warning: none of the information here is private,
|
# so not having to worry about getting git keeping the directory
|
# unreadable is just simplest.
|
try:
|
proc = subprocess.Popen(["gpg2", "--verify",
|
"--no-permission-warning", "--homedir", gpgdir, signature,
|
path], stdin=open("/dev/null"),
|
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
universal_newlines=True)
|
except OSError as e:
|
# If the executable simply couldn't be found, just skip the
|
# validation.
|
if e.errno == errno.ENOENT:
|
return False
|
raise
|
|
proc.wait()
|
global VALIDATE_CODE
|
VALIDATE_CODE = proc.returncode
|
if proc.returncode != 0:
|
# Only print GnuPG's output when there was a problem.
|
# Make this a global variable so we can output it somewhere tidy.
|
global VALIDATE_ERROR
|
VALIDATE_ERROR=proc.stdout.read()
|
return False
|
return True
|
|
|
def validate(file, hash):
|
"""Given a file-like object and a hash string, verify that the hash
|
matches the file contents."""
|
|
try:
|
algorithm, hashvalue = hash.split(':')
|
except:
|
algorithm = DEFAULT_HASH_ALGO
|
|
# force migration away from sha1
|
if algorithm == "sha1":
|
algorithm = DEFAULT_HASH_ALGO
|
|
try:
|
m = hashlib.new(algorithm)
|
except ValueError:
|
print("Unable to generate hashlib instance for",algorithm)
|
return False
|
|
try:
|
block = file.read()
|
m.update(block)
|
return "%s:%s" % (algorithm, m.hexdigest())
|
except IOError as err:
|
print(str(err), end=' ')
|
except EOFError as err:
|
print(str(err), end=' ')
|
|
return "%s:" % (algorithm)
|
|
|
def validate_container(filename, hash):
|
"""Given a file path and a hash string, verify that the hash matches the
|
file contents."""
|
|
try:
|
file = open(filename, 'rb')
|
except IOError as e:
|
printIOError(e, "Can't open file " + filename)
|
return False
|
return validate(file, hash)
|
|
|
def validate_payload(filename, hash):
|
"""Given a file path and a hash string, verify that the hash matches the
|
payload (uncompressed content) of the file."""
|
|
expr_bz = re.compile('.+\.bz2?$', re.IGNORECASE)
|
expr_gz = re.compile('.+\.gz$', re.IGNORECASE)
|
expr_tgz = re.compile('.+\.tgz$', re.IGNORECASE)
|
expr_tbz = re.compile('.+\.tbz2?$', re.IGNORECASE)
|
|
try:
|
if expr_bz.match(filename):
|
file = bz2.BZ2File(filename, 'rb')
|
elif expr_gz.match(filename):
|
file = gzip.GzipFile(filename, 'rb')
|
elif expr_tgz.match(filename):
|
file = gzip.GzipFile(filename, 'rb')
|
elif expr_tbz.match(filename):
|
file = bz2.GzipFile(filename, 'rb')
|
else:
|
return False
|
except IOError as e:
|
printIOError(e, "Can't open archive " + filename)
|
return False
|
return validate(file, hash)
|
|
|
def download(url, filename=None, user_agent_arg=None, quiet=None,allow_partial=True):
|
"""Download the content at the given URL to the given filename
|
(defaulting to the basename of the URL if not given. If 'quiet' is
|
True, throw away any error messages. Returns the name of the file to
|
which the content was donloaded."""
|
retval = None
|
try:
|
req = Request(url,method="HEAD")
|
if user_agent_arg is not None:
|
req.add_header("User-Agent", user_agent_arg)
|
if filename is None:
|
filename = req.get_full_url().split('/')[-1]
|
retry = 1
|
dl = 0
|
i = urlopen(req)
|
if 'transfer-encoding' in i.headers and i.headers['transfer-encoding'] == 'chunked':
|
length = 0
|
if not quiet:
|
print("length unknown (streamed/chunked)")
|
else:
|
try:
|
length = int(i.headers['content-length'])
|
if not quiet:
|
print("length %i bytes" % (length))
|
except (KeyError,ValueError,TypeError):
|
length = 0
|
if not quiet:
|
print("length unknown")
|
if not 'accept-ranges' in i.headers or i.headers['accept-ranges'] != 'bytes':
|
if not quiet:
|
print("No partial download support from server")
|
allow_partial = False
|
i.close()
|
req.method = "GET"
|
# This might speed things up and keep memory usage down
|
while retry <= 3:
|
with open(filename + ".part","ab" if allow_partial else "wb") as o:
|
try:
|
# seek to end of the file if applicable
|
if allow_partial:
|
o.seek(0,2)
|
dl = o.tell()
|
if not quiet:
|
print("(Attempt %i of 3%s)..." % (retry,"; %i bytes done"%(dl) if dl else ""),end=" ")
|
if dl > 0:
|
req.add_header("Range","bytes=%i-"%(dl))
|
with urlopen(req) as i:
|
src = i.read(65536)
|
while len(src) > 0:
|
o.write(src)
|
src = i.read(65536)
|
retry = 4
|
if length > 0 and o.tell() < length:
|
if not quiet:
|
print("Download of %s stopped abruptly." % (str(url)))
|
retry += 1
|
|
except URLError as e:
|
if not quiet:
|
print("Error downloading %s at %i bytes: %s" % (str(url),dl,str(e)))
|
# if we haven't downloaded any bytes since the last URLError, cancel the download.
|
if dl > 0 and o.tell() > dl:
|
dl = o.tell()
|
retry += 1
|
req.add_header("Range","bytes=%i-"%(o.tell()+1))
|
else:
|
retry = 4
|
except HTTPError as e:
|
if not quiet:
|
print("Error downloading %s: %s" % (str(url),str(e)))
|
retry = 4
|
# return the name of the file that we downloaded the data to.
|
os.rename(filename+".part",filename)
|
retval = filename
|
except IOError as e:
|
if not quiet:
|
printIOError(e, "Can't open url " + url)
|
except BadStatusLine as e:
|
if not quiet:
|
print("Can't open url %s: server answered with code which we couldn't understand " % (url))
|
except KeyboardInterrupt:
|
print("Cancelling download...")
|
|
return retval
|
|
|
def download_paths(search, filenames, url):
|
"""Returns a list of URLs where the file 'filename' might be found,
|
using 'url', 'search', and $DOWNLOAD_SEARCH_PATH as places to look.
|
|
If 'filename' is None, then the list will simply contain 'url'."""
|
|
urls = list()
|
if type(filenames) == str:
|
filenames = [filenames]
|
|
if filenames is not None:
|
tmp = os.getenv('DOWNLOAD_SEARCH_PATH')
|
if tmp:
|
search += tmp.split(' ')
|
for filename in filenames:
|
file = os.path.basename(filename)
|
|
urls += [base + '/' + file for base in search]
|
|
# filename should always be first
|
if filename in urls:
|
urls.remove(filename)
|
urls.insert(0, filename)
|
|
# command line url is a fallback, so it's last
|
if url is not None and url not in urls:
|
parse_result = urlparse(url)
|
scheme = parse_result.scheme
|
path = parse_result.path
|
if scheme == "pypi":
|
url = pypi_url(url, os.path.basename(filename))
|
if url != None and url not in urls:
|
urls.append(url)
|
|
# last resort path
|
if filenames is not None:
|
tmp = os.getenv('DOWNLOAD_FALLBACK_PATH')
|
if tmp:
|
for filename in filenames:
|
file = os.path.basename(filename)
|
urls += [base + '/' + file for base in tmp.split(' ')]
|
|
local_urls = list()
|
remote_urls = list()
|
# sort entries by local first, then remote:
|
for entry in urls:
|
if urlparse(entry).scheme in LOCAL_SCHEMES:
|
local_urls.append(entry)
|
else:
|
remote_urls.append(entry)
|
return local_urls + remote_urls
|
|
|
def pypi_url(url, filename):
|
"""Given a pypi: URL, return the real URL for that component/version.
|
|
The pypi scheme has a host (with an empty host defaulting to
|
pypi.python.org), and a path that should be of the form
|
"component==version". Other specs could be supported, but == is the
|
only thing that makes sense in this context.
|
|
The filename argument is the name of the expected file to download, so
|
that when pypi gives us multiple archives to choose from, we can pick
|
the right one.
|
"""
|
|
|
parse_result = urlparse(url)
|
host = parse_result.netloc
|
path = parse_result.path
|
|
# We have to use ==; anything fancier would require pkg_resources, but
|
# really that's the only thing that makes sense in this context.
|
try:
|
name, version = re.match("/(.*)==(.*)$", path).groups()
|
except AttributeError:
|
print("PyPI URLs must be of the form 'pypi:///component==version'")
|
return None
|
|
if not host:
|
jsurl = "https://pypi.python.org/pypi/%s/json" % name
|
else:
|
jsurl = "https://%s/pypi/%s/json" % (host, name)
|
|
try:
|
f = urlopen(jsurl, data=None)
|
except HTTPError as e:
|
if e.getcode() == 404:
|
print("Unknown component '%s'" % name)
|
else:
|
printIOError(e, "Can't open PyPI JSON url %s" % url)
|
return None
|
except IOError as e:
|
printIOError(e, "Can't open PyPI JSON url %s" % url)
|
return None
|
content = f.read().decode("utf-8")
|
js = json.loads(content)
|
try:
|
verblock = js["releases"][version]
|
except KeyError:
|
print("Unknown version '%s'" % version)
|
return None
|
|
urls = [ d["url"] for d in verblock ]
|
for archiveurl in urls:
|
if archiveurl.endswith("/%s" % filename):
|
return archiveurl
|
|
if urls:
|
print("None of the following URLs delivers '%s':" % filename)
|
print(" " + "\n ".join(urls))
|
else:
|
print("Couldn't find any suitable URLs")
|
return None
|
|
def download_from_paths(search_list, file_arg, url, link_arg, quiet=False,get_signature=False,download_dir=None):
|
"""Attempts to download a file from a number of possible locations.
|
Generates a list of paths where the file ends up on the local
|
filesystem. This is a generator because while a download might be
|
successful, the signature or hash may not validate, and the caller may
|
want to try again from the next location. The 'link_arg' argument is a
|
boolean which, when True, specifies that if the source is not a remote
|
URL and not already found where it should be, to make a symlink to the
|
source rather than copying it."""
|
|
for url in download_paths(search_list, file_arg, url):
|
if not quiet:
|
print("Source %s..." % url, end=' ')
|
elif quiet == 2:
|
if len(url) > 53:
|
p = url[:24] + ' ... ' + url[-24:]
|
else:
|
p = url
|
print(" {:54s}".format(p), end='')
|
|
parse_result = urlparse(url)
|
scheme = parse_result.scheme
|
path = parse_result.path
|
|
|
if scheme in LOCAL_SCHEMES:
|
name = None
|
if type(file_arg) == str:
|
names = [file_arg]
|
else:
|
names = file_arg
|
notfound = False
|
for n in names:
|
# don't rename stuff - there shouldn't be a file list here anyway
|
if os.path.basename(n) != os.path.basename(url):
|
continue
|
if os.path.exists(path) is False:
|
notfound = True
|
if not quiet:
|
print("not found, skipping file copy")
|
elif quiet == 2:
|
print("{:10s}".format("-"))
|
break
|
elif n and n != path:
|
if link_arg is False:
|
if not quiet:
|
print("\n copying...")
|
shutil.copy2(path, n)
|
else:
|
if not quiet:
|
print("\n linking...")
|
os.symlink(path, n)
|
else:
|
name = n
|
if not quiet:
|
print("cached")
|
elif quiet == 2:
|
print("{:10s}".format("cached"),end="")
|
break
|
if notfound:
|
continue
|
elif scheme in REMOTE_SCHEMES:
|
if not quiet:
|
print("\n downloading...", end=' ')
|
if type(file_arg) == str:
|
name = download(url, file_arg, quiet,(scheme != 'ftp'))
|
else:
|
if not download_dir:
|
download_dir = os.curdir
|
name = download(url, os.path.join(download_dir,os.path.basename(url)),quiet,(scheme != 'ftp'))
|
if get_signature and name:
|
for ext in SIGNATURE_EXTENSIONS:
|
sig = download(url+"."+ext, name+"."+ext, quiet,(scheme != 'ftp'))
|
if sig:
|
break
|
if name is None:
|
if not quiet:
|
print("failed")
|
elif quiet == 2:
|
print("{:10s}".format("-"))
|
continue
|
else:
|
if not quiet:
|
print("ok")
|
elif quiet == 2:
|
print("{:10s}".format("fetched"),end="")
|
yield (name,url)
|
|
|
def find_hash_in_file(filename,hash_file):
|
splits = hash_file.split('.')
|
regex = re.compile('([0-9a-fA-F]+)( [ \*](.*/)?)('+os.path.basename(filename)+'$)')
|
match = re.match("(^[a-z0-9]+)(sums?(.txt)?$)",hash_file.lower())
|
if '.'.join(splits[:-1]) == filename:
|
algo = re.match('([a-zA-Z0-9]+)(sums?)',hash_file.split('.')[-1]).group(1)
|
elif match:
|
algo = match.group(1)
|
else:
|
algo = DEFAULT_HASH_ALGO
|
with open(os.path.join(HASH_DIR,hash_file),"r") as file:
|
hash_value = None
|
for line in file.readlines():
|
hash_value = regex.match(line)
|
if hash_value is not None:
|
hash_value = hash_value.group(1)
|
break
|
if hash_value is not None:
|
return "%s:%s" % (algo,hash_value)
|
return None
|
|
def find_hash_in_hash_dir(filename):
|
try:
|
hash_value = None
|
if not os.path.exists(HASH_DIR):
|
return None, None
|
for hash_file in sorted(os.listdir(HASH_DIR)):
|
splits = hash_file.split('.')
|
if '.'.join(splits[:-1]) in SIGNATURE_EXTENSIONS:
|
continue
|
hash_value = find_hash_in_file(filename,hash_file)
|
if hash_value:
|
return hash_value, hash_file
|
return None, None
|
except NotADirectoryError:
|
print(HASH_DIR,"should be a directory containing hashfiles in the",DEFAULT_HASH_ALGO+"sum","format.")
|
return (1)
|
except IsADirectoryError:
|
print(hash_file,"should be a file containing hashes, not a directory.")
|
return 1
|
def usage():
|
print("Usage: %s [-a|--user-agent (user-agent)] [-f|--file (file)] [-l|--link] " \
|
"[-k|--keep] [-h|--hash (hash)] [-n|--need-hash] [-s|--search (search-dir)] " \
|
"[-g|--get-hashes] [-G|--get-sigs] " \
|
"[-S|--sigurl (signature-url)] [-N|--need-sig] --url (url)" % (sys.argv[0].split('/')[-1]))
|
return 1
|
|
|
def main():
|
import getopt
|
sys.stdout.flush()
|
try:
|
opts, args = getopt.getopt(sys.argv[1:], "a:f:h:lks:u:GgNnc",
|
["file=", "link", "keep", "hash=", "search=", "url=","get-sigs","get-hashes",
|
"sigurl=", "user-agent=", "need-sig", "need-hash","clobber-hash"])
|
sys.exit(realmain(opts, args))
|
except getopt.GetoptError as err:
|
print(str(err))
|
usage()
|
|
def realmain(opts, args):
|
prep_envvars()
|
user_agent_arg = None
|
file_arg = None
|
link_arg = False
|
keep_arg = False
|
hash_arg = None
|
url_arg = None
|
sig_arg = None
|
need_sig = False
|
get_signature = False
|
need_hash = False
|
get_hash = False
|
clobber_hash = False
|
search_list = list()
|
retval = 1
|
|
for opt, arg in opts:
|
if opt in ["-a", "--user-agent"]:
|
user_agent_arg = arg
|
elif opt in ["-f", "--file"]:
|
file_arg = arg
|
elif opt in ["-l", "--link"]:
|
link_arg = True
|
elif opt in ["-k", "--keep"]:
|
keep_arg = True
|
elif opt in ["-h", "--hash"]:
|
hash_arg = arg
|
elif opt in ["-n", "--need-hash"]:
|
need_hash = True
|
elif opt in ["-g", "--get-hashes"]:
|
get_hash = True
|
elif opt in ["-s", "--search"]:
|
search_list.append(arg)
|
elif opt in ["-S", "--sigurl"]:
|
sig_arg = arg
|
elif opt in ["-u", "--url"]:
|
url_arg = arg
|
elif opt in ["-N", "--need-sig"]:
|
need_sig = True
|
elif opt in ["-G", "--get-sigs"]:
|
get_signature = True
|
elif opt in ["-c", "--clobber-hash"]:
|
clobber_hash = True
|
else:
|
assert False, "unknown option"
|
|
if url_arg is None:
|
if clobber_hash and len(search_list) == 0:
|
print("WARN: -c/--clobber-hash is meaningless without --search or --url. Ignoring.")
|
clobber_hash = False
|
if file_arg is None:
|
usage()
|
scheme = "UNCHECKED"
|
else:
|
parse_result = urlparse(url_arg)
|
scheme = parse_result.scheme
|
path = parse_result.path
|
if file_arg is None:
|
file_arg = os.path.realpath(os.path.join(os.curdir,os.path.basename(path)))
|
|
file_arg = os.path.realpath(file_arg)
|
filename = os.path.basename(file_arg)
|
global HASH_DIR
|
if not HASH_DIR:
|
HASH_DIR = os.path.realpath(os.path.join(os.path.dirname(file_arg),"hashes"))
|
valid_sig = False
|
|
if clobber_hash or get_hash:
|
print("Hash directory: %s [clobbering: %s]" % (HASH_DIR,str(clobber_hash)))
|
if clobber_hash:
|
HASH_DIR_ORIG = HASH_DIR
|
HASH_DIR = HASH_DIR + ".tmp"
|
try:
|
os.mkdir(HASH_DIR)
|
except FileNotFoundError:
|
print("Refusing to create %s recursively - is HASH_DIR set correctly?" % (HASH_DIR))
|
return 1
|
except FileExistsError:
|
pass
|
|
# We need to account for the following possibilities for hash files:
|
# 1: .asc with embedded checksums (1 file, needs PGP stripping)
|
# TODO: ^
|
# 2: .asc or .sig, detached from hash file (2 files)
|
# 3: checksums without signature (need a secure protocol)
|
|
print("Sourcing hash files... ",end="")
|
search_hash_files = DEFAULT_HASH_FILES + [
|
filename + '.' + DEFAULT_HASH_ALGO,
|
filename + '.' + DEFAULT_HASH_ALGO + 'sum',
|
filename + '.' + DEFAULT_HASH_ALGO + 'sums'
|
]
|
hash_file = None
|
print("\n {:54s}{:10s}{:10s}".format("URL","LOCALITY","HAS HASH"))
|
if url_arg:
|
if not search_list:
|
search_list = []
|
search_list.append(os.path.dirname(url_arg))
|
search_hash_files = [ os.path.join(HASH_DIR,x) for x in search_hash_files ]
|
for hashname, hashurl in download_from_paths(search_list, search_hash_files , None, link_arg, quiet=2,get_signature=True,download_dir=HASH_DIR):
|
scheme = urlparse(hashurl).scheme
|
safe = scheme in SECURE_PROTOCOLS or scheme in LOCAL_SCHEMES
|
valid_sig = False
|
for sigext in SIGNATURE_EXTENSIONS:
|
signame = hashname + "." + sigext
|
if os.path.exists(signame):
|
valid_sig = validate_signature(hashname,signame)
|
if not valid_sig and (not safe or need_sig):
|
print("denied (hashfile download did not meet security criteria)")
|
os.remove(hashname)
|
Path(signame).unlink(missing_ok=True)
|
continue
|
hash_arg = find_hash_in_file(filename, hashname)
|
if not hash_arg:
|
print("no")
|
else:
|
print("YES")
|
hash_file = hashname
|
break
|
if hash_file is not None:
|
print("INFO: hash found for",filename,"in",hash_file)
|
|
if clobber_hash:
|
for file in os.listdir(HASH_DIR):
|
orig_file = None
|
new_file = None
|
try:
|
orig_file = os.path.join(HASH_DIR_ORIG,os.path.basename(file))
|
new_file = os.path.join(HASH_DIR,file)
|
os.rename(new_file,orig_file)
|
except IsADirectoryError as e:
|
print("ERROR: moving hashfiles to HASH_DIR failed: %s" % (str(e)))
|
except OSError as e:
|
print("OSError: %s (%s -> %s)" %(str(e),new_file,orig_file))
|
try:
|
os.rmdir(HASH_DIR)
|
except OSError as e:
|
print("Couldn't remove %s: %s" % (HASH_DIR,str(e)))
|
HASH_DIR = HASH_DIR_ORIG
|
elif hash_arg == None:
|
hash_arg, hash_file = find_hash_in_hash_dir(filename)
|
if hash_file is not None:
|
print("INFO: hash found for",filename,"in",hash_file)
|
else:
|
print("INFO: not using any hashes in %s for" % (HASH_DIR),filename,"(overridden with --hash)")
|
|
|
|
if (hash_arg is None or hash_arg == 'none') and need_hash:
|
print("-n/--need-hash and no hash found. Exiting.")
|
return 4
|
if ALLOW_UNVERIFIED_DOWNLOADS:
|
print("WARN: ALLOW_UNVERIFIED_DOWNLOADS set.")
|
|
if sig_arg:
|
if get_signature:
|
print("INFO: not searching with -g (--sigurl provided)")
|
get_signature=False
|
for name, url in download_from_paths(search_list, file_arg, url_arg, link_arg,get_signature=get_signature):
|
scheme = urlparse(url).scheme
|
if not name:
|
print(" was not downloaded")
|
continue
|
print(" validating signature...", end=' ')
|
if valid_sig:
|
print("hashfile had valid signature")
|
else:
|
sig_file = None
|
if sig_arg:
|
if sig_arg == 'none':
|
print("skipping (--sigurl none)")
|
else:
|
print("using %s..." % sig_arg,end=' ')
|
if urlparse(sig_arg).scheme in REMOTE_SCHEMES:
|
sig_file = download(sig_arg,filename=os.path.join(os.path.dirname(name),os.path.basename(sig_arg)),quiet=True,allow_partial=False)
|
else:
|
if get_signature:
|
print("checking remote signature...",end=' ')
|
else:
|
print("checking local signature...",end=' ')
|
errors = []
|
if not sig_file:
|
for ext in SIGNATURE_EXTENSIONS:
|
if os.path.exists(name+'.'+ext):
|
sig_file = name+'.'+ext
|
break
|
if sig_file:
|
if validate_signature(name, sig_file):
|
print("VALID")
|
valid_sig = True
|
else:
|
print("invalid")
|
errors.append((sig_file,VALIDATE_CODE,VALIDATE_ERROR))
|
break
|
else:
|
print("not found")
|
|
if not valid_sig:
|
print(" signature validation failed\n")
|
bad_signature = False
|
for error in errors:
|
print("---%s output(exit code %d):\n%s---" % error)
|
if error[1] == 1:
|
bad_signature = True
|
if need_sig:
|
if not keep_arg or bad_signature:
|
print("WARN: Deleting corrupt file.")
|
try:
|
os.remove(name)
|
except OSError:
|
pass
|
print("-N/--need-sig is set. This download cannot be used.")
|
retval = 3
|
continue
|
elif not need_hash:
|
retval = 0
|
break
|
print(" validating hash...", end=' ')
|
if hash_arg and hash_arg != 'none':
|
realhash = validate_container(name, hash_arg)
|
else:
|
realhash = "skipped calculation (--hash none)"
|
hash_arg = None
|
|
if realhash == hash_arg:
|
print("ok")
|
retval = 0
|
else:
|
if hash_arg and hash_arg != 'none':
|
payloadhash = validate_payload(name, hash_arg)
|
else:
|
payloadhash = "skipped calculation (--hash none)"
|
if payloadhash == hash_arg:
|
print("ok")
|
retval = 0
|
else:
|
if not hash_arg or hash_arg == 'none':
|
scheme = urlparse(url).scheme
|
if not ALLOW_UNVERIFIED_DOWNLOADS:
|
print("ERROR: Cannot validate download (no hash or signature).")
|
if keep_arg == False:
|
try:
|
print("\nWARN: Removing the downloaded file")
|
os.remove(name)
|
except OSError:
|
pass
|
retval = 3
|
continue
|
elif scheme not in SECURE_PROTOCOLS and scheme not in LOCAL_SCHEMES:
|
print("ERROR: This download uses an insecure protocol: '%s'." % (str(scheme),))
|
if keep_arg == False:
|
try:
|
print("\nWARN: Removing the downloaded file")
|
os.remove(name)
|
except OSError:
|
pass
|
retval = 2
|
continue
|
print("ignoring errors")
|
retval = 0
|
else:
|
print("invalid hash!")
|
print(" expected: %s" % hash_arg)
|
print(" actual: %s" % realhash)
|
print(" payload: %s" % payloadhash)
|
|
if valid_sig:
|
if need_hash:
|
print("-n/--need-hash is set. This download cannot be used.")
|
if keep_arg == False:
|
try:
|
print("\nWARN: Removing the downloaded file")
|
os.remove(name)
|
except OSError:
|
pass
|
retval = 3
|
continue
|
|
# If the signature validated, then we assume
|
# that the expected hash is just a typo.
|
|
# An invalid hash shouldn't cause us to remove
|
# the target file if the signature was valid.
|
# Also, if the developer is in progress of upgrading
|
# some package version or introduces a new one, and
|
# explicitly ran "gmake fetch", keep the downloaded
|
# file (Makefile is not in position to have a valid
|
# checksum entry just yet) so it does not have to be
|
# downloaded twice.
|
retval = 0
|
else:
|
print("ERROR: This download failed to validate.")
|
if keep_arg == False:
|
try:
|
print("\nWARN: Removing the corrupt downloaded file")
|
os.remove(name)
|
except OSError:
|
pass
|
retval = 3
|
continue
|
if retval == 0:
|
break
|
return retval
|
|
if __name__ == "__main__":
|
main()
|