#1672 Don't pass SQLAlchemy objects between threads
Merged 2 years ago by breilly. Opened 3 years ago by otaylor.
otaylor/fm-orchestrator database-threading  into  master

@@ -1108,6 +1108,10 @@ 

      )

  

      @classmethod

+     def from_id(cls, db_session, component_build_id):

+         return db_session.query(cls).filter(cls.id == component_build_id).first()

+ 

+     @classmethod

      def from_component_event(cls, db_session, task_id, module_id=None):

          _filter = db_session.query(cls).filter

          if module_id is None:

@@ -2,7 +2,6 @@ 

  # SPDX-License-Identifier: MIT

  from __future__ import absolute_import

  import concurrent.futures

- import threading

  

  from module_build_service.common import conf, log, models

  from module_build_service.scheduler import events
@@ -45,20 +44,16 @@ 

      return False

  

  

- BUILD_COMPONENT_DB_SESSION_LOCK = threading.Lock()

- 

- 

- def start_build_component(db_session, builder, c):

+ def start_build_component(db_session, builder, component_build_id):

      """

      Submits single component build to builder. Called in thread

      by QueueBasedThreadPool in continue_batch_build.

- 

-     This function runs inside separate threads that share one SQLAlchemy

-     session object to update a module build state once there is something wrong

-     when one of its components is submitted to Koji to build.

      """

      import koji

  

+     # Get an object valid for this thread

+     c = models.ComponentBuild.from_id(db_session, component_build_id)

+ 

      try:

          c.task_id, c.state, c.state_reason, c.nvr = builder.build(

              artifact_name=c.package, source=c.scmurl)
@@ -66,18 +61,23 @@ 

          c.state = koji.BUILD_STATES["FAILED"]

          c.state_reason = "Failed to build artifact %s: %s" % (c.package, str(e))

          log.exception(e)

-         with BUILD_COMPONENT_DB_SESSION_LOCK:

-             c.module_build.transition(conf, models.BUILD_STATES["failed"], failure_type="infra")

-             db_session.commit()

+ 

+         c.module_build.transition(

+             db_session, conf, models.BUILD_STATES["failed"], failure_type="infra"

+         )

+         db_session.commit()

+ 

          return

  

      if not c.task_id and c.is_building:

          c.state = koji.BUILD_STATES["FAILED"]

          c.state_reason = "Failed to build artifact %s: Builder did not return task ID" % (c.package)

-         with BUILD_COMPONENT_DB_SESSION_LOCK:

-             c.module_build.transition(conf, models.BUILD_STATES["failed"], failure_type="infra")

-             db_session.commit()

-         return

+ 

+         c.module_build.transition(

+             db_session, conf, models.BUILD_STATES["failed"], failure_type="infra"

+         )

+ 

+     db_session.commit()

  

  

  def continue_batch_build(config, module, builder, components=None):
@@ -133,21 +133,31 @@ 

          c.state = koji.BUILD_STATES["BUILDING"]

          components_to_build.append(c)

  

+     # Commit to ensure threads see the most recent version of ComponentBuilds

+     db_session.commit()

+ 

      # Start build of components in this batch.

      max_workers = config.num_threads_for_build_submissions

      with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:

          futures = {

-             executor.submit(start_build_component, db_session, builder, c): c

+             executor.submit(start_build_component, db_session, builder, c.id): c

              for c in components_to_build

          }

          concurrent.futures.wait(futures)

-         # In case there has been an excepion generated directly in the

+         # In case there has been an exception generated directly in the

          # start_build_component, the future.result() will re-raise it in the

          # main thread so it is not lost.

+         #

+         # We get 'SQLite objects created in a thread can only be used in that same thread'

+         # errors in this case, because the finalizer for the connection object

+         # runs in a different thread, but the original exception is still visible.

+         #

          for future in futures:

              future.result()

  

-     db_session.commit()

+     # We need to start a new session here, or SQLite isolation keeps us from seeing

+     # changes that were done in the other threads

+     db_session.close()

  

  

  def start_next_batch_build(config, module, builder, components=None):

