| |
@@ -64,7 +64,7 @@
|
| |
|
| |
class ManagedRepo(object):
|
| |
|
| |
- def __init__(self, manager, data):
|
| |
+ def __init__(self, manager, data, repodata=None):
|
| |
self.manager = manager
|
| |
self.session = manager.session
|
| |
self.options = manager.options
|
| |
@@ -81,7 +81,7 @@
|
| |
self.expire_ts = None
|
| |
if koji.REPO_STATES[self.state] in ['EXPIRED', 'DELETED', 'PROBLEM']:
|
| |
self.current = False
|
| |
- self.expire_ts = time.time()
|
| |
+ self._find_expire_time(repodata)
|
| |
# TODO use hub data to find the actual expiration time
|
| |
self.first_seen = time.time()
|
| |
if self.current:
|
| |
@@ -92,6 +92,17 @@
|
| |
tags[x['parent_id']] = 1
|
| |
self.taglist = to_list(tags.keys())
|
| |
|
| |
+ def _find_expire_time(self, repodata):
|
| |
+ # find all newer repos for the same tag and use the creation time of the oldest of them as expire_ts for our repo
|
| |
+ if repodata:
|
| |
+ repos = [r for r in repodata if
|
| |
+ r['tag_id'] == self.tag_id and r['create_event'] > self.event_id]
|
| |
+ if repos:
|
| |
+ invalidated_by = sorted(repos, key=lambda x: x['create_event'])
|
| |
+ self.expire_ts = invalidated_by[0]['create_ts']
|
| |
+ if not self.expire_ts:
|
| |
+ self.expire_ts = time.time()
|
| |
+
|
| |
@property
|
| |
def dist(self):
|
| |
# TODO: remove this indirection once we can rely on the hub to return
|
| |
@@ -180,7 +191,7 @@
|
| |
age = time.time() - max(times)
|
| |
return age > timeout
|
| |
|
| |
- def tryDelete(self):
|
| |
+ def tryDelete(self, logger):
|
| |
"""Remove the repo from disk, if possible"""
|
| |
path = self.get_path()
|
| |
if not path:
|
| |
@@ -199,52 +210,55 @@
|
| |
if e.errno == 2:
|
| |
# No such file or directory, so the repo either never existed,
|
| |
# or has already been deleted, so allow it to be marked deleted.
|
| |
- self.logger.info("Repo directory does not exist: %s" % path)
|
| |
+ logger.info("Repo directory does not exist: %s" % path)
|
| |
pass
|
| |
else:
|
| |
- self.logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))
|
| |
+ logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))
|
| |
return False
|
| |
else:
|
| |
times = [self.event_ts, mtime, self.expire_ts]
|
| |
times = [ts for ts in times if ts is not None]
|
| |
age = time.time() - max(times)
|
| |
- self.logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)
|
| |
+ logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)
|
| |
if age < lifetime:
|
| |
return False
|
| |
- self.logger.debug("Attempting to delete repo %s.." % self.repo_id)
|
| |
+ logger.debug("Attempting to delete repo %s.." % self.repo_id)
|
| |
if self.state != koji.REPO_EXPIRED:
|
| |
raise koji.GenericError("Repo not expired")
|
| |
if self.session.repoDelete(self.repo_id) > 0:
|
| |
# cannot delete, we are referenced by a buildroot
|
| |
- self.logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
|
| |
+ logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
|
| |
return False
|
| |
- self.logger.info("Deleted repo %s" % self.repo_id)
|
| |
+ logger.info("Deleted repo %s" % self.repo_id)
|
| |
self.state = koji.REPO_DELETED
|
| |
if os.path.islink(path):
|
| |
# expected for repos on other volumes
|
| |
info = self.get_info()
|
| |
if not os.path.exists(path):
|
| |
- self.logger.error('Repo volume link broken: %s', path)
|
| |
+ logger.error('Repo volume link broken: %s', path)
|
| |
return False
|
| |
if not info or 'volume' not in info:
|
| |
- self.logger.error('Missing repo.json in %s', path)
|
| |
+ logger.error('Missing repo.json in %s', path)
|
| |
return False
|
| |
realpath = self.get_path(volume=info['volume'])
|
| |
if not os.path.exists(realpath):
|
| |
- self.logger.error('Repo real path missing: %s', realpath)
|
| |
+ logger.error('Repo real path missing: %s', realpath)
|
| |
return False
|
| |
if not os.path.samefile(path, realpath):
|
| |
- self.logger.error('Incorrect volume link: %s', path)
|
| |
+ logger.error('Incorrect volume link: %s', path)
|
| |
return False
|
| |
# ok, try to remove the symlink
|
| |
try:
|
| |
os.unlink(path)
|
| |
except OSError:
|
| |
- self.logger.error('Unable to remove volume link: %s', path)
|
| |
- # and remove the real path
|
| |
- self.manager.rmtree(realpath)
|
| |
+ logger.error('Unable to remove volume link: %s', path)
|
| |
else:
|
| |
- self.manager.rmtree(path)
|
| |
+ realpath = path
|
| |
+ try:
|
| |
+ rmtree(realpath)
|
| |
+ except BaseException:
|
| |
+ logger.error(''.join(traceback.format_exception(*sys.exc_info())))
|
| |
+
|
| |
return True
|
| |
|
| |
def ready(self):
|
| |
@@ -371,7 +385,7 @@
|
| |
else:
|
| |
self.logger.info('Found repo %s, state=%s'
|
| |
% (repo_id, koji.REPO_STATES[data['state']]))
|
| |
- repo = ManagedRepo(self, data)
|
| |
+ repo = ManagedRepo(self, data, repodata)
|
| |
self.repos[repo_id] = repo
|
| |
if not getTag(self.session, repo.tag_id) and not repo.expired():
|
| |
self.logger.info('Tag %d for repo %d disappeared, expiring.', repo.tag_id, repo_id)
|
| |
@@ -519,6 +533,21 @@
|
| |
finally:
|
| |
session.logout()
|
| |
|
| |
+ def deleteLoop(self, session):
|
| |
+ """Deletes expired repos as possible. Runs in a separate thread"""
|
| |
+ self.session = session
|
| |
+ self.delete_logger = logging.getLogger("koji.repo.delete")
|
| |
+ self.delete_logger.info('deleteLoop starting')
|
| |
+ try:
|
| |
+ while True:
|
| |
+ self.deleteRepos()
|
| |
+ time.sleep(self.options.sleeptime)
|
| |
+ except Exception:
|
| |
+ self.delete_logger.exception('Error in delete thread')
|
| |
+ raise
|
| |
+ finally:
|
| |
+ session.logout()
|
| |
+
|
| |
def pruneLocalRepos(self):
|
| |
for volinfo in self.session.listVolumes():
|
| |
volumedir = pathinfo.volumedir(volinfo['name'])
|
| |
@@ -689,17 +718,6 @@
|
| |
time_expired = time.time() - tag['expire_ts']
|
| |
f.write(fmt % (tag['taginfo']['name'], int(time_expired), int(tag['score'])))
|
| |
|
| |
- # trigger deletes
|
| |
- n_deletes = 0
|
| |
- for repo in to_list(self.repos.values()):
|
| |
- if n_deletes >= self.options.delete_batch_size:
|
| |
- break
|
| |
- if repo.expired():
|
| |
- # try to delete
|
| |
- if repo.tryDelete():
|
| |
- n_deletes += 1
|
| |
- del self.repos[repo.repo_id]
|
| |
-
|
| |
def checkTasks(self):
|
| |
"""Check on newRepo tasks
|
| |
|
| |
@@ -850,12 +868,12 @@
|
| |
order = sorted(self.needed_tags.values(), key=lambda t: t['score'], reverse=True)
|
| |
for tag in order:
|
| |
if running_tasks >= self.options.max_repo_tasks:
|
| |
- self.logger.debug("Running tasks (%s): %s" % list(running_tasks))
|
| |
+ self.logger.debug("Running tasks (%s): %s" % (running_tasks, list(self.tasks)))
|
| |
self.logger.info("Maximum number of repo tasks reached")
|
| |
return
|
| |
elif len(self.tasks) + len(self.other_tasks) >= self.options.repo_tasks_limit:
|
| |
- self.logger.debug("Tracked tasks (%s): %s" % list(self.tasks))
|
| |
- self.logger.debug("Untracked tasks (%s): %s" % list(self.other_tasks))
|
| |
+ self.logger.debug("Tracked tasks (%s): %s" % (len(self.tasks), list(self.tasks)))
|
| |
+ self.logger.debug("Untracked tasks (%s): %s" % (len(self.other_tasks), list(self.other_tasks)))
|
| |
self.logger.info("Repo task limit reached")
|
| |
return
|
| |
tagname = tag['taginfo']['name']
|
| |
@@ -905,6 +923,18 @@
|
| |
if running_tasks_maven >= self.options.max_repo_tasks_maven:
|
| |
self.logger.info("Maximum number of maven repo tasks reached")
|
| |
|
| |
+ def deleteRepos(self):
|
| |
+ # trigger deletes
|
| |
+ self.delete_logger.debug("Starting delete repos")
|
| |
+ n = 0
|
| |
+ for repo in to_list(self.repos.values()):
|
| |
+ if repo.expired():
|
| |
+ # try to delete
|
| |
+ if repo.tryDelete(self.delete_logger):
|
| |
+ del self.repos[repo.repo_id]
|
| |
+ n += 1
|
| |
+ self.delete_logger.debug("Ending delete repos (deleted: %s)" % n)
|
| |
+
|
| |
|
| |
def start_currency_checker(session, repomgr):
|
| |
subsession = session.subsession()
|
| |
@@ -933,6 +963,15 @@
|
| |
return thread
|
| |
|
| |
|
| |
+ def start_delete_loop(session, repomgr):
|
| |
+ subsession = session.subsession()
|
| |
+ thread = threading.Thread(name='deleteLoop',
|
| |
+ target=repomgr.deleteLoop, args=(subsession,))
|
| |
+ thread.setDaemon(True)
|
| |
+ thread.start()
|
| |
+ return thread
|
| |
+
|
| |
+
|
| |
def main(options, session):
|
| |
repomgr = RepoManager(options, session)
|
| |
repomgr.readCurrentRepos()
|
| |
@@ -944,6 +983,7 @@
|
| |
if options.check_external_repos:
|
| |
curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
|
| |
regen_thread = start_regen_loop(session, repomgr)
|
| |
+ delete_thread = start_delete_loop(session, repomgr)
|
| |
# TODO also move rmtree jobs to threads
|
| |
logger.info("Entering main loop")
|
| |
while True:
|
| |
@@ -961,6 +1001,9 @@
|
| |
if not regen_thread.is_alive():
|
| |
logger.error("Regeneration thread died. Restarting it.")
|
| |
regen_thread = start_regen_loop(session, repomgr)
|
| |
+ if not delete_thread.is_alive():
|
| |
+ logger.error("Delete thread died. Restarting it.")
|
| |
+ delete_thread = start_delete_loop(session, repomgr)
|
| |
except KeyboardInterrupt:
|
| |
logger.warning("User exit")
|
| |
break
|
| |
Fixes: https://pagure.io/koji/issue/2336