#1007 Parallel action processing
Merged 2 months ago by praiskup. Opened 2 months ago by praiskup.
copr/ praiskup/copr: parallel-actions into master

file modified
+78 -31

@@ -5,6 +5,7 @@ 

  import time

  import traceback

  import base64

+ import subprocess

  

  from distutils.dir_util import copy_tree

  from distutils.errors import DistutilsFileError

@@ -21,8 +22,11 @@ 

  from .sign import create_user_keys, CoprKeygenRequestError

  from .createrepo import createrepo

  from .exceptions import CreateRepoError, CoprSignError

- from .helpers import get_redis_logger, silent_remove, ensure_dir_exists, get_chroot_arch, cmd_debug, format_filename

+ from .helpers import (get_redis_logger, silent_remove, ensure_dir_exists,

+                       get_chroot_arch, cmd_debug, format_filename,

+                       uses_devel_repo)

  from .sign import sign_rpms_in_dir, unsign_rpms_in_dir, get_pubkey

+ from backend.worker_manager import WorkerManager

  

  from .vm_manage.manager import VmManager

  

@@ -46,17 +50,16 @@ 

  

      """

      # TODO: get more form opts, decrease number of parameters

-     def __init__(self, opts, action, frontend_client):

+     def __init__(self, opts, action, log=None):

  

          self.opts = opts

-         self.frontend_client = frontend_client

          self.data = action

  

          self.destdir = self.opts.destdir

          self.front_url = self.opts.frontend_base_url

          self.results_root_url = self.opts.results_baseurl

  

-         self.log = get_redis_logger(self.opts, "backend.actions", "actions")

+         self.log = log if log else get_redis_logger(self.opts, "backend.actions", "actions")

  

      def __str__(self):

          return "<Action: {}>".format(self.data)

@@ -83,14 +86,14 @@ 

  

                  path = self.get_chroot_result_dir(chroot, project_dirname, ownername)

                  try:

+                     self.log.info("Empty repo so far, creating the directory")

                      os.makedirs(path)

                  except FileExistsError:

                      pass

  

                  try:

-                     createrepo(path=path, front_url=self.front_url,

-                                username=ownername, projectname=projectname,

-                                override_acr_flag=True)

+                     createrepo(path=path, username=ownername,

+                                projectname=projectname)

                      done_count += 1

                  except CoprRequestException as err:

                      # fixme: dirty hack to catch case when createrepo invoked upon deleted project

@@ -166,9 +169,8 @@ 

                      self.log.info("Forked build %s as %s", src_path, dst_path)

  

              for chroot_path in chroot_paths:

-                 createrepo(path=chroot_path, front_url=self.front_url,

-                            username=data["user"], projectname=data["copr"],

-                            override_acr_flag=True)

+                 createrepo(path=chroot_path, username=data["user"],

+                            projectname=data["copr"])

  

              result.result = ActionResult.SUCCESS

              result.ended_on = time.time()

@@ -232,6 +234,9 @@ 

                  result.result = ActionResult.FAILURE

  

      def run_createrepo(self, ownername, projectname, project_dirname, chroots):

+         devel = uses_devel_repo(self.front_url, ownername,

+                                 projectname)

+ 

          for chroot in chroots:

              chroot_path = os.path.join(self.destdir, ownername, project_dirname, chroot)

              self.log.debug("Running createrepo on %s", chroot_path)

@@ -250,15 +255,14 @@ 

                  pass

  

              try:

-                 createrepo(

-                     path=chroot_path,

-                     front_url=self.front_url, base_url=result_base_url,

-                     username=ownername, projectname=projectname)

+                 createrepo(path=chroot_path, base_url=result_base_url,

+                            username=ownername, projectname=projectname,

+                            devel=devel)

              except CoprRequestException:

                  # FIXME: dirty hack to catch the case when createrepo invoked upon a deleted project

                  self.log.exception("Project %s/%s has been deleted on frontend", ownername, projectname)

              except CreateRepoError:

-                 self.log.exception("Error making local repo: %s", full_path)

+                 self.log.exception("Error making local repo: %s", chroot_path)

  

      def delete_build(self, ownername, project_dirname, chroot_builddirs):

          self.log.info("Going to delete: %s", chroot_builddirs)

@@ -364,9 +368,8 @@ 

                      with open(os.path.join(destdir, "build.info"), "a") as f:

                          f.write("\nfrom_chroot={}".format(data["rawhide_chroot"]))

  

-             createrepo(path=chrootdir, front_url=self.front_url,

-                        username=data["ownername"], projectname=data["projectname"],

-                        override_acr_flag=True)

+             createrepo(path=chrootdir, username=data["ownername"],

+                        projectname=data["projectname"])

          except:

              result.result = ActionResult.FAILURE

  

@@ -470,9 +473,8 @@ 

                      mmd.set_rpm_artifacts(artifacts)

                      self.log.info("Module artifacts: %s", mmd.get_rpm_artifacts())

                      Modulemd.dump([mmd], os.path.join(destdir, "modules.yaml"))

-                     createrepo(path=destdir, front_url=self.front_url,

-                                username=ownername, projectname=projectname,

-                                override_acr_flag=True)

+                     createrepo(path=destdir, username=ownername,

+                                projectname=projectname)

  

              result.result = ActionResult.SUCCESS

          except Exception as e:

@@ -483,6 +485,7 @@ 

          """ Handle action (other then builds) - like rename or delete of project """

          self.log.info("Executing: %s", str(self))

  

+         # TODO: we don't need Munch() here, drop it

          result = Munch()

          result.id = self.data["id"]

  

@@ -525,16 +528,7 @@ 

              self.handle_cancel_build(result)

  

          self.log.info("Action result: %s", result)

- 

-         if "result" in result:

-             if result.result == ActionResult.SUCCESS and \

-                     not getattr(result, "job_ended_on", None):

-                 result.job_ended_on = time.time()

- 

-             try:

-                 self.frontend_client.update({"actions": [result]})

-             except RequestException as e:

-                 self.log.exception(e)

+         return result

  

  

  # TODO: sync with ActionTypeEnum from common

@@ -556,3 +550,56 @@ 

      WAITING = 0

      SUCCESS = 1

      FAILURE = 2

+ 

+ 

+ class ActionQueueTask():

+     def __init__(self, id):

+         self.id = id

+     def __repr__(self):

+         return str(self.id)

+ 

+ 

+ class ActionWorkerManager(WorkerManager):

+     frontend_client = None

+     worker_prefix = 'action_worker'

+ 

+     def start_task(self, worker_id, task):

+         command = [

+             'copr-backend-process-action',

+             '--daemon',

+             '--task-id', repr(task),

+             '--worker-id', worker_id,

+         ]

+         # TODO: mark as started on FE, and let user know in UI

+         subprocess.check_call(command)

+ 

+     def has_worker_ended(self, worker_id, task_info):

+         return 'status' in task_info

+ 

+     def finish_task(self, worker_id, task_info):

+         task_id = self.get_task_id_from_worker_id(worker_id)

+ 

+         result = Munch()

+         result.id = int(task_id)

+         result.result = int(task_info['status'])

+         result.job_ended_on = time.time()

+ 

+         try:

+             self.frontend_client.update({"actions": [result]})

+         except RequestException:

+             self.log.exception("can't post to frontend, retrying indefinitely")

+             return False

+         return True

+ 

+     def is_worker_alive(self, worker_id, task_info):

+         if 'PID' not in task_info:

+             return False

+         pid = int(task_info['PID'])

+         try:

+             # Send signal=0 to the process to check whether it still exists.

+             # This is just a no-op if the signal was successfully delivered to an

+             # existing process; otherwise an exception is raised.

+             os.kill(pid, 0)

+         except OSError:

+             return False

+         return True

file modified
+8 -12

@@ -10,7 +10,6 @@ 

  # opts = BackendConfigReader().read()

  # log = get_redis_logger(opts, "createrepo", "actions")

  

- from .helpers import get_auto_createrepo_status

  from .exceptions import CreateRepoError

  

  

@@ -163,29 +162,26 @@ 

      return ""

  

  

- def createrepo(path, front_url, username, projectname,

-                override_acr_flag=False, base_url=None):

+ def createrepo(path, username, projectname, devel=False, base_url=None):

      """

-         Creates repo depending on the project setting "auto_createrepo".

-         When enabled creates `repodata` at the provided path, otherwise

+     Creates repodata.  Depending on the "devel" parameter it either

+     creates the repodata directory in `path`, or in `path/devel`.

  

      :param path: directory with rpms

-     :param front_url: url to the copr frontend

      :param username: copr project owner username

      :param projectname: copr project name

+     :param devel: create the repository in 'devel' subdirectory

      :param base_url: base_url to access rpms independently of repomd location

-     :param Multiprocessing.Lock lock:  [optional] global copr-backend lock

  

      :return: tuple(returncode, stdout, stderr) produced by `createrepo_c`

      """

      # TODO: add means of logging

- 

-     base_url = base_url or ""

- 

-     acr_flag = get_auto_createrepo_status(front_url, username, projectname)

-     if override_acr_flag or acr_flag:

+     if not devel:

          out_cr = createrepo_unsafe(path)

          out_ad = add_appdata(path, username, projectname)

          out_md = add_modules(path)

          return "\n".join([out_cr, out_ad, out_md])

+ 

+     # Automatic createrepo disabled.  Even so, we still need to createrepo in

+     # special "devel" directory so we can later build packages against it.

      return createrepo_unsafe(path, base_url=base_url, dest_dir="devel")
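To illustrate the new calling convention, a minimal sketch follows; the wrapper name regen_repo and its argument list are invented for illustration and only mirror what the updated call sites in this patch do.  Callers now resolve the devel flag once through the frontend (uses_devel_repo) and pass it down, instead of createrepo() querying the frontend itself:

    from backend.createrepo import createrepo
    from backend.helpers import uses_devel_repo

    def regen_repo(opts, ownername, projectname, chroot_path, base_url=None):
        # Ask frontend once whether the project disables automatic createrepo
        # (and therefore builds against the "devel" repository).
        devel = uses_devel_repo(opts.frontend_base_url, ownername, projectname)
        # createrepo() picks the destination from the flag:
        #   devel=False -> <chroot_path>/repodata
        #   devel=True  -> <chroot_path>/devel/repodata
        return createrepo(path=chroot_path, username=ownername,
                          projectname=projectname, base_url=base_url,
                          devel=devel)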

@@ -7,8 +7,8 @@ 

  

  from backend.frontend import FrontendClient

  

- from ..actions import Action

- from ..helpers import get_redis_logger

+ from ..actions import ActionWorkerManager, ActionQueueTask

+ from ..helpers import get_redis_logger, get_redis_connection

  

  

  class ActionDispatcher(multiprocessing.Process):

@@ -23,7 +23,6 @@ 

  

          self.opts = opts

          self.log = get_redis_logger(self.opts, "backend.action_dispatcher", "action_dispatcher")

-         self.frontend_client = FrontendClient(self.opts, self.log)

  

      def update_process_title(self, msg=None):

          proc_title = "Action dispatcher"

@@ -31,30 +30,23 @@ 

              proc_title += " - " + msg

          setproctitle(proc_title)

  

-     def load_action(self):

+     def get_frontend_actions(self):

          """

-         Retrieve an action task from frontend.

+         Get unfiltered list of actions from frontend, both running and pending.

          """

-         self.log.info("Waiting for an action task from frontend...")

-         get_action_init_time = time.time()

- 

-         action_task = None

-         while not action_task:

-             self.update_process_title("Waiting for an action task from frontend for {}s"

-                                       .format(int(time.time() - get_action_init_time)))

-             try:

-                 r = get("{0}/backend/pending-action/".format(self.opts.frontend_base_url),

-                         auth=("user", self.opts.frontend_auth))

-                 action_task = r.json()

-             except (RequestException, ValueError) as error:

-                 self.log.exception("Retrieving an action task from %s failed with error: %s",

-                                    self.opts.frontend_base_url, error)

-             finally:

-                 if not action_task:

-                     time.sleep(self.opts.sleeptime)

- 

-         self.log.info("Got new action_task %s of type %s", action_task['id'], action_task['action_type'])

-         return Action(self.opts, action_task, frontend_client=self.frontend_client)

+ 

+         try:

+             url = "{0}/backend/pending-actions/".format(self.opts.frontend_base_url)

+             request = get(url, auth=("user", self.opts.frontend_auth))

+             raw_actions = request.json()

+         except (RequestException, ValueError) as error:

+             self.log.exception(

+                 "Retrieving an action tasks failed with error: %s",

+                 error)

+             return []

+ 

+         return [ActionQueueTask(action['id']) for action in raw_actions]

+ 

  

      def run(self):

          """

@@ -63,13 +55,24 @@ 

          self.log.info("Action dispatching started.")

          self.update_process_title()

  

+         redis = get_redis_connection(self.opts)

+         worker_manager = ActionWorkerManager(

+             redis_connection=redis,

+             log=self.log,

+             max_workers=self.opts.actions_max_workers)

+         worker_manager.frontend_client = FrontendClient(self.opts, self.log)

+ 

+         timeout = self.opts.sleeptime

+ 

          while True:

-             action = self.load_action()

-             try:

-                 action.run()

-             except Exception as e: # dirty

-                 self.log.exception(str(e))

-             msg = "Started new action {} of type {}"\

-                   .format(action.data["id"], action.data["action_type"])

-             self.update_process_title(msg)

-             self.log.info(msg)

+             self.log.info("getting actions from frontend")

+             start = time.time()

+             for task in self.get_frontend_actions():

+                 worker_manager.add_task(task)

+ 

+             # Execute the actions.

+             worker_manager.run(timeout=timeout)

Just to make sure, this line will be executed instantly and just spawn asynchronous processes in the background. We are not going to wait here until they finish, right?

This function will run as long as there's something to do, but not longer than the timeout.

+ 

+             sleep_more = timeout - (time.time() - start)

+             if sleep_more > 0:

+                 time.sleep(sleep_more)

I don't understand this piece. What are we waiting for here? Can't it just be time.sleep(timeout)?

The run() method takes some time, and I was trying to issue the request for new actions once per "timeout" period.
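To summarize the exchange above, here is a standalone sketch of the timing logic (the function name and arguments are illustrative, not part of the patch): run() keeps processing the queue and collecting finished workers, but returns after at most timeout seconds, so the dispatcher subtracts the elapsed time to poll the frontend roughly once per period.

    import time

    def dispatch_loop(worker_manager, get_frontend_actions, timeout=30):
        # Illustrative loop mirroring ActionDispatcher.run() above.
        while True:
            start = time.time()
            for task in get_frontend_actions():
                worker_manager.add_task(task)

            # Processes the queue and collects finished workers; returns after
            # at most `timeout` seconds, or earlier once there's nothing left
            # to do.  It does not wait for the spawned processes beyond that.
            worker_manager.run(timeout=timeout)

            # Sleep only the remainder, so new actions are requested roughly
            # once per `timeout` period, however long run() took.
            sleep_more = timeout - (time.time() - start)
            if sleep_more > 0:
                time.sleep(sleep_more)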

file modified
+23 -1

@@ -1,7 +1,8 @@ 

  import json

  import time

- from requests import post, RequestException

+ from requests import post, get, RequestException

  

+ RETRY_TIMEOUT = 5

  

  class FrontendClient(object):

      """

@@ -38,6 +39,27 @@ 

              raise

          return response

  

+     def get_reliably(self, url_path):

+         """

+         Get the URL response from frontend, try indefinitely till the server

+         gives us answer.

+         """

+         url = "{}/{}/".format(self.frontend_url, url_path)

+         auth = ("user", self.frontend_auth)

+ 

+         attempt = 0

+         while True:

+             attempt += 1

+             try:

+                 response = get(url, auth=auth)

+             except RequestException as ex:

+                 self.msg = "Get request {} failed: {}".format(attempt, ex)

+                 time.sleep(RETRY_TIMEOUT)

+                 continue

+ 

+             return response

+ 

+ 

      def _post_to_frontend_repeatedly(self, data, url_path, max_repeats=10):

          """

          Make a request max_repeats-time to the frontend

file modified
+7 -3

@@ -266,6 +266,10 @@ 

              cp, "builder", "consecutive_failure_threshold",

              DEF_CONSECUTIVE_FAILURE_THRESHOLD, mode="int")

  

+         opts.actions_max_workers = _get_conf(

+             cp, "builder", "actions_max_workers",

+             default=10, mode="int")

+ 

          opts.log_dir = _get_conf(

              cp, "backend", "log_dir", "/var/log/copr-backend/")

          opts.log_level = _get_conf(

@@ -294,13 +298,13 @@ 

          return opts

  

  

- def get_auto_createrepo_status(front_url, username, projectname):

+ def uses_devel_repo(front_url, username, projectname):

      client = CoprClient(copr_url=front_url)

      result = client.get_project_details(projectname, username)

  

      if "auto_createrepo" in result.data["detail"]:

-         return bool(result.data["detail"]["auto_createrepo"])

-     return True

+         return not bool(result.data["detail"]["auto_createrepo"])

+     return False

  

  

  def get_persistent_status(front_url, username, projectname):

@@ -25,6 +25,7 @@ 

  from ..constants import DEF_BUILD_TIMEOUT, DEF_REPOS, \

      DEF_BUILD_USER, DEF_MACROS

  from ..exceptions import MockRemoteError, BuilderError, CreateRepoError

+ from ..helpers import uses_devel_repo

  

  

  # TODO: replace sign & createrepo with dependency injection

@@ -205,20 +206,25 @@ 

          if self.job.chroot == 'srpm-builds':

              return

  

-         base_url = "/".join([self.opts.results_baseurl, self.job.project_owner,

-                              self.job.project_name, self.job.chroot])

+         project_owner = self.job.project_owner

+         project_name = self.job.project_name

+ 

+         base_url = "/".join([self.opts.results_baseurl, project_owner,

+                              project_name, self.job.chroot])

          self.log.info("Createrepo:: owner:  {}; project: {}; "

                        "front url: {}; path: {}; base_url: {}"

-                       .format(self.job.project_owner, self.job.project_name,

+                       .format(project_owner, project_name,

                                self.opts.frontend_base_url, self.chroot_dir, base_url))

  

+         devel = uses_devel_repo(self.opts.frontend_base_url,

+                                 project_owner, project_name)

          try:

              createrepo(

                  path=self.chroot_dir,

-                 front_url=self.opts.frontend_base_url,

                  base_url=base_url,

-                 username=self.job.project_owner,

-                 projectname=self.job.project_name,

+                 username=project_owner,

+                 projectname=project_name,

+                 devel=devel,

              )

          except CreateRepoError:

              self.log.exception("Error making local repo: {}".format(self.chroot_dir))

@@ -0,0 +1,235 @@ 

+ import time

+ from heapq import heappop, heappush

+ import itertools

+ import redis

+ import logging

+ 

+ class JobQueue():

+     """

+     Priority "task" queue for WorkerManager.  Taken from:

+     https://docs.python.org/3/library/heapq.html#priority-queue-implementation-notes

+     The higher the 'priority' is, the later the task is taken.

+     """

+ 

+     def __init__(self, removed='<removed-task>'):

+         self.prio_queue = []             # list of entries arranged in a heap

+         self.entry_finder = {}           # mapping of tasks to entries

+         self.removed = removed           # placeholder for a removed task

+         self.counter = itertools.count() # unique sequence count

+ 

+     def add_task(self, task, priority=0):

+         'Add a new task or update the priority of an existing task'

+         if repr(task) in self.entry_finder:

+             self.remove_task(task)

+         count = next(self.counter)

+         entry = [priority, count, task]

+         self.entry_finder[repr(task)] = entry

+         heappush(self.prio_queue, entry)

+ 

+     def remove_task(self, task):

+         'Mark an existing task as removed.  Raise KeyError if not found.'

+         entry = self.entry_finder.pop(repr(task))

+         entry[-1] = self.removed

+ 

+     def pop_task(self):

+         'Remove and return the lowest priority task. Raise KeyError if empty.'

+         while self.prio_queue:

+             priority, count, task = heappop(self.prio_queue)

+             if task is not self.removed:

+                 del self.entry_finder[repr(task)]

+                 return task

+         raise KeyError('pop from an empty priority queue')

+ 

+ 

+ class WorkerManager():

+     """

+     Automatically process 'self.tasks' priority queue, and start background jobs

+     to handle them.

+ 

+     :cvar worker_prefix: Unique string across all the WorkerManager child

+             classes; it is used as the prefix for the workers in the redis database

+             and to easily determine which WorkerManager the particular worker

+             belongs to.  So it can be anything reasonable, just make sure it is

+             unique.

+     :cvar worker_timeout_start: The time period we give the background process

+             to successfully start and identify itself (see the has_worker_started()

+             method).  If the background process isn't identified after this

+             timeout, we drop it from the database and consider it failed, and the

+             task is re-scheduled.  Float value in seconds.

+     :cvar worker_timeout_deadcheck: If the worker successfully identified itself

+             after start (has_worker_started() returns True), we know that the

+             worker process at least started.  But after the worker_timeout_deadcheck

+             timeout we'll also keep an eye on the process by asking the

+             is_worker_alive() method - whether the task is really still doing

+             something in the background or not (== unexpected failure cleanup).

+             Float value in seconds.

+     """

+     worker_prefix = 'worker' # make sure this is unique in each class

+     worker_timeout_start = 10

+     worker_timeout_deadcheck = 60

+ 

+     def __init__(self, redis_connection=None, max_workers=8, log=None):

+         self.tasks = JobQueue()

+         self.log = log if log else logging.getLogger()

+         self.redis = redis_connection

+         self.max_workers = max_workers

+ 

+     def start_task(self, worker_id, task):

+         """

+         Start background job using the 'task' object taken from the 'tasks'

+         queue.  The background task should _on its own_ and ASAP let the manager

+         know that it successfully started (e.g. mark the job 'started' in redis

+         DB), so the has_worker_started() method later gives us valid info.

+         """

+         raise NotImplementedError

+ 

+     def finish_task(self, worker_id, task):

+         """

+         This is called once the worker manager considers the task to be done,

+         because the `has_worker_ended()` method already returns True.  Override

+         this function and use it to let Frontend know that the task finished,

+         and how (whether it succeeded, etc.).

+         """

+         raise NotImplementedError

+ 

+     def has_worker_started(self, worker_id, task_info):

+         """

+         The background task process should somehow notify the manager that it

+         already started (so we can have has_worker_started() implemented).

+         By default we expect it to set the 'started' attribute in the redis DB,

+         but feel free to override this method and invent a different notification

+         mechanism.

+         """

+         return 'started' in task_info

+ 

+     def has_worker_ended(self, worker_id, task_info):

+         """

+         Check 'task_info' (the dictionary read from redis) to see whether the task

+         has already been finished by the worker.  If yes, do whatever is needed with

+         the result (contact frontend) and return True.  If the task is still being

+         processed, return False.

+         """

+         raise NotImplementedError

+ 

+     def is_worker_alive(self, worker_id, task_info):

+         """

+         Check whether seemingly stale background jobs have died before they

+         notified us about their status.  We start asking once

+         worker_timeout_deadcheck seconds have passed since we tried to spawn the

+         worker.

+         """

+         raise NotImplementedError

+ 

+     def get_worker_id(self, task_id):

+         """

+         Given the unique task representation form (usually ID), generate worker

+         identifier (redis key).

+         """

+         return '{}:{}'.format(self.worker_prefix, task_id)

+ 

+     def get_task_id_from_worker_id(self, worker_id):

+         """

+         Given the worker identifier (redis key), return the task ID it was

+         generated from.

+         """

+         prefix, task_id = worker_id.rsplit(':', 1)

+         assert prefix == self.worker_prefix

+         return task_id

+ 

+     def has_worker(self, task_id):

+         worker_id = self.get_worker_id(task_id)

+         return worker_id in self.worker_ids()

+ 

+     def add_task(self, task):

+         task_id = repr(task)

+         if self.has_worker(task_id):

+             # No need to re-add this to queue.

+             self.log.warning("Task %s has worker, skipped", task_id)

+             return

+ 

+         self.log.info("Adding task %s to queue", task_id)

+         self.tasks.add_task(task)

+ 

+     def worker_ids(self):

+         """

+         Return the redis keys representing workers running on background.

+         """

+         return self.redis.keys(self.worker_prefix + ':*')

+ 

+     def run(self, timeout=float('inf')):

+         """

+         Process the task (priority) queue.

+         """

+         start_time = time.time()

+ 

+         while time.time() - start_time < timeout:

+             self._cleanup_workers()

+ 

+             worker_count = len(self.worker_ids())

+             if worker_count >= self.max_workers:

+                 time.sleep(1)

+                 continue

+ 

+             # We can allocate some workers, if there's something to do.

+             try:

+                 task = self.tasks.pop_task()

+             except KeyError:

+                 # Empty queue!

+                 if worker_count:

+                     # It still makes sense to cycle to finish the workers.

+                     time.sleep(1)

+                     continue

+                 # Optimization: nobody is working now and there's nothing to do,

+                 # so end this run() early and let the caller sleep till the next cycle.

+                 break

+ 

+             self._start_worker(task)

+ 

+     def _start_worker(self, task):

+         worker_id = self.get_worker_id(repr(task))

+         self.redis.hset(worker_id, 'allocated', time.time())

+         self.log.info("Starting worker %s", worker_id)

+         self.start_task(worker_id, task)

+ 

+     def clean_tasks(self):

+         'remove all tasks from queue'

+         self.tasks = JobQueue()

+ 

+     def _cleanup_workers(self):

+         now = time.time()

+ 

+         for worker_id in self.worker_ids():

+             info = self.redis.hgetall(worker_id)

+             allocated = float(info.get('allocated'))

+ 

+             if self.has_worker_ended(worker_id, info):

+                 # finished worker

+                 self.log.info("Finished worker %s", worker_id)

+                 self.finish_task(worker_id, info)

+                 self.redis.delete(worker_id)

+                 continue

+ 

+             if info.get('delete'):

+                 self.log.warning("worker %s deleted", worker_id)

+                 self.redis.delete(worker_id)

+                 continue

+ 

+             if not self.has_worker_started(worker_id, info):

+                 if now - allocated > self.worker_timeout_start:

+                     # This worker failed to start?

+                     self.log.error("worker %s failed to start", worker_id)

+                     self.redis.delete(worker_id)

+                 continue

+ 

+             checked = info.get('checked', allocated)

+ 

+             if now - float(checked) > self.worker_timeout_deadcheck:

+                 self.log.info("checking worker %s", worker_id)

+                 self.redis.hset(worker_id, 'checked', now)

+                 if self.is_worker_alive(worker_id, info):

+                     continue

+                 self.log.error("dead worker %s", worker_id)

+ 

+                 # The worker could finish in the meantime, make sure we

+                 # hgetall() once more.

+                 self.redis.hset(worker_id, 'delete', 1)
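For orientation, a minimal subclass sketch assuming the redis-hash protocol described in the docstrings above; the class name, the spawned command and the logging below are made up, and ActionWorkerManager in actions.py is the real subclass added by this patch:

    import os
    import subprocess
    from backend.worker_manager import WorkerManager

    class ExampleWorkerManager(WorkerManager):
        # Each worker gets a redis hash keyed '<worker_prefix>:<task_id>'.
        # The manager writes 'allocated'; the spawned process is expected to
        # write 'started' and 'PID' soon after startup, and 'status' when done.
        worker_prefix = 'example_worker'

        def start_task(self, worker_id, task):
            # Spawn the background process (hypothetical command).
            subprocess.check_call(['example-worker-command',
                                   '--task-id', repr(task),
                                   '--worker-id', worker_id])

        def has_worker_ended(self, worker_id, task_info):
            return 'status' in task_info

        def finish_task(self, worker_id, task_info):
            # Report the result wherever it belongs (frontend, log, ...).
            self.log.info("task %s finished, status=%s",
                          self.get_task_id_from_worker_id(worker_id),
                          task_info.get('status'))
            return True

        def is_worker_alive(self, worker_id, task_info):
            try:
                os.kill(int(task_info['PID']), 0)  # signal 0: existence check
            except (KeyError, ValueError, OSError):
                return False
            return True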

@@ -55,6 +55,10 @@ 

  # default is 10

  sleeptime=30

  

+ # The maximum number of workers (background processes) Backend can spawn to
 
+ # handle actions concurrently.

+ #actions_max_workers=8

+ 

  # exit on worker failure

  # default is false

  #exit_on_worker=false

@@ -0,0 +1,112 @@ 

+ #! /usr/bin/python3

+ 

+ import os

+ import sys

+ import time

+ import daemon

+ import argparse

+ import logging

+ import contextlib

+ 

+ sys.path.append("/usr/share/copr/")

+ 

+ from backend.helpers import (BackendConfigReader, get_redis_logger,

+                              get_redis_connection)

+ from backend.frontend import FrontendClient

+ from backend.actions import Action, ActionResult

+ 

+ 

+ def get_arg_parser():

+     'return argument parser object'

+ 

+     parser = argparse.ArgumentParser(

+         description="Process single copr action",

+     )

+     parser.add_argument(

+         "--task-id",

+         type=int,

+         required=True,

+         help="Task ID to process",

+     )

+     parser.add_argument(

+         "--worker-id",

+         help="Worker ID already exists in DB (used by WorkerManager only)",

+     )

+     parser.add_argument(

+         "--daemon",

+         action='store_true',

+         help="run on background, as daemon process"

+     )

+     return parser

+ 

+ 

+ def handle_task(opts, args, log):

+     "Handle the task, executed on background in DaemonContext"

+ 

+     task_id = args.task_id

+ 

+     frontend_client = FrontendClient(opts, log)

+     redis = get_redis_connection(opts)

+ 

+     log.info("Handling action %s", task_id)

+ 

+     if args.worker_id:

+         # Identify ourselves.

+         redis.hset(args.worker_id, 'started', 1)

+         redis.hset(args.worker_id, 'PID', os.getpid())

+ 

+     resp = frontend_client.get_reliably('action/{}'.format(task_id))

+     if resp.status_code != 200:

+         log.error("failed to download task, apache code %s", resp.status_code)

+         sys.exit(1)

+ 

+     action_task = resp.json()

+     action = Action(opts, action_task, log=log)

+     result = ActionResult.FAILURE

+     try:

+         action_result = action.run()

+         result = action_result.result

+     except Exception:

+         log.exception("action failed for unknown error")

+ 

+     log.info("Action %s ended with status=%s", action_task, result)

+ 

+     # Let the manager know what's the result.

+     if args.worker_id:

+         redis.hset(args.worker_id, 'status', str(result))

+ 

+ 

+ def main():

+     'handle the task, the main function'

+ 

+     if os.getuid() == 0:

+         sys.stderr.write("this needs to be run as 'copr' user\n")

+         sys.exit(1)

+ 

+     config = '/etc/copr/copr-be.conf'

+     opts = BackendConfigReader(config).read()

+     args = get_arg_parser().parse_args()

+ 

+     context = contextlib.nullcontext()

+     if args.daemon:

+         context = daemon.DaemonContext()

+ 

+     with context:

+         logger_name = '{}.{}.pid-{}'.format(

+             sys.argv[0],

+             'managed' if args.worker_id else 'manual',

+             os.getpid(),

+         )

+         log = get_redis_logger(opts, logger_name, "actions")

+         try:

+             if not args.daemon:

+                 # when executing from commandline - on foreground - we want to print

+                 # something to stderr as well

+                 log.addHandler(logging.StreamHandler())

+             handle_task(opts, args, log)

+         except Exception as exc: # pylint: disable=W0703

+             log.exception("unexpected failure %s", str(exc))

+ 

+ 

+ if __name__ == "__main__":

+     main()

@@ -19,7 +19,7 @@ 

  sys.path.append("/usr/share/copr/")

  

  from backend.helpers import BackendConfigReader

- from backend.helpers import get_auto_createrepo_status,get_persistent_status,get_auto_prune_status

+ from backend.helpers import uses_devel_repo, get_persistent_status, get_auto_prune_status

  from backend.frontend import FrontendClient

  from backend.createrepo import createrepo

  

@@ -114,7 +114,7 @@ 

          loginfo("projectname = {}".format(projectname))

  

          try:

-             if not get_auto_createrepo_status(self.opts.frontend_base_url, username, projectname):

+             if uses_devel_repo(self.opts.frontend_base_url, username, projectname):

                  loginfo("Skipped {}/{} since auto createrepo option is disabled"

                            .format(username, projectdir))

                  return

@@ -165,9 +165,8 @@ 

                  cmd = ['prunerepo', '--verbose', '--days', str(self.prune_days), '--nocreaterepo', chroot_path]

                  stdout = runcmd(cmd)

                  loginfo(stdout)

-                 createrepo(path=chroot_path, front_url=self.opts.frontend_base_url,

-                            username=username, projectname=projectname,

-                            override_acr_flag=True)

+                 createrepo(path=chroot_path, username=username,

+                            projectname=projectname)

                  clean_copr(chroot_path, self.prune_days, verbose=True)

              except Exception as err:

                  logexception(err)

@@ -20,13 +20,14 @@ 

  

  

  sys.path.append("/usr/share/copr/")

- from backend.helpers import BackendConfigReader, create_file_logger

+ from backend.helpers import (BackendConfigReader, create_file_logger,

+                              uses_devel_repo)

  from backend.sign import get_pubkey, sign_rpms_in_dir, create_user_keys

  from backend.exceptions import CoprSignNoKeyError

  from backend.createrepo import createrepo

  

  

- def check_signed_rpms_in_pkg_dir(pkg_dir, user, project, chroot, chroot_dir, opts):

+ def check_signed_rpms_in_pkg_dir(pkg_dir, user, project, chroot, chroot_dir, opts, devel):

      success = True

  

      logger = create_file_logger("run.check_signed_rpms_in_pkg_dir",

@@ -39,10 +40,10 @@ 

                               project, chroot])

          createrepo(

              path=chroot_dir,

-             front_url=opts.frontend_base_url,

              base_url=base_url,

              username=user,

              projectname=project,

+             devel=devel,

          )

  

      except Exception as err:

@@ -53,7 +54,7 @@ 

      return success

  

  

- def check_signed_rpms(project_dir, user, project, opts):

+ def check_signed_rpms(project_dir, user, project, opts, devel):

      """

      Ensure that all rpm files are signed

      """

@@ -77,7 +78,9 @@ 

  

              log.debug(">> Stepping into package: {}".format(mb_pkg_path))

  

-             if not check_signed_rpms_in_pkg_dir(mb_pkg_path, user, project, chroot, chroot_path, opts):

+             if not check_signed_rpms_in_pkg_dir(mb_pkg_path, user, project,

+                                                 chroot, chroot_path, opts,

+                                                 devel):

                  success = False

  

      return success

@@ -140,9 +143,18 @@ 

                  failed = True

                  continue

  

+             try:

+                 devel = uses_devel_repo(opts.frontend_base_url,

+                                         user_name, project_name)

+             except Exception:

+                 log.exception("Can't get ACR flag for %s/%s, marking as failed, skipping",

+                               user_name, project_name)

+                 failed = True

+                 continue

+ 

              project_dir = os.path.join(user_dir, project_name)

              pubkey_path = os.path.join(project_dir, "pubkey.gpg")

-             if not check_signed_rpms(project_dir, user_name, project_name, opts):

+             if not check_signed_rpms(project_dir, user_name, project_name, opts,

+                                      devel):

                  failed = False

  

              if not check_pubkey(pubkey_path, user_name, project_name, opts):

file modified
+14 -26

@@ -13,33 +13,21 @@ 

  }

  trap cleanup EXIT

  

+ common_path=$(readlink -f ../common)

+ export PYTHONPATH="$common_path:backend:run${PYTHONPATH+:$PYTHONPATH}$common_path"

+ 

  COVPARAMS='--cov-report term-missing --cov ./backend --cov ./run'

  

- while [[ $# > 1 ]]

- do

- 	key="$1"

- 	case $key in

- 		--nocov)

- 		COVPARAMS=""

- 		;;

- 		*) # unknown option

- 		;;

- 	esac

- shift # past argument or value

+ KEEP_ARGS=()

+ for arg; do

+     case $arg in

+     --nocov)

+         COVPARAMS=""

+         ;;

+     *)

+         KEEP_ARGS+=( "$arg" )

+         ;;

+     esac

  done

  

- #TESTS=./tests

- 

- # Quick hack to disable tests/daemons/test_backend.py tests/mockremote/test_builder.py

- # tests/mockremote/test_mockremote.py that are currently failing due to complete code rewrite

- # TODO: prune tests (case-by-case) that are no longer relevant. We mostly rely on

- # integration & regression tests now.

- TESTS="tests/test_createrepo.py tests/test_frontend.py tests/test_helpers.py tests/test_sign.py tests/vm_manager/test_manager.py tests/test_action.py"

- 

- if [[ -n $@ ]]; then

- 	TESTS=$@

- fi

- 

- common_path=$(readlink -f ../common)

- export PYTHONPATH="$common_path:backend:run${PYTHONPATH+:$PYTHONPATH}$common_path"

- python3 -m pytest -s $COVPARAMS $TESTS

+ python3 -m pytest -s tests $COVPARAMS "${KEEP_ARGS[@]}"

@@ -0,0 +1,44 @@ 

+ #! /usr/bin/python3

+ 

+ import os

+ import sys

+ import time

+ import daemon

+ from munch import Munch

+ import logging

+ 

+ WORKDIR = os.path.dirname(__file__)

+ 

+ sys.path.append(os.path.join(WORKDIR, '..'))

+ 

+ from backend.helpers import get_redis_connection

+ 

+ REDIS_OPTS = Munch(

+     redis_db=9,

+     redis_port=7777,

+ )

+ 

+ def do_the_useful_stuff(process_counter, task_id, worker_id, sleep):

+     if 'FAIL_EARLY' in os.environ:

+         raise Exception("sorry")

+ 

+     redis = get_redis_connection(REDIS_OPTS)

+ 

+     redis.hset(worker_id, 'started', 1)

+     redis.hset(worker_id, 'PID', os.getpid())

+ 

+     if 'FAIL_STARTED' in os.environ:

+         raise Exception("sorry")

+ 

+     # do some work!

+     time.sleep(sleep)

+ 

+     result = 1 if process_counter % 8 else 2

+     redis.hset(worker_id, 'status', str(result))

+     return 0

+ 

+ 

+ if __name__ == "__main__":

+     with daemon.DaemonContext():

+         do_the_useful_stuff(int(sys.argv[1]), sys.argv[2], sys.argv[3],

+                             float(sys.argv[4]))

@@ -13,7 +13,7 @@ 

  from backend.daemons.backend import CoprBackend, run_backend

  from backend.exceptions import CoprBackendError

  

- from unittest import mock

+ from unittest import mock, skip

  from unittest.mock import MagicMock

  

  STDOUT = "stdout"

@@ -22,22 +22,17 @@ 

  COPR_NAME = "copr_name"

  COPR_VENDOR = "vendor"

  

- MODULE_REF = "backend.daemons.backend"

- 

- @pytest.yield_fixture

- def mc_rt_channel():

-     with mock.patch("{}.jobgrabcontrol.Channel".format(MODULE_REF)) as mc_channel:

-         yield mc_channel

+ MODULE_REF = "backend.daemons.backend" 

  

  @pytest.yield_fixture

  def mc_worker():

-     with mock.patch("{}.Worker".format(MODULE_REF)) as worker:

+     with mock.patch("backend.daemons.worker.Worker") as worker:

          yield worker

  

  @pytest.yield_fixture

  def mc_time():

-     with mock.patch("{}.time".format(MODULE_REF)) as time_:

-         yield time_

+     with mock.patch("backend.daemons.worker.time") as time:

+         yield time

  

  @pytest.yield_fixture

  def mc_be():

@@ -130,12 +125,12 @@ 

          with pytest.raises(CoprBackendError):

              self.be = CoprBackend(None, self.ext_opts)

  

-     def test_constructor(self):

-         self.init_be()

+     def test_constructor(self, init_be):

          assert self.be.config_reader == self.bc_obj

          assert self.bc_obj.read.called

  

-     def test_init_task_queues(self, mc_rt_channel, init_be):

+     @skip("Fixme or remove, test doesn't work.")

+     def test_init_task_queues(self, init_be):

          self.be.jg_control = MagicMock()

          self.be.init_task_queues()

          assert self.be.jg_control.backend_start.called

@@ -148,6 +143,7 @@ 

          assert self.bc_obj.read.called

          assert self.be.opts == test_obj

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_spin_up_workers_by_group(self, mc_worker, init_be):

          worker = MagicMock()

          mc_worker.return_value = worker

@@ -161,6 +157,7 @@ 

          assert len(worker.start.call_args_list) == group["max_workers"]

          assert len(self.be.workers_by_group_id[0]) == group["max_workers"]

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_spin_up_workers_by_group_partial(self, mc_worker, init_be):

          worker = MagicMock()

          mc_worker.return_value = worker

@@ -176,6 +173,7 @@ 

          assert len(worker.start.call_args_list) == group["max_workers"] - 1

          assert len(self.be.workers_by_group_id[1]) == group["max_workers"]

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_prune_dead_workers_by_group(self, init_be):

          worker_alive = MagicMock()

          worker_alive.is_alive.return_value = True

@@ -192,6 +190,7 @@ 

          assert worker_dead.terminate.called

          assert not worker_alive.terminate.called

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_prune_dead_workers_by_group_terminate(self, init_be):

          worker_alive = MagicMock()

          worker_alive.is_alive.return_value = True

@@ -210,6 +209,7 @@ 

          assert worker_dead.terminate.called

          assert not worker_alive.terminate.called

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_terminate(self, init_be):

          worker_alive = MagicMock()

          worker_alive.is_alive.return_value = True

@@ -228,7 +228,8 @@ 

          assert worker_alive.terminate_instance.called

          assert worker_dead.terminate_instance.called

  

-     def test_run(self, mc_time, mc_rt_channel, init_be):

+     @skip("Fixme or remove, test doesn't work.")

+     def test_run(self, mc_time, init_be):

          worker_alive = MagicMock()

          worker_alive.is_alive.return_value = True

          worker_dead = MagicMock()

@@ -259,6 +260,7 @@ 

          assert not self.be.workers_by_group_id[0]

          assert not self.be.workers_by_group_id[1]

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_run_backend_basic(self, mc_be, mc_daemon_context):

          self.grp.getgrnam.return_value.gr_gid = 7

          self.pwd.getpwnam.return_value.pw_uid = 9

@@ -4,22 +4,24 @@ 

  import pprint

  from subprocess import CalledProcessError

  

- from ansible.errors import AnsibleError

  from munch import Munch

  import pytest

  import tempfile

  import shutil

  import time

  

- from backend.constants import BuildStatus, JOB_GRAB_TASK_END_PUBSUB

+ from backend.constants import BuildStatus

  from backend.exceptions import CoprWorkerError, CoprSpawnFailError, MockRemoteError, NoVmAvailable, VmError

  from backend.job import BuildJob

  from backend.vm_manage.models import VmDescriptor

  

- from unittest import mock

+ from unittest import mock, skip

  from unittest.mock import MagicMock

  

- from backend.daemons.dispatcher import Worker

+ from backend.daemons.worker import Worker

+ 

+ # TODO: drop these, not needed

+ JOB_GRAB_TASK_END_PUBSUB = "unused"

  

  STDOUT = "stdout"

  STDERR = "stderr"

@@ -27,7 +29,7 @@ 

  COPR_NAME = "copr_name"

  COPR_VENDOR = "vendor"

  

- MODULE_REF = "backend.daemons.dispatcher"

+ MODULE_REF = "backend.daemons.worker"

  

  @pytest.yield_fixture

  def mc_register_build_result(*args, **kwargs):

@@ -56,8 +58,8 @@ 

  

  

  @pytest.yield_fixture

- def mc_grc():

-     with mock.patch("{}.get_redis_connection".format(MODULE_REF)) as handle:

+ def mc_grl():

+     with mock.patch("{}.get_redis_logger".format(MODULE_REF)) as handle:

          yield handle

  

  

@@ -106,6 +108,7 @@ 

              "repos": "",

              "build_id": self.job_build_id,

              "chroot": self.CHROOT,

+             "project_dirname": COPR_NAME,

              "task_id": "{}-{}".format(self.job_build_id, self.CHROOT),

  

              "git_repo": self.GIT_REPO,

@@ -169,8 +172,10 @@ 

          self.worker = Worker(

              opts=self.opts,

              frontend_client=self.frontend_client,

-             worker_num=self.worker_num,

-             group_id=self.group_id,

+             vm_manager=None,

+             worker_id=None,

+             vm=VmDescriptor("1.1.1.1", "vm_name", 3, "ready"),

+             job=self.job,

          )

  

          self.worker.vmm = MagicMock()

@@ -191,9 +196,11 @@ 

          self.worker.vm_ip = self.vm_ip

  

      def teardown_method(self, method):

+         return

          # print("\nremove: {}".format(self.tmp_dir_path))

          shutil.rmtree(self.tmp_dir_path)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_init_worker_wo_callback(self):

          worker = Worker(

              opts=self.opts,

@@ -203,6 +210,7 @@ 

          )

          worker.vmm = MagicMock()

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_pkg_built_before(self):

          assert not Worker.pkg_built_before(self.pkg_path, self.CHROOT, self.tmp_dir_path)

          target_dir = os.path.join(self.tmp_dir_path, self.CHROOT, self.pkg_pdn)

@@ -216,16 +224,19 @@ 

              handle.write("done")

          assert Worker.pkg_built_before(self.pkg_path, self.CHROOT, self.tmp_dir_path)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_mark_started(self, init_worker):

          self.worker.mark_started(self.job)

          assert self.frontend_client.update.called

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_mark_started_error(self, init_worker):

          self.frontend_client.update.side_effect = IOError()

  

          with pytest.raises(CoprWorkerError):

              self.worker.mark_started(self.job)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_return_results(self, init_worker):

          self.job.started_on = self.test_time

          self.job.ended_on = self.test_time + 10

@@ -252,6 +263,7 @@ 

  

          assert self.frontend_client.update.called

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_return_results_error(self, init_worker):

          self.job.started_on = self.test_time

          self.job.ended_on = self.test_time + 10

@@ -260,6 +272,7 @@ 

          with pytest.raises(CoprWorkerError):

              self.worker.return_results(self.job)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_starting_builds(self, init_worker):

          self.job.started_on = self.test_time

          self.job.ended_on = self.test_time + 10

@@ -269,12 +282,14 @@ 

          # expected_call = mock.call(self.job_build_id, self.CHROOT)

          assert self.frontend_client.starting_build.called

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_starting_build_error(self, init_worker):

          self.frontend_client.starting_build.side_effect = IOError()

  

          with pytest.raises(CoprWorkerError):

              self.worker.starting_build(self.job)

  

+     @skip("Fixme or remove, test doesn't work.")

      @mock.patch("backend.daemons.dispatcher.MockRemote")

      @mock.patch("backend.daemons.dispatcher.os")

      def test_do_job_failure_on_mkdirs(self, mc_os, mc_mr, init_worker, reg_vm):

@@ -285,6 +300,7 @@ 

          assert self.job.status == BuildStatus.FAILURE

          assert not mc_mr.called

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_do_job(self, mc_mr_class, init_worker, reg_vm, mc_register_build_result):

          assert not os.path.exists(self.DESTDIR_CHROOT)

  

@@ -292,6 +308,7 @@ 

          assert self.job.status == BuildStatus.SUCCEEDED

          assert os.path.exists(self.DESTDIR_CHROOT)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_do_job_updates_details(self, mc_mr_class, init_worker, reg_vm, mc_register_build_result):

          assert not os.path.exists(self.DESTDIR_CHROOT)

          mc_mr_class.return_value.build_pkg_and_process_results.return_value = {

@@ -303,6 +320,7 @@ 

          assert self.job.results == self.test_time

          assert os.path.exists(self.DESTDIR_CHROOT)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_do_job_mr_error(self, mc_mr_class, init_worker,

                               reg_vm, mc_register_build_result):

          mc_mr_class.return_value.build_pkg_and_process_results.side_effect = MockRemoteError("foobar")

@@ -310,6 +328,7 @@ 

          self.worker.do_job(self.job)

          assert self.job.status == BuildStatus.FAILURE

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_copy_mock_logs(self, mc_mr_class, init_worker, reg_vm, mc_register_build_result):

          os.makedirs(self.job.results_dir)

          for filename in ["build-00012345.log", "build-00012345.rsync.log"]:

@@ -318,11 +337,13 @@ 

          self.worker.copy_mock_logs(self.job)

          assert set(os.listdir(self.job.results_dir)) == set(["rsync.log.gz", "mockchain.log.gz"])

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_copy_mock_logs_missing_files(self, mc_mr_class, init_worker, reg_vm, mc_register_build_result):

          os.makedirs(self.job.results_dir)

          self.worker.copy_mock_logs(self.job)

          assert set(os.listdir(self.job.results_dir)) == set()

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_clean_previous_build_results(self, mc_mr_class, init_worker, reg_vm, mc_register_build_result):

          os.makedirs(self.job.results_dir)

  

@@ -339,6 +360,7 @@ 

          assert set(os.listdir(backup_dir)) == set(files[2:] + ["build.info"])

          assert "foo.rpm" in os.listdir(self.job.results_dir)

  

+     @skip("Fixme or remove, test doesn't work.")

      @mock.patch("backend.daemons.dispatcher.fedmsg")

      def test_init_fedmsg(self, mc_fedmsg, init_worker):

          self.worker.init_fedmsg()

@@ -350,6 +372,7 @@ 

          mc_fedmsg.init.side_effect = KeyError()

          self.worker.init_fedmsg()

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_obtain_job(self, init_worker):

          mc_tq = MagicMock()

          self.worker.task_queue = mc_tq

@@ -359,6 +382,7 @@ 

          obtained_job = self.worker.obtain_job()

          assert obtained_job.__dict__ == self.job.__dict__

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_obtain_job_dequeue_type_error(self, init_worker):

          mc_tq = MagicMock()

          self.worker.task_queue = mc_tq

@@ -371,6 +395,7 @@ 

          assert not self.worker.starting_build.called

          assert not self.worker.pkg_built_before.called

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_obtain_job_dequeue_none_result(self, init_worker):

          mc_tq = MagicMock()

          self.worker.task_queue = mc_tq

@@ -383,7 +408,8 @@ 

          assert not self.worker.starting_build.called

          assert not self.worker.pkg_built_before.called

  

-     def test_dummy_run(self, init_worker, mc_time, mc_grc):

+     @skip("Fixme or remove, test doesn't work.")

+     def test_dummy_run(self, init_worker, mc_time, mc_grl):

          self.worker.init_fedmsg = MagicMock()

          self.worker.run_cycle = MagicMock()

          self.worker.update_process_title = MagicMock()

@@ -396,13 +422,15 @@ 

  

          assert self.worker.init_fedmsg.called

  

-         assert mc_grc.called

+         assert mc_grl.called

          assert self.worker.run_cycle.called

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_group_name_error(self, init_worker):

          self.opts.build_groups[self.group_id].pop("name")

          assert self.worker.group_name == str(self.group_id)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_update_process_title(self, init_worker, mc_setproctitle):

          self.worker.update_process_title()

          base_title = 'worker-{} {} '.format(self.group_id, self.worker_num)

@@ -421,6 +449,7 @@ 

          self.worker.update_process_title("foobar")

          assert mc_setproctitle.call_args[0][0] == title_with_name + "foobar"

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_dummy_notify_job_grab_about_task_end(self, init_worker):

          self.worker.rc = MagicMock()

          self.worker.notify_job_grab_about_task_end(self.job)

@@ -441,6 +470,7 @@ 

          })

          assert self.worker.rc.publish.call_args == mock.call(JOB_GRAB_TASK_END_PUBSUB, expected2)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_run_cycle(self, init_worker, mc_time):

          self.worker.update_process_title = MagicMock()

          self.worker.obtain_job = MagicMock()

@@ -505,6 +535,7 @@ 

          assert self.worker.notify_job_grab_about_task_end.call_args[1]["do_reschedule"]

          assert self.worker.vmm.release_vm.called

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_run_cycle_halt_on_can_start_job_false(self, init_worker):

          self.worker.notify_job_grab_about_task_end = MagicMock()

          self.worker.obtain_job = MagicMock()

@@ -19,13 +19,15 @@ 

  from backend.vm_manage import VmStates

  from backend.vm_manage.manager import VmManager

  from backend.daemons.vm_master import VmMaster

- from backend.constants import JOB_GRAB_TASK_END_PUBSUB

  from backend.exceptions import VmError, VmSpawnLimitReached

  

- from unittest import mock

+ from unittest import mock, skip

  from unittest.mock import patch, MagicMock

  import pytest

  

+ # TODO: drop these, these are not needed nowadays

+ JOB_GRAB_TASK_END_PUBSUB = "unused"

+ 

  

  """

  REQUIRES RUNNING REDIS

@@ -509,6 +511,7 @@ 

              with pytest.raises(VmSpawnLimitReached):

                  self.vm_master._check_total_vm_limit(0)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_try_spawn_error_handling(self, mc_time):

          mc_time.time.return_value = 0

          self.vm_master.log = MagicMock()

backend/tests/daemons/unused_test_job_grab.py backend/tests/daemons/test_job_grab.py
file renamed
file was moved with no change to the file

@@ -5,7 +5,7 @@ 

  from pprint import pprint

  import socket

  from munch import Munch

- from backend.exceptions import BuilderError, BuilderTimeOutError, AnsibleCallError, AnsibleResponseError, VmError

+ from backend.exceptions import BuilderError, VmError

  

  import tempfile

  import shutil

@@ -13,7 +13,7 @@ 

  

  from backend.job import BuildJob

  

- from unittest import mock

+ from unittest import mock, skip

  from unittest.mock import patch, MagicMock

  import pytest

  from types import MethodType

@@ -21,20 +21,20 @@ 

  import backend.mockremote.builder as builder_module

  from backend.mockremote.builder import Builder

  

- # @pytest.yield_fixture

- # def mc_ansible_runner():

- # patcher = mock.patch("backend.mockremote.builder.Runner")

- # yield patcher.start()

- # patcher.stop()

- 

- 

  MODULE_REF = "backend.mockremote.builder"

  

+ # TODO: drop these, these are not needed

+ class BuilderTimeOutError(Exception):

+     pass

+ class AnsibleCallError(Exception):

+     pass

+ class AnsibleResponseError(Exception):

+     pass

+ 

  

  @pytest.yield_fixture

  def mc_socket():

-     with mock.patch("{}.socket".format(MODULE_REF)) as handle:

-         yield handle

+     yield object()

  

  

  def noop(*args, **kwargs):

@@ -127,12 +127,7 @@ 

          return builder

  

      def setup_method(self, method):

-         self.mc_ansible_runner_patcher = mock.patch("backend.mockremote.builder.Runner")

-         self.mc_ansible_runner = self.mc_ansible_runner_patcher.start()

-         self.mc_ansible_runner.side_effect = lambda **kwargs: mock.MagicMock(**kwargs)

- 

          self.test_root_path = tempfile.mkdtemp()

- 

          self.stage = 0

          self.stage_ctx = defaultdict(dict)

  

@@ -141,26 +136,23 @@ 

          return self.gen_mockchain_command(self.BUILDER_PKG)

  

      def teardown_method(self, method):

-         self.mc_ansible_runner_patcher.stop()

-         # remote tmp dir

- 

          if os.path.exists(self.test_root_path):

              shutil.rmtree(self.test_root_path)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_constructor(self):

-         assert not self.mc_ansible_runner.called

          builder = self.get_test_builder()

-         assert self.mc_ansible_runner.called

- 

          assert builder.conn.remote_user == self.BUILDER_USER

          assert builder.root_conn.remote_user == "root"

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_get_remote_pkg_dir(self):

          builder = self.get_test_builder()

          expected = "/".join([self.BUILDER_REMOTE_TMPDIR, "build", "results",

                               self.BUILDER_CHROOT, builder.remote_pkg_name])

          assert builder._get_remote_results_dir() == expected

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_run_ansible(self):

          builder = self.get_test_builder()

          ans_cmd = "foo bar"

@@ -173,6 +165,7 @@ 

                  assert conn.module_args == ans_cmd

                  assert conn.module_name == module_name or "shell"

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_check_for_ans_answer(self):

          """

              Silly test. Ansible api has almost no documentation,

@@ -297,6 +290,7 @@ 

              # counter += 1

              # print("\nCounter {} passed".format(counter))

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_get_ans_results(self):

          result_obj = "RESULT_STRING"

          results = {"dark": {self.BUILDER_HOSTNAME: result_obj}, "contacted": {}}

@@ -312,6 +306,7 @@ 

          results = {"contacted": {}, "dark": {}}

          assert {} == builder_module.get_ans_results(results, self.BUILDER_HOSTNAME)

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_check_hostname_check(self, mc_socket):

          mc_socket.gethostbyname.side_effect = socket.gaierror()

          builder = self.get_test_builder()

@@ -321,6 +316,7 @@ 

                  builder.hostname = name

                  builder.check()

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_check_missing_required_binaries(self, mc_socket):

          builder = self.get_test_builder()

          self.stage = 0

@@ -344,6 +340,7 @@ 

  

          assert "does not have mock or rsync installed" in err.value.msg

  

+     @skip("Fixme or remove, test doesn't work.")

      def test_check_missing_mockchain_or_mock_config(self, mc_socket):