From de04a41093ab43e4d7d96eb542b0ff3842e99c7d Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Nov 02 2015 16:21:27 +0000 Subject: Fancy updater. This compares the contents of the sqlite dbs after we download them to try and determine what changed, and then it publishes that information to fedmsg. --- diff --git a/mdapi-get_repo_md b/mdapi-get_repo_md index 5d43fc6..3131257 100644 --- a/mdapi-get_repo_md +++ b/mdapi-get_repo_md @@ -42,11 +42,14 @@ import os import shutil import tempfile import hashlib - import xml.etree.ElementTree as ET +import fedmsg import requests +from sqlalchemy import text + +import mdapi.lib as mdapilib import mdapi.file_lock as file_lock KOJI_REPO = 'https://kojipkgs.fedoraproject.org/repos/' @@ -59,6 +62,11 @@ repomd_xml_namespace = { padding = 22 +fedmsg.init( + active=True, + cert_prefix='mdapi', +) + def list_branches(status='Active'): ''' Return the list of Fedora branches corresponding to the given @@ -70,8 +78,16 @@ def list_branches(status='Active'): return data['collections'] -def decompress_primary_db(archive, location): +def download_db(name, repomd_url, archive): + print('%s Downloading file: %s' % (name.ljust(padding), repomd_url)) + response = requests.get(repomd_url) + with open(archive, 'wb') as stream: + stream.write(response.content) + + +def decompress_db(name, archive, location): ''' Decompress the given XZ archive at the specified location. ''' + print('%s Extracting %s to %s' % (name.ljust(padding), archive, location)) if archive.endswith('.xz'): import lzma with contextlib.closing(lzma.LZMAFile(archive)) as stream_xz: @@ -91,11 +107,144 @@ def decompress_primary_db(archive, location): bzar = bz2.BZ2File(archive) out.write(bzar.read()) bzar.close() - elif archive.endswith('.sqlite'): - with file_lock.FileFlock(location + '.lock'): - with open(location, 'wb') as out: - with open(archive) as inp: - out.write(inp.read()) + else: + raise NotImplementedError(archive) + + +def compare_dbs(name, db1, db2, cache1, cache2): + print('%s Comparing %s and %s' % (name.ljust(padding), db1, db2)) + + def get_table_names(uri): + with file_lock.FileFlock(uri + '.lock'): + with mdapilib.session_manager('sqlite:///' + uri) as session: + for name in session.connection().engine.table_names(): + if name == 'db_info': + continue + yield name + + def get_all_rows(uri, table, cache): + queries = { + 'conflicts': relations_query, + 'enhances': relations_query, + 'obsoletes': relations_query, + 'provides': relations_query, + 'requires': relations_query, + 'supplements': relations_query, + 'recommends': relations_query, + 'suggests': relations_query, + 'files': files_query, + 'packages': packages_query, + 'changelog': changelog_query, + 'filelist': filelist_query, + } + query = text(queries.get(table, default_query).format(table=table)) + with file_lock.FileFlock(uri + '.lock'): + with mdapilib.session_manager('sqlite:///' + uri) as session: + engine = session.connection().engine + for i, row in enumerate(engine.execute(query)): + if table in cache_dependant_tables: + row = list(row) # lists support item assignment + row[0] = cache[row[0]] + + yield tuple(row) + + + def build_cache(uri, cache): + query = text(packages_cache_builder.format(table=table)) + with file_lock.FileFlock(uri + '.lock'): + with mdapilib.session_manager('sqlite:///' + uri) as session: + engine = session.connection().engine + for pkgId, pkgname in engine.execute(query): + cache[pkgId] = pkgname + + tables1 = list(get_table_names(db1)) + tables2 = list(get_table_names(db2)) + + if not tables1 and not tables2: + raise RuntimeError("Something is very very wrong.") + + if not tables2: + # We have never downloaded this before... + # so we have nothing to compare it against. Just return and say there + # are "no differences". + print('%s Empty! %s Cannot compare.' % (name.ljust(padding), db2)) + return {} + + assert len(tables1) == len(tables2), "Cannot compare disparate dbs." + # These should be the same + tables = tables1 = tables2 + + # These two tables have a primary key reference to a table *in another + # database*. Consequently, we have to use an in-memory cache to do the + # "join" ourselves. Specifically, we need to swap out pkgId for pkg name. + cache_dependant_tables = ['filelist', 'changelog'] + # This table produces the cache. + cache_producing_tables = ['packages'] + + # Prune out some squirrelly tables we're not going to worry about. + ignored_db_tables = [ + # The 'packages' table in the 'filelists' db is full of primary keys + # and is prone to false positives. + ('filelists', 'packages'), + # Same goes for the 'packages' table in the 'other' db. + ('other', 'packages'), + ] + def should_compare(table): + for test, target in ignored_db_tables: + if test in db1 and table == target: + return False + return True + + tables = [table for table in tables if should_compare(table)] + + # Finally, compare the contents of both tables and return a diff + results = {} + for table in tables: + if table in cache_producing_tables: + build_cache(db1, cache1) + build_cache(db2, cache2) + rows1 = set(list(get_all_rows(db1, table, cache1))) + rows2 = set(list(get_all_rows(db2, table, cache2))) + results[table] = { + 'added': rows1 - rows2, + 'removed': rows2 - rows1, + } + + return results + + +def publish_changes(name, differences): + print('%s Publishing differences to fedmsg:' % (name.ljust(padding))) + + change = False + for table, details in differences.items(): + print("%s %s, %i added, %i removed." % ( + name.ljust(padding), table, + len(details['added']), len(details['removed']) + )) + # If anything changed in any table, flip this flag to True + change = change or bool(details['added']) or bool(details['removed']) + + if not change: + print('%s No real changes. Skipping fedmsg.' % (name.ljust(padding))) + + #import pprint; pprint.pprint(differences) + + fedmsg.publish( + modname='mdapi', + topic='repo.update', + msg=dict( + name=name, + differences=differences, + ) + ) + + +def install_db(name, src, dest): + print('%s Installing %s to %s.' % (name.ljust(padding), src, dest)) + with file_lock.FileFlock(dest + '.lock'): + shutil.move(src, dest) + def needs_update(local_file, remote_sha, sha_type): ''' Compare sha of a local and remote file. @@ -150,7 +299,14 @@ def process_repo(tupl): # Filter down to only sqlite dbs files = ((f, s, t) for f, s, t in files if '.sqlite' in f) - working_dir = tempfile.mkdtemp(prefix='mdapi-') + # We need to ensure the primary db comes first so we can build a pkey cache + primary_first = lambda item: not 'primary' in item[0] + files = sorted(files, key=primary_first) + + # Primary-key caches built from the primary dbs so we can make sense + # of the contents of the filelist and changelog dbs. + cache1, cache2 = {}, {} + for filename, shasum, shatype in files: repomd_url = url + '/' + filename @@ -163,21 +319,23 @@ def process_repo(tupl): elif 'other.sqlite' in filename: db = 'mdapi-%s-other.sqlite' % name + # Have we downloaded this before? Did it change? destfile = os.path.join(destfolder, db) if not needs_update(destfile, shasum, shatype): - print('%s - No change of %s' % (name.ljust(padding), repomd_url)) + print('%s No change of %s' % (name.ljust(padding), repomd_url)) continue # If it has changed, then download it and move it into place. - print('%s - Downloading file: %s' % (name.ljust(padding), repomd_url)) - print('%s to: %s' % (name.ljust(padding), destfile)) - response = requests.get(repomd_url) - archive = os.path.join(working_dir, filename) - with open(archive, 'wb') as stream: - stream.write(response.content) - decompress_primary_db(archive, destfile) + tempargs = dict(prefix='mdapi-', dir='/var/tmp') + with tempfile.TemporaryDirectory(**tempargs) as working_dir: + tempdb = os.path.join(working_dir, db) + archive = os.path.join(working_dir, filename) + download_db(name, repomd_url, archive) + decompress_db(name, archive, tempdb) + differences = compare_dbs(name, tempdb, destfile, cache1, cache2) + publish_changes(name, differences) + install_db(name, tempdb, destfile) - shutil.rmtree(working_dir) def main(): @@ -251,14 +409,84 @@ def main(): rurl = url % version repositories.append((rurl, name)) - p = multiprocessing.Pool(10) - p.map(process_repo, itertools.product( + # In parallel + #p = multiprocessing.Pool(10) + #p.map(process_repo, itertools.product( + # [CONFIG.get('DB_FOLDER', '/var/tmp')], + # repositories) + #) + + # In serial + for t in itertools.product( [CONFIG.get('DB_FOLDER', '/var/tmp')], - repositories) - ) + repositories): + + process_repo(t) return 0 +# Some queries to help us sort out what is in the repos +relations_query = """ + SELECT + {table}.name, + {table}.flags, + {table}.epoch, + {table}.version, + {table}.release, + packages.name + FROM {table}, packages + WHERE {table}.pkgKey == packages.pkgKey; + """ + +files_query = """ + SELECT + {table}.name, + {table}.type, + packages.name + FROM {table}, packages + WHERE {table}.pkgKey == packages.pkgKey; + """ + +packages_cache_builder = """ + SELECT + {table}.pkgId, + {table}.name + FROM {table}; + """ + +packages_query = """ + SELECT + {table}.name, + {table}.version, + {table}.release, + {table}.epoch, + {table}.arch + FROM {table}; + """ + +changelog_query = """ + SELECT + packages.pkgId, + {table}.author, + {table}.date, + {table}.changelog + FROM {table}, packages + WHERE {table}.pkgKey == packages.pkgKey; + """ + +filelist_query = """ + SELECT + packages.pkgId, + {table}.dirname, + {table}.filenames, + {table}.filetypes + FROM {table}, packages + WHERE {table}.pkgKey == packages.pkgKey; + """ + +default_query = "SELECT * from {table};" + + if __name__ == '__main__': main() diff --git a/mdapi/lib.py b/mdapi/lib.py index 0f8ac6b..3c14649 100644 --- a/mdapi/lib.py +++ b/mdapi/lib.py @@ -23,6 +23,7 @@ MDAPI internal API to interact with the database. ''' +import contextlib import time import sqlalchemy as sa @@ -56,6 +57,20 @@ def create_session(db_url, debug=False, pool_recycle=3600): return scopedsession +@contextlib.contextmanager +def session_manager(db_url, debug=False, pool_recycle=3600): + """ A handy context manager for our sessions. """ + session = create_session(db_url, debug=debug, pool_recycle=pool_recycle) + try: + yield session + session.commit() + except: + session.rollback() + raise + finally: + session.close() + + def get_package(session, pkg_name): ''' Return information about a package, if we can find it. ''' diff --git a/requirements.txt b/requirements.txt index eeb4a23..1eb70e1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ requests simplejson sqlalchemy werkzeug +fedmsg