#388 *WIP* More hacky support, but for containers (originally D1196)
Closed 5 years ago by jskladan. Opened 6 years ago by jskladan.

file modified
-2
@@ -84,9 +84,7 @@ 

  Summary:        disposable client module for libtaskotron

  

  Requires:       libtaskotron-core = %{version}-%{release}

- Requires:       python-paramiko >= 1.15.1

  Requires:       testcloud >= 0.1.10

- BuildRequires:  python-paramiko >= 1.15.1

  BuildRequires:  testcloud >= 0.1.10

  

  %description -n libtaskotron-disposable

file modified
+134 -198
@@ -4,211 +4,147 @@ 

  # See the LICENSE file for more details on Licensing

  

  from __future__ import absolute_import

- import tempfile

+ 

+ import os

  import os.path

- import imp

- import copy

- import collections

- import pipes

- 

- from libtaskotron import taskformula

- from libtaskotron import logger

- from libtaskotron import python_utils

+ import subprocess

+ 

  from libtaskotron import config

  from libtaskotron.logger import log

- import libtaskotron.exceptions as exc

- from libtaskotron.directives import exitcode_directive

+ from libtaskotron import file_utils

+ from libtaskotron import os_utils

  

- try:

-     from libtaskotron.ext.fedora import rpm_utils

- except ImportError, e:

-     raise exc.TaskotronImportError(e)

+ from libtaskotron.ext.disposable import vm

+ from libtaskotron.ext.disposable import docker

+ # TODO exc

  

  

  class Executor(object):

-     '''This class serves the purpose of actual task execution. It is instantiated by

-     :class:`.Overlord`, :class:`.PersistentMinion` or :class:`.DisposableMinion`.

- 

-     :cvar str default_namespace: namespace to be used when the formula doesn't contain a namespace

-                                  definition

-     :ivar dict formula: parsed task formula

-     :ivar dict arg_data: processed cli arguments with some extra runtime variables

-     :ivar str workdir: path to working directory; a temporary one is created during execution if

-                        ``None``

-     :ivar dict working_data: variables exported from formula directives, to be injected when

-                              referenced using ``${variable}``

-     :ivar int exitcode: exit code set by ``exitcode`` directive; it is ``None`` if ``exitcode``

-                         directive was not used

-     '''

- 

-     default_namespace = 'scratch'

- 

-     def __init__(self, formula, arg_data, workdir=None):

-         self.formula = formula

+     def __init__(self, arg_data):

          self.arg_data = arg_data

-         self.working_data = {}

-         self.directives = {}

-         self.workdir = workdir

-         self.exitcode = None

- 

-     def execute(self):

-         '''Execute the task (consists of preparing the task and running it).'''

- 

-         filelog_path = os.path.join(self.arg_data['artifactsdir'], 'taskotron.log')

-         logger.add_filehandler(filelog_path=filelog_path, remove_mem_handler=True)

- 

-         self._prepare_task()

-         self._run()

- 

-     def _prepare_task(self):

-         '''Prepare the environment for running a task. If there are packages, requirements or

-         groups specified in the task formula, this method tries to install them (in case of

-         production profile) or checks whether they are installed (in case of developer profile).

-         '''

-         rpms = self.formula.get('environment', {}).get('rpm', [])

-         profile = config.get_config().profile

- 

-         if not rpm_utils.is_installed(rpms):

-             if profile == config.ProfileName.PRODUCTION or self.arg_data['local']:

-                 rpm_utils.install(rpms)

+         self.task_vm = None

+         self.run_remotely = False

+         self.task_container = None

+         self.client = None

+         self.use_docker = False

+ 

+     def _spawn_vm(self):

+         self.task_vm = vm.TestCloudMachine(self.arg_data['uuid'])

+         self.task_vm.prepare()

+ 

+         log.info('Running task over SSH on machine %s', self.task_vm.instancename)

+ 

+         self.task_vm.wait_for_port(22)

+ 

+         return self.task_vm.ipaddr

+ 

+     def _spawn_container(self):

+         self.client = docker.DockerClient()

+         self.task_container = self.client.create_container(uuid=self.arg_data['uuid'],

+                                                            artifacts=self.arg_data['artifactsdir'])

+ 

+         log.info('Running task in a container; SSH exposed on host port %s.', self.task_container)

+ 

+         return ("127.0.0.1", self.task_container)

+ 

+     def _prepare(self):

+         # when persistent, connect over ssh to an existing machine instead of

+         # spawning a libvirt VM

+         persistent = False

+         # when set, run the task inside a locally spawned docker container

+         self.use_docker = False

+ 

+         runtask_mode = config.get_config().runtask_mode

+         if runtask_mode == config.RuntaskModeName.LOCAL:

+             self.run_remotely = False

+         elif runtask_mode == config.RuntaskModeName.LIBVIRT:

+             self.run_remotely = True

+         elif runtask_mode == config.RuntaskModeName.DOCKER:

+             self.run_remotely = True

+             self.use_docker = True

+         else:

+             assert False, 'This should never occur'

+ 

+         if self.arg_data['local']:

+             log.debug("Forcing local execution (option --local)")

+             self.run_remotely = False

+ 

+         elif self.arg_data['libvirt']:

+             log.debug("Forcing remote execution (option --libvirt)")

+             self.run_remotely = True

+             persistent = False

+ 

+         elif self.arg_data['ssh']:

+             log.debug('Forcing remote execution (option --ssh)')

+             self.run_remotely = True

+             persistent = True

+ 

+         elif self.arg_data['docker']:

+             log.debug('Forcing execution on docker (option --docker)')

+             self.run_remotely = True

+             persistent = False

+             self.use_docker = True

+ 

+         log.debug('Execution mode: %s', 'remote' if self.run_remotely else 'local')

+ 

+         ipaddr = '127.0.0.1'

+         if self.run_remotely and not self.use_docker:

+             if persistent:

+                 ipaddr = self.arg_data['machine']

              else:

-                 raise exc.TaskotronError('Some packages are not installed. Please run '

-                                          '"dnf install %s" to install all required packages.'

-                                          % " ".join([pipes.quote(rpm) for rpm in rpms]))

- 

-     def _run(self):

-         self._validate_input()

- 

-         if not self.workdir:  # create temporary workdir if needed

-             self.workdir = tempfile.mkdtemp(prefix="task-",

-                                             dir=config.get_config().tmpdir)

-         log.debug("Current workdir: %s", self.workdir)

-         self.arg_data['workdir'] = self.workdir

-         self.arg_data['checkname'] = self.formula['name']

-         self.arg_data['namespace'] = self.formula.get('namespace') or self.default_namespace

- 

-         # override variable values

-         for var, val in self.arg_data['override'].items():

-             log.debug("Overriding variable %s, new value: %s", var, val)

-             self.arg_data[var] = eval(val, {}, {})

- 

-         self._do_actions()

- 

-     def _validate_input(self):

-         if 'input' not in self.formula:

-             return

- 

-         if not isinstance(self.formula['input'], collections.Mapping):

-             raise exc.TaskotronYamlError("Input yaml should contain correct 'input'"

-                                          "section (a mapping). Yours was: %s" %

-                                          type(self.formula['input']))

- 

-         required_args = self.formula['input'].get('args', None)

- 

-         if not python_utils.iterable(required_args):

-             raise exc.TaskotronYamlError("Input yaml should contain correct 'args' "

-                                          "section (an iterable). Yours was: %s" %

-                                          type(required_args))

- 

-         for arg in required_args:

-             if arg not in self.arg_data:

-                 raise exc.TaskotronYamlError("Required input arg '%s' "

-                                              "was not defined" % arg)

- 

-     def _do_actions(self):

-         '''Sequentially run all actions for a task. An 'action' is a single step

-         under the ``actions:`` key. An example action looks like::

- 

-           - name: download rpms from koji

-             koji:

-               action: download

-               koji_build: $koji_build

-               arch: $arch

-         '''

-         if 'actions' not in self.formula or not self.formula['actions']:

-             raise exc.TaskotronYamlError("At least one task should be specified"

-                                          " in input formula")

- 

-         for action in self.formula['actions']:

-             self._do_single_action(action)

- 

-     def _do_single_action(self, action):

-         '''Execute a single action from the task. See :meth:`do_actions` to see

-         how an action looks like.

- 

-         :param dict action: An action specification parsed from the task formula

-         '''

-         directive_name = self._extract_directive_from_action(action)

- 

-         rendered_action = self._render_action(action)

- 

-         log.debug("Executing directive: %s", directive_name)

-         self._load_directive(directive_name)

- 

-         directive_object = self.directives[directive_name]

-         directive_classname = directive_object.directive_class

- 

-         directive_callable = getattr(directive_object, directive_classname)()

- 

-         output = directive_callable.process(rendered_action[directive_name],

-                                             self.arg_data)

- 

-         if 'export' in action:

-             self.working_data[action['export']] = output

-             log.debug("Variable ${%s} was exported with value:\n%s" %

-                       (action['export'], output))

- 

-         # unable to use isinstance() due to dynamically loaded directives

-         if directive_classname == exitcode_directive.directive_class:

-             # set exitcode if was None or overwrite previous SUCCESS with FAILURE (not vice versa)

-             if not self.exitcode or output != exitcode_directive.SUCCESS:

-                 log.info("Setting exitcode to %s" % output)

-                 self.exitcode = output

- 

-     def _extract_directive_from_action(self, action):

-         for key in action:

-             if key not in ['name', 'export']:

-                 return key

-         raise exc.TaskotronYamlError('no directive found in action %s' % str(action))

- 

-     def _render_action(self, action):

-         '''Take an action and replace all included variables with actual values

-         from :attr:`arg_data` and :attr:`working_data`. See :meth:`do_actions`

-         to see how an action looks like.

- 

-         :param dict action: An action specification parsed from the task formula

-         :return: a rendered action

-         :rtype: dict

-         '''

-         # copy the input so that we don't disrupt what we're processing

-         rendered_action = copy.deepcopy(action)

- 

-         variables = copy.deepcopy(self.arg_data)

-         variables.update(self.working_data)

-         taskformula.replace_vars_in_action(rendered_action, variables)

- 

-         return rendered_action

- 

-     def _load_directive(self, directive_name, directive_dir=None):

-         # look in default path if nothing is specified

-         if not directive_dir:

-             directive_dir = os.path.join(os.path.dirname(__file__),

-                                          'directives')

- 

-         real_name = "%s_directive" % directive_name

-         directive_file = os.path.join(directive_dir, '%s.py' % real_name)

- 

-         if not os.path.exists(directive_file):

-             raise exc.TaskotronDirectiveError("Directive %s not found in directory %s" %

-                                               (directive_name, directive_dir))

- 

+                 ipaddr = self._spawn_vm()

+ 

+         if self.use_docker:

+             ipaddr, port = self._spawn_container()

+ 

+             with open('inventory', 'w') as f:

+                 f.write('[localhost]\n%s\t\tansible_user=root'

+                         ' ansible_port=%s ansible_password=passw0rd '

+                         'ansible_ssh_common_args="-o UserKnownHostsFile=/dev/null '

+                         '-o StrictHostKeyChecking=no"' % (ipaddr, port))

+         else:

+             with open('inventory', 'w') as f:

+                 f.write('[localhost]\n%s' % ipaddr)

+ 

+     def _execute(self):

+         cmd = [

+             'ansible-playbook', self.arg_data['task'],

+             '-b',

+             '-u', 'root',

+             '-i', 'inventory',

+             '-e', 'artifacts=%s' % self.arg_data['artifactsdir'],

+             '-e', 'subjects=%s' % self.arg_data['item'],

+         ]

+ 

+         if self.run_remotely and not self.use_docker:

+             cmd.extend(['--private-key', self.arg_data['ssh_privkey']])

+         elif self.use_docker:

+             cmd.extend(['-K'])

+         else:

+             cmd.extend(['-K', '-c', 'local'])

+ 

+         log.debug('Running ansible playbook %s', ' '.join(cmd))

          try:

-             loaded_directive = imp.load_source(real_name, directive_file)

-         except ImportError, e:

-             raise exc.TaskotronImportError(e)

- 

-         self.directives[directive_name] = loaded_directive

- 

-     def _validate_env(self):

-         # TODO: implement this

-         raise NotImplementedError("Environment validation is not yet implemented")

+             output, _ = os_utils.popen_rt(cmd, stderr=subprocess.STDOUT)

+             with open(os.path.join(self.arg_data['artifactsdir'], 'ansible.log'), 'w') as f:

+                 f.write(output)

+         except subprocess.CalledProcessError, e:

+             log.error('ansible-playbook ended with %d return code', e.returncode)

+             log.debug(e.output)

+ 

+     def start(self):

+         try:

+             file_utils.makedirs(self.arg_data['artifactsdir'])

+             log.info("Task artifacts will be saved in: %s", self.arg_data['artifactsdir'])

+         except OSError:

+             log.error("Can't create artifacts directory %s", self.arg_data['artifactsdir'])

+             raise

+ 

+         self._prepare()

+         self._execute()

+ 

+         # clean up

+         if self.task_vm is not None:

+             self.task_vm.teardown()

+         os.remove('inventory')

+ 

+         if self.task_container is not None:

+             self.client.stop_container('taskotron-worker-{}'.format(self.arg_data['uuid']))

+             self.client.rm_container('taskotron-worker-{}'.format(self.arg_data['uuid']))
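
For reviewers, a minimal sketch of how the rewritten `Executor` is driven end to end. The `arg_data` keys below are the ones `Executor` reads in this diff; all values are illustrative.

```python
# Hedged driver sketch: keys taken from this diff, values illustrative.
from libtaskotron.executor import Executor

arg_data = {
    'task': 'some_task.yml',            # ansible playbook to run
    'uuid': '20170101_120000_000000',   # job identifier
    'artifactsdir': '/tmp/artifacts',   # logs and ansible output land here
    'item': 'foo-1.0-1.fc25',           # passed to the playbook as 'subjects'
    'local': False, 'libvirt': False, 'ssh': None,
    'docker': True,                     # force container execution
    'ssh_privkey': '', 'machine': None,
}

executor = Executor(arg_data)
executor.start()  # writes 'inventory', runs ansible-playbook, tears down
```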

@@ -0,0 +1,186 @@ 

+ # -*- coding: utf-8 -*-

+ # Copyright 2009-2015, Red Hat, Inc.

+ # License: GPL-2.0+ <http://spdx.org/licenses/GPL-2.0+>

+ # See the LICENSE file for more details on Licensing

+ 

+ """Interface for locally spawned docker containers used for task execution."""

+ 

+ import os

+ import random

+ import subprocess as sp

+ 

+ from libtaskotron.logger import log

+ 

+ 

+ class DockerClient(object):

+     '''Helper class for working with the docker daemon.'''

+ 

+     def __init__(self, host_ip="127.0.0.1"):

+         ''':param host_ip: The host IP where the docker daemon runs. We

+         assume for now that it's running on localhost.'''

+ 

+         # FIXME: Support running commands against a remote host.

+         self.host = host_ip

+         self.images = self._get_images()

+         self.containers = self._get_containers()

+         # We need to keep a list of the containers we create, so

+         # we can co-exist with other users of whatever docker

+         # daemon we might be interacting with.

+         self.owned_containers = []

+         self.owned_images = []

+ 

+     def _get_images(self):

+         '''Return a list of image dicts.'''

+ 

+         template = "{{.ID}}\t{{.Repository}}"

+         raw_output = sp.check_output(['docker',

+                                       'images',

+                                       '--format',

+                                       template])

+ 

+         images = raw_output.split('\n')

+         attrs = ['id', 'name']

+         images = [image.split('\t') for image in images[:-1]]

+         images = [dict(zip(attrs, image)) for image in images]

+ 

+         return images

+ 

+     def _get_containers(self):

+         '''Return a list of all containers on the host.'''

+ 

+         # This is a go template to clean up docker ps output

+         template = "{{.ID}}\t{{.Command}}\t{{.Status}}\t{{.Ports}}\t{{.Names}}"

+ 

+         raw_output = sp.check_output(['docker',

+                                       'ps',

+                                       '-a',

+                                       '--format',

+                                       template])

+ 

+         containers = raw_output.split('\n')

+ 

+         # Split each row into fields and drop the trailing empty entry

+         containers = [x.split('\t') for x in containers][:-1]

+ 

+         attrs = ['id', 'command', 'status', 'ports', 'names']

+         containers = [dict(zip(attrs, container)) for container in containers]

+ 

+         return containers

+ 

+     def _find_container(self, search_term):

+         '''Find a container based on either name or id.'''

+ 

+         for container in self.containers:

+             if search_term in container.values():

+                 return container

+ 

+         raise ClientError("No container found by '{}'.".format(search_term))

+ 

+     def _manipulate_container(self, identifier, action=None):

+         '''Start, stop, or remove a container given its name or id.

+ 

+         :param action: command for docker, either 'start', 'stop', or 'rm'

+         :param identifier: a container ID or name'''

+ 

+         if action not in ['start', 'stop', 'rm']:

+             raise ClientError("Available actions are: start, stop, and rm. "

+                               "Could not perform: {}".format(action))

+ 

+         container = self._find_container(identifier)

+         try:

+             sp.check_call(['docker',

+                            action,

+                            container['names']])

+ 

+         except sp.CalledProcessError:

+             log.debug("Failed to {} the container '{}'.".format(action, container['names']))

+             raise ClientError("Could not {} the container.".format(action))

+ 

+     def build_image(self, path_to_dockerfile_dir, imgtag='taskotron-worker'):

+         '''Run `docker build` against the specified directory.'''

+         sp.check_call(['docker',

+                        'build',

+                        '-t',

+                        imgtag,

+                        path_to_dockerfile_dir])

+         # Update our lists

+         self.images = self._get_images()

+         self.owned_images.append(imgtag)

+ 

+     def create_container(self,

+                          image_name='taskotron-worker',

+                          container_name='taskotron-worker',

+                          uuid=None,

+                          artifacts=None):

+         '''Create a container from the specified image.

+ 

+         :param image_name: the name of the built image you want to launch.

+         :param uuid: optional job identifier appended to the container name.

+         :param artifacts: host directory mounted into the container at /artifacts.

+         '''

+ 

+         # Build the image if it's not already there

+         image_found = False

+         for image in self.images:

+             if image_name == image['name']:

+                 log.debug("Image already exists, skipping image build...")

+                 image_found = True

+ 

+         if not image_found:

+             log.debug("Image not found, building...")

+             self.build_image(os.path.abspath(os.curdir) + '/libtaskotron/ext/docker/')

+ 

+         if uuid:

+             container_name += "-{}".format(uuid)

+ 

+         # Generate a random host port for SSH forwarding (2200-2299)

+         host_port = "{}".format(random.randrange(2200, 2300))

+         log.info("Container will be available on port: {}".format(host_port))

+         sp.check_call(['docker',

+                        'run',

+                        '-d',

+                        '--name', container_name,

+                        '-p', '{}:22'.format(host_port),

+                        '-v', '{}:/artifacts'.format(artifacts),

+                        image_name])

+ 

+         # Update our list of all containers

+         self.containers = self._get_containers()

+ 

+         # Update the list of owned_containers

+         self.owned_containers.append(self._find_container(container_name))

+ 

+         return host_port

+ 

+     def start_container(self, identifier):

+         self._manipulate_container(identifier, action='start')

+ 

+     def stop_container(self, identifier):

+         self._manipulate_container(identifier, action='stop')

+ 

+     def rm_container(self, identifier):

+         # Update our list of owned containers, since we're deleting it

+         for i in xrange(len(self.owned_containers)):

+             if identifier in (self.owned_containers[i]['names'], self.owned_containers[i]['id']):

+                 self.owned_containers.pop(i)

+                 break

+ 

+         self._manipulate_container(identifier, action='rm')

+ 

+     def clear_all(self):

+         '''Clean out all owned containers and images.'''

+ 

+         log.info("Clearing out all our containers...")

+         for container in self.owned_containers:

+             self.stop_container(container['names'])

+             self.rm_container(container['names'])

+ 

+         log.info("Clearing out all our images...")

+         for image in self.owned_images:

+             sp.check_call(['docker',

+                            'rmi',

+                            image])

+ 

+ 

+ class ClientError(Exception):

+     '''General exceptions we might run into using this client.'''

+     pass
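
A hedged usage sketch for `DockerClient`, using only methods defined above; it assumes a local docker daemon, and the uuid and artifacts path are illustrative.

```python
client = DockerClient()

# Builds the image on first use, then starts sshd in a container with the
# artifacts dir bind-mounted at /artifacts; returns the mapped host port.
port = client.create_container(uuid='1234', artifacts='/tmp/artifacts')

# ... point ansible/ssh at root@127.0.0.1:<port> (password 'passw0rd') ...

# Per-container teardown, mirroring what Executor.start() does:
client.stop_container('taskotron-worker-1234')
client.rm_container('taskotron-worker-1234')

# Alternatively, drop every container and image this client created:
# client.clear_all()
```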

@@ -0,0 +1,21 @@ 

+ #FROM registry.fedoraproject.org/fedora:25

+ FROM fedora:25

+ 

+ RUN dnf install openssh-server PyYAML libtaskotron-core libtaskotron-fedora \

+     libtaskotron-config python-solv python-librepo passwd -y && \

+     mkdir -p /var/log/taskotron /srv/taskotron/artifacts && \

+     dnf clean all

+ 

+ ENV LANG C.utf8

+ ENV LC_ALL C.utf8

+ 

+ ADD https://pagure.io/taskotron/libtaskotron/raw/master/f/conf/yumrepoinfo.conf.example /etc/taskotron/yumrepoinfo.conf

+ ADD taskotron.yaml /etc/taskotron/taskotron.yaml

+ ADD namespaces.yaml /etc/taskotron/namespaces.yaml

+ 

+ RUN echo "passw0rd" | passwd --stdin root

+ RUN ssh-keygen -t rsa -f /etc/ssh/ssh_host_rsa_key -N ""

+ 

+ EXPOSE 22

+ 

+ CMD ["/sbin/sshd", "-D"]
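
One gap worth noting: `_spawn_vm()` calls `wait_for_port(22)` before running the task, but `_spawn_container()` returns as soon as `docker run` does, and sshd inside the container may not be accepting connections yet. A hypothetical helper (not part of this PR) could close that race:

```python
import socket
import time


def wait_for_port(host, port, timeout=60):
    '''Block until host:port accepts TCP connections; raise on timeout.'''
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            socket.create_connection((host, port), timeout=5).close()
            return
        except socket.error:
            time.sleep(0.5)
    raise socket.error('port %s:%s did not open within %s seconds'
                       % (host, port, timeout))
```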

@@ -0,0 +1,5 @@ 

+ namespaces_safe:

+     - scratch

+     - qa

+     - pkg

+     - dist

@@ -0,0 +1,7 @@ 

+ profile: production

+ runtask_mode: local

+ resultsdb_server: http://resultsdb:5001/api/v2.0

+ resultsdb_frontend: http://localhost:5002

+ execdb_server: http://localhost:5003

+ taskotron_master: http://localhost:8010

+ artifacts_baseurl: http://localhost:8081

file modified
+6 -22
@@ -8,7 +8,6 @@ 

  import os.path

  import argparse

  import datetime

- import sys

  import copy

  

  import libtaskotron
@@ -16,7 +15,7 @@ 

  from libtaskotron import config

  from libtaskotron import check

  from libtaskotron.logger import log

- from libtaskotron.overlord import Overlord

+ from libtaskotron.executor import Executor

  

  

  def get_argparser():
@@ -24,11 +23,9 @@ 

  

      :rtype: :class:`argparse.ArgumentParser`

      '''

-     # IMPORTANT: When you add a new argument, look into `minion.BaseMinion.arg_data_exclude` and

-     # update it if needed!

  

      parser = argparse.ArgumentParser()

-     parser.add_argument("task", help="task formula to run")

+     parser.add_argument("task", help="task playbook to run")

      parser.add_argument("-a", "--arch",

                          choices=["i386", "x86_64", "armhfp", "noarch"], default='noarch',

                          help="architecture specifying the item to be checked. 'noarch' value "
@@ -47,13 +44,6 @@ 

                          help="Unique job identifier for the execution"

                               "status tracking purposes. If unset, defaults to"

                               "current datetime in UTC")

-     parser.add_argument("--override", action="append", default=[], metavar='VAR=VALUE',

-                         help="override internal variable values used in runner "

-                              "and the task formula. Value itself is evaluated "

-                              "by eval(). This option can be used multiple times. "

-                              "Example: --override \"workdir='/some/dir/'\"")

-     parser.add_argument("-p", "--patch", help="patch to apply to remote system"

-                                               "before execution of task")

      parser.add_argument("--local", action="store_true",

                          help="make the task run locally on this very machine (the default "

                               "behavior for development profile). This also approves any required "
@@ -68,6 +58,8 @@ 

                          help="path to private key for remote connections over ssh")

      parser.add_argument("--no-destroy", action="store_true",

                          help="do not destroy disposable client at the end of task execution")

+     parser.add_argument("--docker", action="store_true",

+                         help="run the task inside a docker container")

  

      return parser

  
@@ -107,12 +99,6 @@ 

      if args['type'] in check.ReportType.list():

          args[args['type']] = args['item']

  

-     override = {}

-     for var in args['override']:

-         name, value = var.split('=', 1)

-         override[name] = value

-     args['override'] = override

- 

      # parse ssh

      if args['ssh']:

          args['user'], machine = args['ssh'].split('@')
@@ -152,12 +138,10 @@ 

      logger.init(level_stream=level_stream)

  

      # start execution

-     overlord = Overlord(arg_data)

-     overlord.start()

+     executor = Executor(arg_data)

+     executor.start()

  

      # finalize

      log.info('Execution finished at: %s. Task artifacts were saved in: %s',

               datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC'),

               arg_data['artifactsdir'])

-     log.info("Exiting with exit status %d" % overlord.exitcode)

-     sys.exit(overlord.exitcode)
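
A quick sanity sketch for the new `--docker` flag, using `get_argparser()` from this file; it assumes the remaining options (not all shown in this diff) have defaults.

```python
parser = get_argparser()
args = vars(parser.parse_args(['--docker', 'some_task.yml']))
assert args['docker'] is True
assert args['task'] == 'some_task.yml'
```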

file removed
-257
@@ -1,257 +0,0 @@ 

- # -*- coding: utf-8 -*-

- # Copyright 2009-2015, Red Hat, Inc.

- # License: GPL-2.0+ <http://spdx.org/licenses/GPL-2.0+>

- # See the LICENSE file for more details on Licensing

- 

- from __future__ import absolute_import

- import os.path

- import yaml

- import pipes

- 

- from libtaskotron import logger

- from libtaskotron import config

- from libtaskotron import remote_exec

- from libtaskotron import python_utils

- from libtaskotron.logger import log

- from libtaskotron import taskformula

- import libtaskotron.exceptions as exc

- 

- try:

-     from libtaskotron.ext.disposable import vm

- except ImportError, e:

-     raise exc.TaskotronImportError(e)

- 

- 

- class BaseMinion(object):

-     '''Base Minion class that shouldn't be used on its own, it solely initiates inner attributes

-     and environment. It also provides method _run() that delegates the task execution to Executor

-     over SSH.

- 

-     :cvar tuple arg_data_exclude: a tuple of cmdline options/arguments which should not be

-         forwarded from the host to the minion when execution the task. Use the same names as the

-         stored variables in ``arg_data`` use (i.e. dashes converted to underscores, etc).

-     :ivar dict formula: parsed task formula

-     :ivar dict arg_data: processed cli arguments with some extra runtime variables

-     :ivar str artifactsdir: path to ``artifactsdir`` for storing logs

-     :ivar int exitcode: exit code of libtaskotron runner executed on the minion

-     :ivar ssh: an instance of :class:`.ParamikoWrapper` which is used to execute all remote

-         commands on a minion. ``None`` by default, you need to set this before you start

-         :meth:`execute`.

-     '''

- 

-     arg_data_exclude = ('task', 'libvirt', 'ssh', 'ssh_privkey', 'no_destroy', 'patch')

- 

-     def __init__(self, formula, arg_data):

-         self.formula = formula

-         self.arg_data = arg_data

-         self.artifactsdir = self.arg_data['artifactsdir']

- 

-         self.user = arg_data.get('user', None)

-         self.machine = arg_data.get('machine', None)

-         self.port = arg_data.get('port', None)

-         self.ssh_privkey = arg_data['ssh_privkey'] or config.get_config().ssh_privkey

- 

-         self.exitcode = None

-         self.ssh = None

- 

-     def _prepare_task(self):

-         '''Prepare the environment to run the task.'''

-         # TODO: add sudo support

- 

-         log.info('Examining the minion and installing libtaskotron...')

-         self.taskdir = config.get_config().client_taskdir

- 

-         # create initial DNF cache (but only if needed)

-         # uses the same logic as in libtaskotron.ext.fedora.rpm_utils.dnf_cache_available()

-         self.ssh.cmd('dnf --cacheonly repolist || dnf makecache')

- 

-         # add additional repos to minion

-         repos = ['--add-repo %s' % pipes.quote(repo) for repo in config.get_config().minion_repos]

-         if repos:

-             self.ssh.cmd('dnf config-manager %s' % ' '.join(repos))

- 

-         # create DNF cache for taskotron repos

-         # this hard-coding is very suboptimal, but it's hard to find a better way how to refresh

-         # only taskotron repos (unless we require their names listed in configuration)

-         excluded = ['fedora', 'updates', 'updates-testing']

-         exclude_cmd = ' '.join(['--disablerepo %s' % pipes.quote(repo) for repo in excluded])

-         self.ssh.cmd('dnf makecache %s' % exclude_cmd)

- 

-         # install libtaskotron

-         # FIXME this should only need libtaskotron-core T651. libtaskotron-fedora should be

-         # installed as a check dep.

-         # TODO: remove tar dep once libtaskotron 0.4.16 has been released for long enough

-         # (tar is here just for backward compatibility, see D965)

-         self.ssh.install_pkgs(['libtaskotron-core', 'libtaskotron-fedora', 'tar'])

- 

-         log.info('Copying task files onto the minion...')

- 

-         # Copy config files. Go through CONF_DIRS in reversed order to preserve priority.

-         for config_dir in reversed(config.CONF_DIRS):

-             if os.path.exists(config_dir):

-                 self.ssh.put_dir(config_dir, '/etc/taskotron/')

- 

-         # create taskdir, remove the old one first

-         self.ssh.cmd('rm -rf {0} && mkdir -p {0}'.format(pipes.quote(self.taskdir)))

- 

-         # patch remote libtaskotron if needed (for dev, shouldn't be used in production)

-         if self.arg_data['patch'] is not None:

-             self.ssh.put_file(os.path.abspath(self.arg_data['patch']), '%s/%s' % (

-                 self.taskdir, self.arg_data['patch']))

-             self.ssh.install_pkgs(['patch'])

-             self.ssh.cmd('patch -d /usr/lib/python2.7/site-packages/ -p1 -i %s/%s' % (

-                 pipes.quote(self.taskdir), pipes.quote(self.arg_data['patch'])))

- 

-         # put files needed for execution (task, input files, etc)

-         self.ssh.put_dir(os.path.dirname(os.path.abspath(self.arg_data['task'])), self.taskdir)

- 

-         # need to have default_flow_style false to get valid yaml w/ nested dicts

-         self.ssh.write_file(os.path.join(self.taskdir,

-                                          os.path.basename(self.arg_data['task'])),

-                             yaml.dump(self.formula, default_flow_style=False))

- 

-     def _run(self):

-         '''The main method that runs the task'''

- 

-         task = os.path.basename(self.arg_data['task'])

-         cmdline = python_utils.reverse_argparse(self.arg_data['_orig_args'],

-                                                 ignore=self.arg_data_exclude)

-         cmdline.append('--local')

-         cmdline.append(task)

- 

-         # be paranoid and escape everything

-         cmdline = [pipes.quote(elem) for elem in cmdline]

- 

-         # execute task

-         log.info('Executing the task on the minion...')

-         task_cmd = ['cd', pipes.quote(self.taskdir), '&&', 'runtask'] + cmdline

- 

-         self.exitcode = self.ssh.cmd(' '.join(task_cmd))

-         log.info('Task execution on the minion is complete')

- 

-     def _get_output(self):

-         '''copy artifacts from vm'''

-         log.info('Copying task artifacts and logs from the minion...')

-         try:

-             self.ssh.get_dir(self.artifactsdir, self.artifactsdir)

-         except exc.TaskotronRemoteError, e:

-             log.error(e)

- 

-     def execute(self):

-         '''This method has to be implemented by classes that inherits from :class:`.BaseMinion`.

-         It should contain environment initialization (namely set :attr:`ssh` to properly

-         instantiated :class:`.ParamikoWrapper`) and cleanup (if necessary). Moreover, the order of

-         private methods has to be: :meth:`_prepare_task`, :meth:`_run` and :meth:`_get_output`.

-         '''

-         raise exc.TaskotronNotImplementedError()

- 

- 

- class PersistentMinion(BaseMinion):

-     '''Minion class that connects to alredy running machine using SSH and runs the task there.

-     '''

- 

-     def execute(self):

-         '''Init environment and connect to remote machine for task execution.'''

- 

-         filelog_path = os.path.join(self.artifactsdir, 'taskotron-overlord.log')

-         logger.add_filehandler(filelog_path=filelog_path, remove_mem_handler=True)

- 

-         stdio_filename = os.path.join(self.artifactsdir, 'taskotron-stdio.log')

- 

-         log.info("Running task over SSH on machine '%s'...", self.machine)

- 

-         with remote_exec.ParamikoWrapper(self.machine, self.port, self.user,

-                                          self.ssh_privkey, stdio_filename) as self.ssh:

-             try:

-                 self._prepare_task()

-                 self._run()

-             except exc.TaskotronRemoteTimeoutError:

-                 # do not try to get logs - the minion is likely dead and we would hang on

-                 # getting the logs, see https://phab.qa.fedoraproject.org/T665

-                 log.error('Connection to the minion timed out')

-                 raise

-             except exc.TaskotronRemoteError:

-                 # this is the case when remote execution failed but it should be possible to

-                 # get logs from the minion, so try it

-                 log.error('Error occurred on the minion, task execution stopped')

-                 self._get_output()

-                 raise

-             else:

-                 self._get_output()

- 

- 

- class DisposableMinion(BaseMinion):

-     '''Minion class that creates a disposable client, connects to it over SSH and runs the task

-     there.

-     '''

- 

-     def execute(self):

-         '''Init environment and connect to disposable client for task execution.'''

- 

-         filelog_path = os.path.join(self.artifactsdir, 'taskotron-overlord.log')

-         logger.add_filehandler(filelog_path=filelog_path, remove_mem_handler=True)

- 

-         stdio_filename = os.path.join(self.artifactsdir, 'taskotron-stdio.log')

- 

-         execution_ok = False

-         try:

-             log.info("Running task on a disposable client using libvirt...")

- 

-             log.info("Creating VM for task")

- 

-             env = taskformula.devise_environment(self.formula, self.arg_data)

- 

-             task_vm = vm.TestCloudMachine(self.arg_data['uuid'])

-             task_vm.prepare(**env)

-             self.machine = task_vm.ipaddr

-             self.port = 22

-             self.user = task_vm.username

- 

-             log.info('Running task over SSH on machine %s (%s@%s:%s)', task_vm.instancename,

-                      self.user, self.machine, self.port)

- 

-             task_vm.wait_for_port(self.port)

- 

-             with remote_exec.ParamikoWrapper(self.machine, self.port, self.user,

-                                              self.ssh_privkey, stdio_filename) as self.ssh:

-                 try:

-                     self._prepare_task()

-                     self._run()

-                 except exc.TaskotronRemoteTimeoutError:

-                     # do not try to get logs - the minion is likely dead and we would hang on

-                     # getting the logs, see https://phab.qa.fedoraproject.org/T665

-                     log.error('Connection to the minion timed out')

-                     raise

-                 except exc.TaskotronRemoteError:

-                     # this is the case when remote execution failed but it should be possible to

-                     # get logs from the minion, so try it

-                     log.error('Error occurred on the minion, task execution stopped')

-                     self._get_output()

-                     raise

-                 else:

-                     self._get_output()

- 

-             execution_ok = True

-         finally:

-             if not execution_ok:

-                 log.warn('Task execution was interrupted by an error, doing emergency cleanup')

- 

-             if self.arg_data['no_destroy']:

-                 log.info('--no-destroy mode was set from command line, not tearing VM '

-                          'down for task %s: %s@%s:%s', self.arg_data['uuid'], self.user,

-                          self.machine, self.port)

-             else:

-                 try:

-                     log.info('Shutting down the minion...')

-                     task_vm.teardown()

-                 # Catch 'em all since we can't be sure that every exception trown

-                 # is TaskotronRemoteError. If we omitted any exception that would

-                 # be later thrown in the teardown, we would suppress (effectively

-                 # make it silent) any possible exception from execution above

-                 # which is more important.

-                 except Exception, e:

-                     if execution_ok:

-                         raise e

-                     else:

-                         # a different exception is already being raised, don't override it

-                         log.exception(e)

file removed
-100
@@ -1,100 +0,0 @@ 

- # -*- coding: utf-8 -*-

- # Copyright 2009-2015, Red Hat, Inc.

- # License: GPL-2.0+ <http://spdx.org/licenses/GPL-2.0+>

- # See the LICENSE file for more details on Licensing

- 

- from __future__ import absolute_import

- 

- from libtaskotron import config

- from libtaskotron.logger import log

- from libtaskotron import executor

- from libtaskotron import file_utils

- from libtaskotron import exceptions as exc

- from libtaskotron.directives import exitcode_directive

- 

- 

- class Overlord(object):

-     """Overlord class encapsulates decision whether the task is run locally or remotely (in

-     persistent or disposable client) and orchestrates the execution.

- 

-     :ivar dict arg_data: processed cli arguments with some extra runtime variables

-     :ivar int exitcode: exit code of the task; if ``None``, it means :meth:`start` was not called

-                         yet

-     :ivar dict formula: parsed task formula

-     """

- 

-     def __init__(self, arg_data):

-         self.arg_data = arg_data

-         self.exitcode = None

- 

-         # parse task formula

-         self.formula = config.parse_yaml_from_file(arg_data['task'])

-         if not self.formula:

-             raise exc.TaskotronYamlError('Task formula file should not be empty: %s' %

-                                          arg_data['task'])

- 

-         log.debug('Found task: %s.%s',

-                   self.formula.get('namespace') or executor.Executor.default_namespace,

-                   self.formula['name'])

- 

-     def _get_runner(self):

-         """Decide the run mode and get appropriate runner (Executor or Minion).

- 

-         :returns: either :class:`.Executor` (local run mode) or :class:`.PersistentMinion` or

-                   :class:`.DisposableMinion` (remote run mode)

-         """

-         # run locally or remotely

-         run_remotely = False

-         # when running remotely, run directly over ssh, instead of using libvirt

-         persistent = False

- 

-         # decide whether to run locally or remotely

-         runtask_mode = config.get_config().runtask_mode

-         if runtask_mode == config.RuntaskModeName.LOCAL:

-             run_remotely = False

-         elif runtask_mode == config.RuntaskModeName.LIBVIRT:

-             run_remotely = True

-         else:

-             assert False, 'This should never occur'

- 

-         if self.arg_data['local']:

-             log.debug("Forcing local execution (option --local)")

-             run_remotely = False

- 

-         elif self.arg_data['libvirt']:

-             log.debug("Forcing remote execution (option --libvirt)")

-             run_remotely = True

-             persistent = False

- 

-         elif self.arg_data['ssh']:

-             log.debug('Forcing remote execution (option --ssh)')

-             run_remotely = True

-             persistent = True

- 

-         log.debug('Execution mode: %s', 'remote' if run_remotely else 'local')

- 

-         if run_remotely:

-             from libtaskotron import minion

-             if persistent:

-                 return minion.PersistentMinion(self.formula, self.arg_data)

-             else:

-                 return minion.DisposableMinion(self.formula, self.arg_data)

-         else:

-             return executor.Executor(self.formula, self.arg_data)

- 

-     def start(self):

-         """Start the overlord, get runner and execute the task (either locally or remotely).

-         """

-         try:

-             file_utils.makedirs(self.arg_data['artifactsdir'])

-             log.info("Task artifacts will be saved in: %s", self.arg_data['artifactsdir'])

-         except OSError:

-             log.error("Can't create artifacts directory %s", self.arg_data['artifactsdir'])

-             raise

- 

-         runner = self._get_runner()

-         runner.execute()

- 

-         self.exitcode = 0

-         if runner.exitcode == exitcode_directive.FAILURE:

-             self.exitcode = runner.exitcode

file removed

-363

@@ -1,363 +0,0 @@ 

- # -*- coding: utf-8 -*-

- # Copyright 2009-2015, Red Hat, Inc.

- # License: GPL-2.0+ <http://spdx.org/licenses/GPL-2.0+>

- # See the LICENSE file for more details on Licensing

- 

- '''Tools for remote execution primary for disposable clients'''

- 

- from __future__ import absolute_import

- import os

- import os.path

- import sys

- import socket

- import stat

- import time

- import pipes

- import tarfile

- import tempfile

- 

- import paramiko

- 

- from .logger import log

- from . import exceptions as exc

- 

- from libtaskotron.directives import exitcode_directive

- from libtaskotron import file_utils

- 

- 

- class ParamikoWrapper(object):

-     '''Wrapper for SSH communication using paramiko library'''

- 

-     #: timeout for network operations in seconds (900 seconds = 15 minutes)

-     TIMEOUT = 900

- 

-     def __init__(self, hostname, port, username, key_filename, stdio_filename=None):

-         self.ssh = None

-         self.sftp = None

-         self.hostname = hostname

-         self.port = port

-         self.username = username

-         self.key_filename = key_filename

-         self.stdio_filename = stdio_filename

-         self.outstream = file_utils.Tee(sys.stdout)

- 

-     def __enter__(self):

-         self.connect()

-         return self

- 

-     def __exit__(self, type, value, traceback):

-         self.close()

- 

-     def __str__(self):

-         return '<%s: %s@%s:%s>' % (self.__class__.__name__, self.username,

-                                    self.hostname, self.port)

- 

-     def connect(self):

-         '''Connect to a machine over ssh. Open sftp channel and, if applicable, file that

-            stdout/err from the machine will be saved to.

- 

-            :raise TaskotronRemoteError: when the connection does not succeed'''

- 

-         self.ssh = paramiko.SSHClient()

-         # accept unknown hosts

-         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

- 

-         log.debug('Connecting to remote host: %s@%s:%s', self.username, self.hostname, self.port)

- 

-         try:

-             self.ssh.connect(self.hostname,

-                              port=self.port,

-                              username=self.username,

-                              # we use empty string by default (no special key), but paramiko

-                              # requires receiving None in that case

-                              key_filename=self.key_filename or None)

- 

-             self.sftp = self.ssh.open_sftp()

-             self.sftp.get_channel().settimeout(self.TIMEOUT)

- 

-             if self.stdio_filename is not None:

-                 try:

-                     f = open(self.stdio_filename, 'w')

-                     self.outstream.add(f)

-                 except IOError, e:

-                     log.warning('Could not open %s. Falling back to writing vm\'s output '

-                                 'to stdout only.', self.stdio_filename)

-         except paramiko.BadHostKeyException, e:

-             raise exc.TaskotronRemoteError('Server\'s (%s@%s:%s) hostkey could not be verified: %s'

-                                            % (self.username, self.hostname, self.port, str(e)))

-         except paramiko.AuthenticationException, e:

-             raise exc.TaskotronRemoteError('Authentication to %s@%s:%s failed: %s' %

-                                            (self.username, self.hostname, self.port, str(e)))

-         except (paramiko.SSHException, socket.error), e:

-             raise exc.TaskotronRemoteError('Could not connect to %s@%s:%s: %s' %

-                                            (self.username, self.hostname, self.port, str(e)))

-         except IOError as e:

-             # let's hope this does not occur in more situations

-             raise exc.TaskotronRemoteError("The private key '%s' could not be read: %s" %

-                                            (self.key_filename, str(e)))

- 

-     def close(self):

-         '''Close open connections and files.'''

- 

-         self.outstream.close()

-         self.sftp.close()

-         self.ssh.close()

- 

-     def cmd(self, cmd, debug=True):

-         '''Execute a command.

- 

-         :param str cmd: A command to be executed. Make sure you escape it properly to prevent shell

-                         expansion, in case it is not desired.

-         :param bool debug: Whether print out debugging log messages about what's happening.

-         :returns: returncode of the command

-         :raise TaskotronRemoteProcessError: If the command has non-zero return code and it isn't a

-                                             code of the exitcode directive.

-         :raise TaskotronRemoteTimeoutError: If the remote hasn't sent any output for

-                                             :attr:`TIMEOUT`.

-         '''

- 

-         if debug:

-             log.debug('Running command on remote host: %s', cmd)

-             # write the executed command also to stdio log, so that it's clear what's happening

-             # when reading it

-             self.outstream.write('$ %s\n' % cmd)

- 

-         # bufsize=1 means line buffering, which should improve the chance of receiving complete

-         # lines. get_pty=True allocates a pseudo-terminal, which is needed to have stdout and

-         # stderr interlaced properly (in the exact chronological order)

-         stdin, stdout, _ = self.ssh.exec_command(cmd, timeout=self.TIMEOUT, bufsize=1,

-                                                  get_pty=True)

- 

-         # stdout.channel represents channel for both stdout and stderr

-         channel = stdout.channel

- 

-         # there seems to be no way to tell whether the remote is alive,

-         # so just use the counter

-         # https://phab.qa.fedoraproject.org/T593

-         alive_counter = 0

-         while not channel.exit_status_ready():

-             if channel.recv_ready():

-                 data = channel.recv(65536)

-                 self.outstream.write(data)

-                 alive_counter = 0

- 

-             time.sleep(0.1)

-             alive_counter += 0.1

- 

-             if alive_counter >= self.TIMEOUT:

-                 raise exc.TaskotronRemoteTimeoutError(

-                     'No output received from machine %s@%s:%s for %d seconds while running: %s' %

-                     (self.username, self.hostname, self.port, self.TIMEOUT, cmd))

- 

-         retcode = channel.recv_exit_status()

-         # reprint the rest of data that was left in the buffer

-         self.outstream.write(stdout.read())

- 

-         if retcode != 0 and retcode != exitcode_directive.FAILURE:

-             raise exc.TaskotronRemoteProcessError('Command "%s" on %s@%s exited with code %s' %

-                                                   (cmd, self.username, self.hostname, retcode))

-         return retcode

- 

-     def install_pkgs(self, pkgs):

-         '''Install packages via dnf.

- 

-         First tries to install packages using ``dnf --cacheonly --best``. Running from cache

-         improves performance, while ``--best`` ensures updating to the latest version. If this

-         doesn't work for some reason, we try again with refreshed cache, and if that still

-         doesn't work, we drop ``--best`` to allow for broken deps of latest packages, if there

-         are older packages with working deps.

- 

-         :param list pkgs: A list of packages to be installed (supports any argument that

-                           ``dnf install`` accepts)

-         :raise TaskotronRemoteError: If the command has non-zero return code or times out (see

-                                      :meth:`cmd`).

-         '''

- 

-         pkgs_esc = [pipes.quote(pkg) for pkg in pkgs]

-         log.info('Installing %d packages on remote host...', len(pkgs))

-         try:

-             self.cmd('dnf --assumeyes --cacheonly --best install %s' % ' '.join(pkgs_esc))

-         except exc.TaskotronRemoteProcessError as e:

-             log.debug('Installation failed: %s', e)

-             log.debug('Trying again with forced metadata refresh...')

-             try:

-                 self.cmd('dnf --assumeyes --refresh --best install %s' % ' '.join(pkgs_esc))

-             except exc.TaskotronRemoteError as e:

-                 log.debug('Installation failed: %s', e)

-                 log.debug('Trying again without --best...')

-                 self.cmd('dnf --assumeyes install %s' % ' '.join(pkgs_esc))

- 

-     def write_file(self, remote_path, data, overwrite=True, debug=True):

-         '''Write data to a remote file.

- 

-         :param str remote_path: A path to the remote file

-         :param str data: Data to be written

-         :param bool overwrite: Whether to overwrite remote path. Default is True.

-         :param bool debug: Whether print out debugging log messages about what's happening.

-         :raise TaskotronRemoteError: If data could not be written

-         '''

- 

-         if debug:

-             log.debug('Writing data to %s@%s:%s ...', self.username, self.hostname, remote_path)

- 

-         # keep the conditionals in this order to avoid unnecessary remote calls

-         if not overwrite and self._remote_file_exists(remote_path):

-             log.info('Remote path %s already exists, not overwriting.', remote_path)

-             return

- 

-         try:

-             with self.sftp.open(remote_path, 'w') as remote_file:

-                 remote_file.write(data)

-         except (socket.error, IOError) as e:

-             raise exc.TaskotronRemoteError('Could not write data to %s@%s:%s: %s' %

-                                            (self.username, self.hostname, remote_path, e))
- 
-     def put_file(self, local_path, remote_path, overwrite=True, debug=True):
-         '''Copy a file to a remote path. File permissions are not preserved.
- 
-         :param str local_path: A path to the local file
-         :param str remote_path: A path to the remote file
-         :param bool overwrite: Whether to overwrite the remote path. Default is True.
-         :param bool debug: Whether to print out debugging log messages about what's happening.
-         :return: :class:`paramiko.SFTPAttributes` object containing attributes about the given
-                  file, if successful. ``None`` otherwise.
-         :raise TaskotronRemoteError: If the file could not be copied
-         '''
-         if debug:
-             log.debug('Copying %s to %s@%s:%s ...', local_path, self.username, self.hostname,
-                       remote_path)
- 
-         # keep the conditionals in this order to avoid unnecessary remote calls
-         if not overwrite and self._remote_file_exists(remote_path):
-             if debug:
-                 log.debug('Remote path %s already exists, not overwriting.', remote_path)
-             return
- 
-         try:
-             return self.sftp.put(local_path, remote_path)
-         except (socket.error, IOError) as e:
-             raise exc.TaskotronRemoteError('Could not put file %s to %s@%s:%s: %s' %
-                                            (local_path, self.username, self.hostname,
-                                             remote_path, e))
- 
-     def get_file(self, remote_path, local_path, debug=True):
-         '''Get a file from a remote path. File permissions are not preserved.
- 
-         :param str remote_path: A path to the remote file
-         :param str local_path: A path to the local file
-         :param bool debug: Whether to print out debugging log messages about what's happening.
-         :raise TaskotronRemoteError: If the file could not be downloaded
-         '''
- 
-         if debug:
-             log.debug('Copying %s@%s:%s to %s ...', self.username, self.hostname, remote_path,
-                       local_path)
- 
-         try:
-             self.sftp.get(remote_path, local_path)
-         except (socket.error, IOError) as e:
-             raise exc.TaskotronRemoteError('Could not copy file %s@%s:%s to %s: %s' %
-                                            (self.username, self.hostname, remote_path,
-                                             local_path, e))
- 
-     def _remote_file_exists(self, remote_path):
-         try:
-             path_stats = self.sftp.lstat(remote_path)
-             if path_stats is not None:
-                 return True
-         except IOError:
-             pass
- 
-         return False
- 
-     def _remote_isdir(self, remote_path):
-         '''Return ``True`` if ``remote_path`` exists and is a directory, otherwise ``False``
-         (e.g. even if it exists but is a file).'''
-         try:
-             return stat.S_ISDIR(self.sftp.lstat(remote_path).st_mode)
-         except IOError:
-             # when the dir doesn't exist: IOError: [Errno 2] No such file
-             return False
- 
-     def put_dir(self, local_path, remote_path, overwrite=True, debug=True):
-         '''Copy a directory to a remote path. This method creates a tarball from the contents of
-         ``local_path``, copies the tarball to the remote machine and extracts it to ``remote_path``.
-         File permissions and symlinks are preserved.
- 
-         :param str remote_path: A path to the remote directory. This directory will be created, if
-                                 its parent exists. Otherwise you need to create the full tree
-                                 structure manually beforehand.
-         :param str local_path: A path to the local directory
-         :param bool overwrite: Whether to overwrite the remote path (merge the local dir with the
-                                remote dir). If you choose not to overwrite and ``remote_path``
-                                exists, this method will just immediately return.
-         :param bool debug: Whether to print out debugging log messages about what's happening.
-         :raise TaskotronRemoteError: If the directory could not be copied
-         '''
-         if debug:
-             log.debug('Copying %s to %s@%s:%s ...', local_path, self.username,
-                       self.hostname, remote_path)
- 
-         # keep the conditionals in this order to avoid unnecessary remote calls
-         if not overwrite and self._remote_file_exists(remote_path):
-             if debug:
-                 log.debug('Remote path %s already exists, not overwriting.', remote_path)
-             return
- 
-         try:
-             if not self._remote_isdir(remote_path):
-                 self.sftp.mkdir(remote_path)
- 
-             files_to_copy = os.listdir(local_path)
-             tar_local_path = tempfile.mktemp()
-             tar_remote_path = os.path.join(remote_path, 'tarred_files.tar')
- 
-             with tarfile.open(tar_local_path, 'w') as tar:
-                 for filename in files_to_copy:
-                     tar.add(os.path.join(local_path, filename), arcname=filename)
- 
-             self.put_file(tar_local_path, tar_remote_path, debug=False)
-             self.cmd('tar xf %s -C %s' % (pipes.quote(tar_remote_path), pipes.quote(remote_path)),
-                      debug=False)
- 
-             os.remove(tar_local_path)
-             self.cmd('rm -f %s' % pipes.quote(tar_remote_path), debug=False)
- 
-         except (OSError, IOError) as e:
-             log.exception(e)
-             raise exc.TaskotronRemoteError('Could not copy dir %s to %s@%s:%s: %s' %
-                                            (local_path, self.username, self.hostname,
-                                             remote_path, e))
- 
-     def get_dir(self, remote_path, local_path, debug=True):
-         '''Get a directory from a remote path. File permissions are not preserved.
- 
-         :param str remote_path: A path to the remote directory
-         :param str local_path: A path to the local directory. This directory will be created, if
-                                its parent exists. Otherwise you need to create the full tree
-                                structure manually beforehand.
-         :param bool debug: Whether to print out debugging log messages about what's happening.
-         :raise TaskotronRemoteError: If the directory could not be downloaded
-         '''
- 
-         if debug:
-             log.debug('Recursively copying %s@%s:%s to %s ...', self.username, self.hostname,
-                       remote_path, local_path)
- 
-         try:
-             if not os.path.isdir(local_path):
-                 os.mkdir(local_path)
- 
-             fileattrs = self.sftp.listdir_attr(remote_path)
- 
-             for fileattr in fileattrs:
-                 remote_file = os.path.join(remote_path, fileattr.filename)
-                 local_file = os.path.join(local_path, fileattr.filename)
-                 if stat.S_ISDIR(fileattr.st_mode):
-                     self.get_dir(remote_file, local_file, debug=False)
-                 else:
-                     self.get_file(remote_file, local_file, debug=False)
-         except (OSError, IOError) as e:
-             raise exc.TaskotronRemoteError('Could not copy dir %s@%s:%s to %s: %s' %
-                                            (self.username, self.hostname, remote_path,
-                                             local_path, e))

@@ -1,246 +0,0 @@ 
- # -*- coding: utf-8 -*-
- # Copyright 2009-2014, Red Hat, Inc.
- # License: GPL-2.0+ <http://spdx.org/licenses/GPL-2.0+>
- # See the LICENSE file for more details on Licensing
- 
- '''Methods for operating with a task formula'''
- 
- from __future__ import absolute_import
- import collections
- import string
- import re
- 
- from libtaskotron import exceptions as exc
- from libtaskotron.logger import log
- from libtaskotron.ext.fedora import rpm_utils
- 
- 
- class DotTemplate(string.Template):
-     """Redefines the :class:`string.Template`'s ``idpattern`` property to
-     allow using ``.`` in the variable names. This is then later used for
-     key/property access in variable replacement.
-     """
-     idpattern = '[_a-z](?:\.?[_a-z0-9]+)*'
- 
- 
- def _get_key_or_attribute(variables, var_name):
-     """Allows the ``.`` selector to work as a selector for variables.
-     When a variable is dict-like (collections.MutableMapping), the selector looks for a key.
-     When a variable is not dict-like, it looks for the object's attribute.
- 
-     :param dict variables: names (keys) and values of available variables
-     :param str var_name: variable name, possibly containing ``.`` selector to be found
-     :raise KeyError: when a dict-like object (anywhere in the chain) does not contain the requested key
-     :raise AttributeError: when a non-dict object (anywhere in the chain)
-                         does not have the requested attribute
-     """
-     if '.' not in var_name:
-         return variables[var_name]
- 
-     var_name, selector = var_name.split('.', 1)
-     outcome = variables[var_name]
-     for part in selector.split('.'):
-         if isinstance(outcome, collections.MutableMapping):
-             outcome = outcome[part]
-         else:
-             outcome = outcome.__getattribute__(part)
-     return outcome
- 
- def _replace_vars(text, variables):
-     '''Go through ``text`` and replace all variables (in the form of what is
-     supported by :class:`DotTemplate`) with their values, as provided in
-     ``variables``. This is used for variable expansion in the task formula.
- 
-     :param str text: input text where to search for variables
-     :param dict variables: names (keys) and values of variables to replace
-     :return: if ``text`` contains just a single variable and nothing else, the
-              variable value is directly returned (i.e. with matching type, not
-              cast to ``str``). If ``text`` contains something else as well
-              (other variables or text), a string is returned.
-     :raise TaskotronYamlError: if ``text`` contains a variable that is not
-                                present in ``variables``, or if the variable
-                                syntax is incorrect
-     '''
- 
-     try:
-         # try to find the first match
-         match = DotTemplate.pattern.search(text)
- 
-         if not match:
-             return text
- 
-         # There are 4 groups in the pattern: 1 - escaped, 2 - named, 3 - braced,
-         # 4 - invalid. Group 0 returns the whole match.
-         if match.group(0) == text and (match.group(2) or match.group(3)):
-             # We found a single variable and nothing more. We shouldn't return
-             # a string, but the exact value, so that we don't lose value type.
-             # This makes it possible to pass lists, dicts, etc. as variables.
-             var_name = match.group(2) or match.group(3)
-             return _get_key_or_attribute(variables, var_name)
- 
-         # Now it's clear there's also something else in `text` than just a
-         # single variable. We will replace all variables and return a string
-         # again.
- 
-         # To make things easy, we'll create an expanded_variables dict that contains
-         # values for all the replaceable variables in the text.
-         # I.e. if variables = {"n": 1} and text is "foo bar ${n.imag}", then
-         # an "n.imag" key is added to variables, with the respective value.
- 
-         expanded_variables = {}
-         for match in DotTemplate.pattern.findall(text):
-             var_name = match[1] or match[2]
-             if var_name:
-                 expanded_variables[var_name] = _get_key_or_attribute(variables, var_name)
- 
-         output = DotTemplate(text).substitute(expanded_variables)
-         return output
- 
-     except (KeyError, AttributeError) as e:
-         raise exc.TaskotronYamlError("The task formula includes a variable, "
-             "but no value has been provided for it: %s" % e)
- 
-     except ValueError as e:
-         raise exc.TaskotronYamlError("The task formula includes an incorrect "
-             "variable definition. Dollar signs must be doubled if they "
-             "shouldn't be considered as a variable denotation.\n"
-             "Error: %s\n"
-             "Text: %s" % (e, text))
- 
- 
- def replace_vars_in_action(action, variables):
-     '''Replace variables (${var} or ${var.key}) with their values in an action (YAML dictionary).
- 
-     Only basestring values are searched for variable syntax, which means variables can either be
-     inside leaves (in a tree sense, meaning they can't be traversed further and are of a primitive
-     type), or inside key names in a dictionary.
- 
-     :param dict action: An action specification parsed from the task formula. See
-                         :meth:`.Runner.do_actions` to see what an action looks like.
-     :param dict variables: names (keys) and values of variables to replace
-     :raise TaskotronYamlError: if ``text`` contains a variable that is not present in
-                                ``variables``; or if the variable syntax is incorrect; or if you try
-                                to replace a dictionary key with something other than a string
-     '''
- 
-     visited = []  # all visited nodes in a tree
-     stack = [action]  # nodes waiting for inspection
- 
-     while stack:
-         vertex = stack.pop()
- 
-         if vertex in visited:
-             continue
- 
-         visited.append(vertex)
-         children = []  # list of tuples (index/key, child_value)
- 
-         if isinstance(vertex, collections.MutableMapping):
-             children = vertex.items()  # list of (key, value)
-         elif isinstance(vertex, collections.MutableSequence):
-             children = list(enumerate(vertex))  # list of (index, value)
-         else:
-             log.warn("Unknown structure '%s' in YAML file, this shouldn't happen: %s",
-                      type(vertex), vertex)
- 
-         for index, child_val in children:
-             if isinstance(child_val, basestring):
-                 # leaf node and a string, replace variables
-                 vertex[index] = _replace_vars(child_val, variables)
-             elif isinstance(child_val, collections.Iterable):
-                 # traversable further down, mark for visit
-                 stack.append(child_val)
- 
-             # for dicts, also replace vars in key names
-             if isinstance(index, basestring):  # meaning it's a dictionary
-                 new_key = _replace_vars(index, variables)
-                 if not isinstance(new_key, basestring):
-                     raise exc.TaskotronYamlError(
-                         "Dictionary key '%s' can't be replaced with value %r, because it's not a "
-                         "string, but %s" % (index, new_key, type(new_key)))
-                 # only swap keys if the new key is actually different from the old one
-                 if new_key != index:
-                     vertex[new_key] = vertex[index]
-                     del vertex[index]
- 
- 
- def devise_environment(formula, arg_data):
-     '''Takes a parsed formula and returns the required run-environment (i.e.
-     distro, arch, Fedora release, and base-image flavor), based on item and type.
- 
-     :param dict formula: parsed formula file (or dict with equivalent structure)
-     :param dict arg_data: parsed command-line arguments. item, type and arch
-                           are used in this method
-     :return: dict containing distro, release, flavor, arch. Each either set, or None
-     :raise TaskotronValueError: when the environment can't be parsed from the formula
-     '''
- 
-     environment = formula.get('environment', {})
-     env = {'distro': None, 'release': None, 'flavor': None, 'arch': None}
- 
-     if not isinstance(environment, dict):
-         raise exc.TaskotronValueError(
-             "Environment must be a dictionary, not %s" % type(environment))
- 
-     env['distro'] = environment.get('distro', None)
-     env['release'] = environment.get('release', None)
-     env['flavor'] = environment.get('flavor', None)
-     env['arch'] = environment.get('arch', None)
- 
-     item = arg_data.get('item', None)
-     item_type = arg_data.get('type', None)
- 
-     if not env['distro']:
-         if item_type == 'koji_build':
-             # FIXME: find a way to make this not Fedora-specific
-             # For `xchat-2.8.8-21.fc20` the disttag is `fc20`, for example
-             try:
-                 distro = rpm_utils.get_dist_tag(item)[:2]
-                 env['distro'] = {'fc': 'fedora'}.get(distro, None)
-             except exc.TaskotronValueError:
-                 env['distro'] = None
- 
-         elif item_type == 'koji_tag':
-             if re.match(r'^f[0-9]{2}-.*', item):
-                 env['distro'] = 'fedora'
- 
-         if env['distro']:
-             log.debug("Environment/distro inferred from %r:%r to %r" %
-                       (item_type, item, env['distro']))
-         else:
-             log.debug("Environment/distro cannot be inferred from %r:%r. Using default." %
-                       (item_type, item))
- 
-     if not env['release']:
-         if item_type == 'koji_build':
-             # FIXME: find a way to make this not Fedora-specific
-             # Last two characters in the rpm's disttag are the Fedora release.
-             # For `xchat-2.8.8-21.fc20` the disttag is `fc20`, for example
-             try:
-                 env['release'] = rpm_utils.get_dist_tag(item)[-2:]
-             except exc.TaskotronValueError:
-                 env['release'] = None
- 
-         elif item_type == 'koji_tag':
-             if re.match(r'^f[0-9]{2}-.*', item):
-                 env['release'] = item[1:3]
- 
-         if env['release']:
-             log.debug("Environment/release inferred from %r:%r to %r" %
-                       (item_type, item, env['release']))
-         else:
-             log.debug("Environment/release cannot be inferred from %r:%r. Using default." %
-                       (item_type, item))
- 
-     if not env['flavor']:
-         log.debug("Environment/flavor not specified. Using default.")
- 
-     if not env['arch']:
-         arch = arg_data.get('arch')
-         if not arch or arch == 'noarch':
-             log.warn("Environment/arch cannot be inferred from %r. Using default." % arch)
-         else:
-             env['arch'] = arch
-             log.debug("Environment/arch inferred from %r to %r" % (arch, env['arch']))
- 
-     return env

file modified
-1
@@ -21,7 +21,6 @@ 
  munch >= 2.0.2
 
  # libtaskotron-disposable
- paramiko >= 1.15.1
  testcloud >= 0.1.10
 
  # Test suite requirements

This patch builds on Martin's ansible tasks commit (rLTRN2b720ca61cb8).
I've added a Dockerfile in the 'libtaskotron/ext/' directory as well as
a module in 'libtaskotron/ext/disposable' for dealing with Docker
containers.

This expects a docker daemon to be running on the host (so is akin
to --libvirt), and will build the 'taskotron-worker' image if it's
not already found on the system.
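
For illustration, here's a minimal sketch of that check-and-build logic,
written against the modern Docker SDK for Python (docker >= 3.0, so newer
than anything this patch could have used). The 'taskotron-worker' name and
the Dockerfile location come from the description above; the actual module
in 'libtaskotron/ext/disposable' may do this differently:

import docker
from docker.errors import ImageNotFound

def ensure_worker_image(dockerfile_dir='libtaskotron/ext'):
    # talk to the docker daemon on the host -- this is why one must be running
    client = docker.from_env()
    try:
        # reuse the image if it's already present on the system
        return client.images.get('taskotron-worker')
    except ImageNotFound:
        # otherwise build it from the bundled Dockerfile
        image, _build_log = client.images.build(path=dockerfile_dir,
                                                tag='taskotron-worker')
        return image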

Testing this is very similar to testing the patch Martin wrote,
except you use the '--docker' flag during invocation.

To test:
$ git clone https://github.com/stefwalter/gzip-dist-git
$ sudo runtask -d -i gzip -t koji_build -a x86_64 --docker gzip-dist-git/tests/test_rpm.yml
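
To verify that the worker image got built on the first run (assuming the
image name from the description above):
$ sudo docker images taskotron-worker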

Read the whole conversation at https://fedorapeople.org/groups/qa/phabarchive/differentials/phab.qa.fedoraproject.org/D1196.html

There's an updated version of this in PR #393 from @lnie.

Pull-Request has been closed by jskladan

5 years ago