@@ -0,0 +1,240 @@
+ #!/usr/bin/python3
+ # pylint: disable=invalid-name
+
+ """
+ Download HTTP access logs from AWS S3 storage, parse them, increment the
+ hit counters on the frontend, and clean up.
+
+ AWS CLI cheatsheet:
+
+     aws s3 ls
+     aws s3 ls s3://fedora-copr
+     aws s3 cp s3://fedora-copr/cloudwatch/E2PUZIRCXCOXTG.2021-12-07-15.00d7a244.gz ./
+
+ Token permissions are required:
+ https://pagure.io/fedora-infrastructure/issue/10395
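+
+ Usage (options are defined by get_arg_parser() below; the installed script
+ name shown here is illustrative):
+
+     copr-aws-s3-hitcounter --dry-run --verbose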
+ """
+
+
+ import os
+ import argparse
+ import logging
+ import tempfile
+ import gzip
+ from datetime import datetime
+ from socket import gethostname
+ import boto3
+ from copr_common.request import SafeRequest
+ from copr_log_hitcounter import url_to_key_strings
+ from copr_backend.helpers import BackendConfigReader, setup_script_logger
+
+
+ # We will allow only this hostname to delete files from the S3 storage
+ PRODUCTION_HOSTNAME = "copr-be.aws.fedoraproject.org"
+
+
+ log = logging.getLogger(__name__)
+ setup_script_logger(log, "/var/log/copr-backend/hitcounter-s3.log")
+
+
+ class S3Bucket:
+     """
+     A high-level interface for interacting with files in an AWS S3 bucket
+     """
+
+     def __init__(self, bucket=None, directory=None, dry_run=False):
+         self.s3 = boto3.client("s3")
+         self.bucket = bucket or "fedora-copr"
+         self.directory = directory or "cloudwatch/"
+         self.dry_run = dry_run
+
+     def list_files(self):
+         """
+         List all files within our AWS S3 bucket
+         """
+         objects = self.s3.list_objects(
+             Bucket=self.bucket,
+             Prefix=self.directory)
+         # The "Contents" key is missing from the response when nothing
+         # matches the prefix
+         return [x["Key"] for x in objects.get("Contents", [])]
+
+     def download_file(self, s3file, dstdir):
+         """
+         Download a file from the AWS S3 bucket
+         """
+         dst = os.path.join(dstdir, os.path.basename(s3file))
+         self.s3.download_file(self.bucket, s3file, dst)
+         return dst
+
+     def delete_file(self, s3file):
+         """
+         Delete a file from the AWS S3 bucket
+         """
+         # Refuse to delete anything from development instances. We don't
+         # have separate S3 buckets / directories for production and devel,
+         # so removing a file on a devel instance would mean data loss for
+         # production. We therefore remove files only on production and let
+         # devel instances count the same files multiple times.
+         if gethostname() != PRODUCTION_HOSTNAME:
+             log.debug("Not deleting %s on a dev instance", s3file)
+             return
+         if self.dry_run:
+             log.info("Dry run: would delete %s", s3file)
+             return
+         self.s3.delete_object(Bucket=self.bucket, Key=s3file)
+
+
+ def gunzip(path):
+     """
+     Take a .gz file and uncompress it in the same directory
+     """
+     with gzip.open(path, "rb") as src:
+         # Strip the ".gz" suffix explicitly; str.rstrip() would treat its
+         # argument as a set of characters to remove, not as a suffix
+         with open(path[:-len(".gz")], "w") as dst:
+             dst.write(src.read().decode("utf-8"))
+     return dst.name
+
+
+ def parse_access_file(path):
+     """
+     Take a raw access file and return its contents as a list of dicts.
+     """
+     with open(path, "r") as fd:
+         content = fd.readlines()
+
+     # The file starts with meta information and thanks to #Fields, we know
+     # what each column means.
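+     # For illustration, a CloudFront access log typically begins like this
+     # (the exact field list varies):
+     #     #Version: 1.0
+     #     #Fields: date time x-edge-location sc-bytes c-ip ... cs-uri-stem ...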
+     assert content[0].startswith("#Version:")
+     assert content[1].startswith("#Fields:")
+     # Cut off the "#Fields:" prefix explicitly; lstrip() can't be used here
+     # because it strips a set of characters, not a prefix
+     keys = content[1][len("#Fields:"):].split()
+
+     accesses = []
+     for line in content[2:]:
+         # Make sure we are not parsing any more meta information
+         assert not line.startswith("#")
+
+         # Combine field names and this row's values to create a dict
+         values = line.split()
+         access = dict(zip(keys, values))
+         accesses.append(access)
+
+     return accesses
+
+
+ def get_hit_data(accesses):
+     """
+     Prepare body for the frontend request in the same format that
+     copr_log_hitcounter.py does.
+     """
+     hits = {}
+     timestamps = []
+     for access in accesses:
+         url = access["cs-uri-stem"]
+         key_strings = url_to_key_strings(url)
+
+         # We don't want to count every accessed URL, only those pointing to
+         # RPM files and repo files
+         if not key_strings:
+             log.debug("Skipping: %s", url)
+             continue
+
+         log.debug("Processing: %s", url)
+
+         # When counting RPM accesses, we want to increment both project hits
+         # and chroot hits, so one URL can produce multiple `key_strings`
+         for key_str in key_strings:
+             hits.setdefault(key_str, 0)
+             hits[key_str] += 1
+
+         # Remember this access timestamp
+         datetime_format = "%Y-%m-%d %H:%M:%S"
+         datetime_string = "{0} {1}".format(access["date"], access["time"])
+         datetime_object = datetime.strptime(datetime_string, datetime_format)
+         timestamps.append(int(datetime_object.timestamp()))
+
+     return {
+         "ts_from": min(timestamps),
+         "ts_to": max(timestamps),
+         "hits": hits,
+     } if hits else {}
+
+
+ def update_frontend(accesses, dry_run=False):
+     """
+     Increment frontend statistics based on these `accesses`
+     """
+     result = get_hit_data(accesses)
+     if not result:
+         log.debug("No recognizable hits among these accesses, skipping.")
+         return
+
+     log.info(
+         "Sending: %i results from %i to %i",
+         len(result["hits"]),
+         result["ts_from"],
+         result["ts_to"]
+     )
+     log.debug("Hits: %s", result["hits"])
+
+     opts = BackendConfigReader().read()
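+     # The resulting URL is e.g.
+     # "https://copr.fedorainfracloud.org/stats_rcv/from_backend"
+     # (base URL shown for illustration only; it comes from the backend
+     # config file)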
+     url = os.path.join(
+         opts.frontend_base_url,
+         "stats_rcv",
+         "from_backend",
+     )
+     if not dry_run:
+         SafeRequest(auth=opts.frontend_auth).post(url, result)
+
+
+ def get_arg_parser():
+     """
+     Generate the argument parser for this script
+     """
+     name = os.path.basename(__file__)
+     description = (
+         "Download HTTP access logs from AWS S3 storage, parse them, "
+         "increment the hit counters on the frontend, and clean up."
+     )
+     parser = argparse.ArgumentParser(name, description=description)
+     parser.add_argument(
+         "--dry-run",
+         action="store_true",
+         help=("Do not perform any destructive changes, only print what "
+               "would happen"))
+     parser.add_argument(
+         "--verbose",
+         action="store_true",
+         help="Print verbose information about what is going on")
+     return parser
+
+
+ def main():
+     """
+     Main function
+     """
+     parser = get_arg_parser()
+     args = parser.parse_args()
+     tmp = tempfile.mkdtemp(prefix="copr-aws-s3-hitcounter-")
+
+     if args.verbose:
+         log.setLevel(logging.DEBUG)
+
+     s3 = S3Bucket(dry_run=args.dry_run)
+     for s3file in s3.list_files():
+         gz = s3.download_file(s3file, dstdir=tmp)
+         raw = gunzip(gz)
+         accesses = parse_access_file(raw)
+
+         # Maybe we want to use some locking or transaction mechanism here:
+         # if we increment the accesses on the frontend but then leave the
+         # S3 file untouched, the next run would parse the same file and
+         # increment the counters again.
+         update_frontend(accesses, dry_run=args.dry_run)
+         s3.delete_file(s3file)
+
+         # Clean up all temporary files
+         for path in [gz, raw]:
+             os.remove(path)
+
+     os.removedirs(tmp)
+
+
+ if __name__ == "__main__":
+     main()