@@ -39,14 +39,6 @@ 

      }

      if conf.sqlalchemy_database_uri.startswith("sqlite://"):

          options.update({

-             # For local module build, MBS is actually a multi-threaded

-             # application. The command submitting a module build runs in its

-             # own thread, and the backend build workflow, implemented as a

-             # fedmsg consumer on top of fedmsg-hub, runs in separate threads.

-             # So, disable this option in order to allow accessing data which

-             # was written from another thread.

-             "connect_args": {"check_same_thread": False},

- 

              # Both local module build and running tests requires a file-based

              # SQLite database, we do not use a connection pool for these two

              # scenarios.

@@ -172,6 +172,7 @@ 

          start_next_batch_build(conf, module_build, builder)

  

          # Batch number should increase.

+         module_build = models.ModuleBuild.get_by_id(db_session, 3)

          assert module_build.batch == 2

  

          # Make sure we only have one message returned for the one reused component
@@ -218,6 +219,8 @@ 

          builder.recover_orphaned_artifact.return_value = []

          start_next_batch_build(conf, module_build, builder)

  

+         module_build = models.ModuleBuild.get_by_id(db_session, 3)

+ 

          # Batch number should increase.

          assert module_build.batch == 2

          # No component reuse messages should be returned
@@ -233,7 +236,15 @@ 

          builder.build.side_effect = Exception("Something have gone terribly wrong")

          component = mock.MagicMock()

  

-         start_build_component(db_session, builder, component)

+         component = models.ComponentBuild.from_component_name(

+             db_session, "perl-List-Compare", 3)

+ 

+         assert component.state != koji.BUILD_STATES["FAILED"]

+ 

+         start_build_component(db_session, builder, component.id)

+ 

+         component = models.ComponentBuild.from_component_name(

+             db_session, "perl-List-Compare", 3)

  

          assert component.state == koji.BUILD_STATES["FAILED"]

  
@@ -264,6 +275,8 @@ 

          builder.recover_orphaned_artifact.return_value = []

          start_next_batch_build(conf, module_build, builder)

  

+         module_build = models.ModuleBuild.get_by_id(db_session, 3)

+ 

          # Batch number should increase

          assert module_build.batch == 2

  
@@ -292,6 +305,8 @@ 

          mock_sbc.reset_mock()

  

          # Complete the build

+         plc_component = models.ComponentBuild.from_component_name(

+             db_session, "perl-List-Compare", 3)

          plc_component.state = koji.BUILD_STATES["COMPLETE"]

          pt_component = models.ComponentBuild.from_component_name(

              db_session, "perl-Tangerine", 3)
@@ -301,6 +316,9 @@ 

  

          # Start the next build batch

          start_next_batch_build(conf, module_build, builder)

+ 

+         module_build = models.ModuleBuild.get_by_id(db_session, 3)

+ 

          # Batch number should increase

          assert module_build.batch == 3

          # Verify that tangerine was reused even though perl-Tangerine was rebuilt in the previous
@@ -344,6 +362,8 @@ 

          builder.recover_orphaned_artifact.return_value = []

          start_next_batch_build(conf, module_build, builder)

  

+         module_build = models.ModuleBuild.get_by_id(db_session, 3)

+ 

          # Batch number should increase.

          assert module_build.batch == 2

  
@@ -357,8 +377,8 @@ 

  

          # Test the order of the scheduling

          expected_calls = [

-             mock.call(db_session, builder, plc_component),

-             mock.call(db_session, builder, pt_component)

+             mock.call(db_session, builder, plc_component.id),

+             mock.call(db_session, builder, pt_component.id)

          ]

          assert mock_sbc.mock_calls == expected_calls

  
@@ -379,6 +399,8 @@ 

          builder = mock.MagicMock()

          start_next_batch_build(conf, module_build, builder)

  

+         module_build = models.ModuleBuild.get_by_id(db_session, 3)

+ 

          # Batch number should not increase.

          assert module_build.batch == 2

          # Make sure start build was called for the second component which wasn't reused
@@ -399,5 +421,7 @@ 

          builder = mock.MagicMock()

          builder.buildroot_ready.return_value = False

  

+         module_build = models.ModuleBuild.get_by_id(db_session, 3)

+ 

          # Batch number should not increase.

          assert module_build.batch == 1

SQLAlchemy objects can't be used from multiple threads - so when starting
threads for builds, pass the ComponentBuild id rather than the object.
(Note that despite the comment that the threads were sharing a session,
they weren't - what was passed to the thread was a scoped_session that
acts as a separate thread-local session per-thread.)

BUILD_COMPONENT_DB_SESSION_LOCK — a threading.Lock() object that was used
in a few places, but not nearly enough places to effectively serialize usage
of a shared session — is removed.

rebased onto 4a3e6fb

2 years ago

Commit c240381 fixes this pull-request

Pull-Request has been merged by breilly

2 years ago

Pull-Request has been merged by breilly

2 years ago