From 70bd17628e7a750130224c003f2d31de72007755 Mon Sep 17 00:00:00 2001 From: Michael Simacek Date: Jul 21 2017 09:00:44 +0000 Subject: [resolver] Avoid race conditions When sqla is building query results and sees an object that it already has in its session, it returns that object ignoring the actual row from query result. With that it tries to emulate repeatable read level of isolation. But in koschei, we sometimes need read committed isolation level in order to achieve mutual exclusion of side-effects (fedmsg, build scheduling). Expiring the objects in resolver forces their refresh, avoiding race conditions with scheduler (like described in #100 and #158). --- diff --git a/koschei/backend/services/resolver.py b/koschei/backend/services/resolver.py index e6d21ca..e65b76a 100644 --- a/koschei/backend/services/resolver.py +++ b/koschei/backend/services/resolver.py @@ -50,7 +50,7 @@ total_time = Stopwatch("Total repo generation") DepTuple = namedtuple('DepTuple', ['id', 'name', 'epoch', 'version', 'release', 'arch']) ResolutionOutput = namedtuple('ResolutionOutput', - ['package_id', 'prev_resolved', 'resolved', + ['package', 'prev_resolved', 'resolved', 'problems', 'changes', 'last_build_id']) @@ -249,7 +249,7 @@ class Resolver(Service): chunk format: [ ResolutionOutput( - package_id=123, + package=Package(...), prev_resolved=False, resolved=True, # current resolution status changes=[dict(...), ...], # dependency changes in dict form @@ -262,16 +262,21 @@ class Resolver(Service): if not chunk: return - package_ids = [p.package_id for p in chunk] + package_ids = [p.package.id for p in chunk] - # get and lock the packages to be updated - packages = { - p.id: p for p in self.db.query(Package) + # expire packages, so that we get the packages we locked, not old + # version in sqla cache + for p in chunk: + self.db.expire(p.package) + + # lock the packages to be updated + ( + self.db.query(Package.id) .filter(Package.id.in_(package_ids)) .order_by(Package.id) # ordering to prevent deadlocks .with_lockmode('update') .all() - } + ) # find latest resolution problems to be compared for change previous_problems = { @@ -299,8 +304,7 @@ class Resolver(Service): # update packages, queue resolution results, changes and problems for insertion for pkg_result in chunk: - package_id = pkg_result.package_id - package = packages[package_id] + package = pkg_result.package if pkg_result.last_build_id != package.last_build_id: # there was a build submitted/registered in the meantime, @@ -321,7 +325,7 @@ class Resolver(Service): ) if prev_state != new_state: # queue for fedmsg sending after commit - state_changes[package_id] = prev_state, new_state + state_changes[package.id] = prev_state, new_state dependency_changes += pkg_result.changes @@ -333,11 +337,11 @@ class Resolver(Service): pkg_result.resolved is False and pkg_result.prev_resolved is False and # both are sets, they can be compared directly - pkg_result.problems != previous_problems.get(package_id) + pkg_result.problems != previous_problems.get(package.id) ) ): resolution_change = ResolutionChange( - package_id=package_id, + package_id=package.id, resolved=pkg_result.resolved, ) self.db.add(resolution_change) @@ -410,7 +414,7 @@ class Resolver(Service): prev_deps, curr_deps, package_id=package.id, prev_build_id=prev_build.id) results.append(ResolutionOutput( - package_id=package.id, + package=package, prev_resolved=package.resolved, resolved=resolved, problems=set(curr_problems),