#3631 All scheduler work
Closed 9 months ago by tkopecek. Opened a year ago by tkopecek.
tkopecek/koji scheduler-all  into  master

file modified
+225
@@ -22,6 +22,12 @@ 

  import six.moves.xmlrpc_client

  from six.moves import filter, map, range, zip

  

+ from rich.console import Console

+ from rich.style import Style

+ from rich.table import Table, Column

+ from rich.theme import Theme

+ from rich import print

+ 

  import koji

  from koji.util import base64encode, md5_constructor, to_list

  from koji_cli.lib import (
@@ -7742,3 +7748,222 @@ 

          print("Number of tasks: %d" % tasks.result)

          print("Number of builds: %d" % builds.result)

          print('')

+ 

+ 

def anon_handle_scheduler_info(goptions, session, args):
    """[monitor] Show information about scheduling"""
    usage = "usage: %prog schedulerinfo [options]"
    parser = OptionParser(usage=get_usage_str(usage))
    parser.add_option("-t", "--task", action="store", type=int, default=None,
                      help="Limit data to given task id")
    parser.add_option("--host", action="store", default=None,
                      help="Limit data to given builder (name/id)")
    parser.add_option("--state", action="store", type='str', default=None,
                      help="Limit data to task state")
    parser.add_option("-c", "--children", action="store_true", default=False,
                      help="If --task is given, also display its children")
    (options, args) = parser.parse_args(args)
    if len(args) > 0:
        parser.error("This command takes no arguments")

    ensure_connection(session, goptions)

    # resolve a host name to its numeric id; numeric input is used as-is
    host_id = None
    if options.host:
        try:
            host_id = int(options.host)
        except ValueError:
            host_id = session.getHost(options.host, strict=True)['id']

    if options.state:
        states = [koji.TASK_STATES[options.state]]
    else:
        states = None

    if options.children:
        if not options.task:
            parser.error("--children makes sense only with --task")
        tasks = sorted([int(x) for x in session.getTaskDescendents(options.task).keys()])
    else:
        tasks = [options.task]

    # fetch task runs for all requested tasks in one multicall round-trip
    result = []
    with session.multicall() as m:
        for task in tasks:
            result.append(m.scheduler.getTaskRuns(taskID=task, hostID=host_id, states=states))
    runs = itertools.chain(*[x.result for x in result])

    # per-state colors for the rich table below
    theme = Theme({
        'free': Style(color="#00030c"),
        'scheduled': Style(color="#00030c"),
        'open': Style(color="#ff8c00"),
        'closed': Style(color="#008000"),
        'assigned': Style(color="#000c0f"),
        'canceled': Style(color="#000c90"),
        'failed': Style(color="#ff0000"),
    })

    table = Table(
        Column("Task", justify="right"),
        "Host",
        "State",
        "Created",
        "Started",
        "Ended",
        "Waited",
    )
    for run in runs:
        run['state'] = koji.TASK_STATES[run['state']]
        if run['start_ts']:
            # time the run waited between creation and start, as HH:MM:SS
            # (was previously computed with the seconds remainder subtracted
            # twice, which produced a wrong hour figure)
            total_minutes, secs = divmod(run['start_ts'] - run['create_ts'], 60)
            hours, mins = divmod(total_minutes, 60)
            run['waited'] = '%02d:%02d:%02d' % (hours, mins, secs)
        else:
            run['waited'] = ''
        for ts in ('create_ts', 'start_ts', 'end_ts'):
            if run[ts] is not None:
                run[ts] = time.asctime(time.localtime(run[ts]))
            else:
                run[ts] = ''
        table.add_row(
            str(run["task_id"]),
            str(run["host_id"]),
            f'[{run["state"].lower()}]{run["state"]}[/{run["state"].lower()}]',
            run["create_ts"],
            run["start_ts"],
            run["end_ts"],
            str(run["waited"]),
        )
    Console(theme=theme).print(table)

    if host_id:
        print('Host data for %s:' % options.host)
        host_data = session.scheduler.getHostData(hostID=host_id)
        if len(host_data) > 0:
            print(host_data[0]['data'])
        else:
            print('-')

def handle_scheduler_logs(goptions, session, args):
    "[monitor] Query scheduler logs"
    usage = "usage: %prog scheduler-logs <options>"
    parser = OptionParser(usage=get_usage_str(usage))
    parser.add_option("--task", type="int", action="store",
                      help="Filter by task ID")
    parser.add_option("--host", type="str", action="store",
                      help="Filter by host (name/ID)")
    parser.add_option("--level", type="str", action="store",
                      help="Filter by message level")
    parser.add_option("--from", type="float", action="store", dest="from_ts",
                      help="Logs from given timestamp")
    parser.add_option("--to", type="float", action="store", dest="to_ts",
                      help="Logs until given timestamp (included)")
    parser.add_option("--logger", type="str", action="store",
                      help="Filter by logger name")
    (options, args) = parser.parse_args(args)
    if len(args) != 0:
        parser.error("There are no arguments for this command")

    # build getLogs kwargs only from the filters the user actually set
    kwargs = {}
    if options.task:
        kwargs['taskID'] = options.task
    if options.host:
        try:
            kwargs['hostID'] = int(options.host)
        except ValueError:
            # strict=True raises a clean error for unknown hosts instead of
            # failing later with a TypeError on None
            kwargs['hostID'] = session.getHost(options.host, strict=True)['id']
    if options.level:
        kwargs['level'] = options.level
    # compare against None so an explicit 0.0 (the epoch) is still honored
    if options.from_ts is not None:
        kwargs['from_ts'] = options.from_ts
    if options.to_ts is not None:
        kwargs['to_ts'] = options.to_ts
    if options.logger:
        kwargs['logger_name'] = options.logger

    logs = session.scheduler.getLogs(**kwargs)

    table = Table(
        "Time",
        Column("Task", justify="right"),
        "Method",
        "Host",
        "Logger",
        "Level",
        "Location",
        "Message",
    )
    # look up task info (for the Method column) in a single multicall
    result = []
    task_ids = set([log['task_id'] for log in logs if log['task_id']])
    with session.multicall() as m:
        for task_id in task_ids:
            result.append(m.getTaskInfo(task_id))
    tasks = {}
    for task in result:
        tinfo = task.result
        # keyed by str to match the str(task_id) lookups below
        tasks[str(tinfo['id'])] = tinfo

    for log in logs:
        if log['task_id']:
            task = str(log['task_id'])
        else:
            task = ''
        if log['msg_ts']:
            msg_ts = time.asctime(time.localtime(log['msg_ts']))
        else:
            msg_ts = ''
        table.add_row(
            msg_ts,
            task,
            tasks.get(task, {}).get('method', ''),
            log['host_name'],
            log['logger_name'],
            log['level'],
            log['location'],
            log['msg'],
            # reuse rich's builtin per-level styles (logging.level.*)
            style=f"logging.level.{log['level'].lower()}",
        )
    Console().print(table)

file modified
+37
@@ -955,4 +955,41 @@ 

  ) WITHOUT OIDS;

  

  

-- Scheduler tables

-- log levels mirror Python's logging module level names
CREATE TYPE logger_level AS ENUM ('NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL');

-- messages emitted by the hub scheduler (see scheduler.DBLogger)
CREATE TABLE scheduler_log_messages (
        id SERIAL NOT NULL PRIMARY KEY,
        task_id INTEGER REFERENCES task (id),
        host_id INTEGER REFERENCES host (id),
        msg_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        logger_name VARCHAR(200) NOT NULL,
        level logger_level NOT NULL,
        location VARCHAR(200),
        msg TEXT NOT NULL
) WITHOUT OIDS;
CREATE INDEX scheduler_log_messages_task_id ON scheduler_log_messages(task_id);
CREATE INDEX scheduler_log_messages_host_id ON scheduler_log_messages(host_id);
CREATE INDEX scheduler_log_messages_msg_time ON scheduler_log_messages(msg_time);
CREATE INDEX scheduler_log_messages_level ON scheduler_log_messages(level);

-- one row per attempt to run a task on a host; a task may have several runs
CREATE TABLE scheduler_task_runs (
        id SERIAL NOT NULL PRIMARY KEY,
        task_id INTEGER REFERENCES task (id) NOT NULL,
        host_id INTEGER REFERENCES host (id) NOT NULL,
        state INTEGER NOT NULL,
        create_time TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
        start_time TIMESTAMPTZ,
        end_time TIMESTAMPTZ
) WITHOUT OIDS;
CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id);
CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id);
CREATE INDEX scheduler_task_runs_state ON scheduler_task_runs(state);
CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time);

-- latest capability/resource report per host (host.setHostData)
CREATE TABLE scheduler_host_data (
        host_id INTEGER REFERENCES host (id) PRIMARY KEY,
        data JSONB
) WITHOUT OIDS;
-- no separate index on host_id: the PRIMARY KEY constraint already
-- creates a unique index on that column

+ 

  COMMIT WORK;

file modified
+2
@@ -193,6 +193,8 @@ 

      'CANCELED',

      'ASSIGNED',

      'FAILED',

+     'SCHEDULED',

+     'REFUSED',

  ))

  

  BUILD_STATES = Enum((

file modified
+34 -109
@@ -1018,129 +1018,54 @@ 

                  else:

                      self.logger.info("Lingering task %r (pid %r)" % (id, pid))

  

+     def setHostData(self):

+         """Report all runtime data to scheduler"""

+         methods = {}

+         for method in self.handlers:

+             methods[method] = self.handlers[method]._taskWeight

+         hostdata = {

+             # 'task_load': self.task_load,

+             # 'ready': self.ready,

+             'methods': methods,

+             'maxjobs': self.options.maxjobs,

+             # kernel

+             # cpu_total

+             # cpu_available (total - reserved)

+             # memory_total

+             # memory_available (total - reserved)

+         }

+         self.session.host.setHostData(hostdata)

+         self.logger.debug("Reported hostdata %s", hostdata)

+ 

      def getNextTask(self):

          self.ready = self.readyForTask()

          self.session.host.updateHost(self.task_load, self.ready)

+         self.setHostData()

          if not self.ready:

              self.logger.info("Not ready for task")

              return False

-         hosts, tasks = self.session.host.getLoadData()

-         self.logger.debug("Load Data:")

-         self.logger.debug("  hosts: %r" % hosts)

-         self.logger.debug("  tasks: %r" % tasks)

-         # now we organize this data into channel-arch bins

-         bin_hosts = {}  # hosts indexed by bin

-         bins = {}  # bins for this host

-         our_avail = None

-         for host in hosts:

-             host['bins'] = []

-             if host['id'] == self.host_id:

-                 # note: task_load reported by server might differ from what we

-                 # sent due to precision variation

-                 our_avail = host['capacity'] - host['task_load']

-             for chan in host['channels']:

-                 for arch in host['arches'].split() + ['noarch']:

-                     bin = "%s:%s" % (chan, arch)

-                     bin_hosts.setdefault(bin, []).append(host)

-                     if host['id'] == self.host_id:

-                         bins[bin] = 1

-         self.logger.debug("bins: %r" % bins)

-         if our_avail is None:

-             self.logger.info("Server did not report this host. Are we disabled?")

-             return False

-         elif not bins:

-             self.logger.info("No bins for this host. Missing channel/arch config?")

-             # Note: we may still take an assigned task below

-         # sort available capacities for each of our bins

-         avail = {}

-         for bin in bins:

-             avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]]

-             avail[bin].sort()

-             avail[bin].reverse()

-         self.cleanDelayTimes()

+         scheduled = self.session.scheduler.getTaskRuns(hostID=self.hostdata['id'],

+                                                        states=[koji.TASK_STATES['SCHEDULED']])

+         with self.session.multicall() as m:

+             tasks = [m.getTaskInfo(task['task_id']) for task in scheduled]

+ 

+         taken = False

          for task in tasks:

-             # note: tasks are in priority order

-             self.logger.debug("task: %r" % task)

+             self.ready = self.readyForTask()

+             if not self.ready:

+                 self.logger.info("Not ready for task")

+                 break

+             task = task.result

+             # last check - it should be already filtered by updateHost

              if task['method'] not in self.handlers:

                  self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task)

-                 continue

              if task['id'] in self.tasks:

                  # we were running this task, but it apparently has been

                  # freed or reassigned. We can't do anything with it until

                  # updateTasks notices this and cleans up.

                  self.logger.debug("Task %(id)s freed or reassigned", task)

-                 continue

-             if task['state'] == koji.TASK_STATES['ASSIGNED']:

-                 self.logger.debug("task is assigned")

-                 if self.host_id == task['host_id']:

-                     # assigned to us, we can take it regardless

-                     if self.takeTask(task):

-                         return True

-             elif task['state'] == koji.TASK_STATES['FREE']:

-                 bin = "%(channel_id)s:%(arch)s" % task

-                 self.logger.debug("task is free, bin=%r" % bin)

-                 if bin not in bins:

-                     continue

-                 # see where our available capacity is compared to other hosts for this bin

-                 # (note: the hosts in this bin are exactly those that could

-                 # accept this task)

-                 bin_avail = avail.get(bin, [0])

-                 if self.checkAvailDelay(task, bin_avail, our_avail):

-                     # decline for now and give the upper half a chance

-                     continue

-                 # otherwise, we attempt to open the task

-                 if self.takeTask(task):

-                     return True

-             else:

-                 # should not happen

-                 raise Exception("Invalid task state reported by server")

-         return False

- 

-     def checkAvailDelay(self, task, bin_avail, our_avail):

-         """Check to see if we should still delay taking a task

- 

-         Returns True if we are still in the delay period and should skip the

-         task. Otherwise False (delay has expired).

-         """

- 

-         now = time.time()

-         ts = self.skipped_tasks.get(task['id'])

-         if not ts:

-             ts = self.skipped_tasks[task['id']] = now

- 

-         # determine our normalized bin rank

-         for pos, cap in enumerate(bin_avail):

-             if our_avail >= cap:

-                 break

-         if len(bin_avail) > 1:

-             rank = float(pos) / (len(bin_avail) - 1)

-         else:

-             rank = 0.0

-         # so, 0.0 for highest available capacity, 1.0 for lowest

- 

-         delay = getattr(self.options, 'task_avail_delay', 180)

-         delay *= rank

- 

-         # return True if we should delay

-         if now - ts < delay:

-             self.logger.debug("skipping task %i, age=%s rank=%s"

-                               % (task['id'], int(now - ts), rank))

-             return True

-         # otherwise

-         del self.skipped_tasks[task['id']]

-         return False

- 

-     def cleanDelayTimes(self):

-         """Remove old entries from skipped_tasks"""

-         now = time.time()

-         delay = getattr(self.options, 'task_avail_delay', 180)

-         cutoff = now - delay * 10

-         # After 10x the delay, we've had plenty of opportunity to take the

-         # task, so either it has already been taken or we can't take it.

-         for task_id in list(self.skipped_tasks):

-             ts = self.skipped_tasks[task_id]

-             if ts < cutoff:

-                 del self.skipped_tasks[task_id]

+             self.takeTask(task)

+         return taken

  

      def _waitTask(self, task_id, pid=None):

          """Wait (nohang) on the task, return true if finished"""

file modified
+105 -76
@@ -92,9 +92,11 @@ 

      nextval,

      currval,

  )

+ from . import scheduler

  

  

  logger = logging.getLogger('koji.hub')

+ sched_logger = scheduler.DBLogger()

  

  

  NUMERIC_TYPES = (int, float)
@@ -286,6 +288,20 @@ 

          if state == koji.TASK_STATES['OPEN']:

              update.rawset(start_time='NOW()')

          update.execute()

+ 

+         # update last task run

+         update = UpdateProcessor(

+             table='scheduler_task_runs',

+             clauses=[

+                 'id = (SELECT MAX(id) FROM scheduler_task_runs '

+                 'WHERE task_id = %(task_id)i AND host_id = %(host_id)i)',

+             ],

+             values={'task_id': task_id, 'host_id': host_id},

+             data={'state': state},

+         )

+         if state == koji.TASK_STATES['OPEN']:

+             update.rawset(start_time='NOW()')

+         update.execute()

          self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES[newstate])

          self.runCallbacks('postTaskStateChange', info, 'host_id', host_id)

          return True
@@ -315,10 +331,12 @@ 

          else:

              return None

  

-     def free(self):

+     def free(self, newstate=koji.TASK_STATES['FREE']):

          """Free a task"""

+         if newstate not in [koji.TASK_STATES['FREE'], koji.TASK_STATES['REFUSED']]:

+             raise koji.GenericError("Can't be called with other than FREE/REFUSED states")

          info = self.getInfo(request=True)

-         self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])

+         self.runCallbacks('preTaskStateChange', info, 'state', newstate)

          self.runCallbacks('preTaskStateChange', info, 'host_id', None)

          # access checks should be performed by calling function

          query = QueryProcessor(tables=['task'], columns=['state'], clauses=['id = %(id)i'],
@@ -327,14 +345,13 @@ 

          if not oldstate:

              raise koji.GenericError("No such task: %i" % self.id)

          if koji.TASK_STATES[oldstate] in ['CLOSED', 'CANCELED', 'FAILED']:

-             raise koji.GenericError("Cannot free task %i, state is %s" %

+             raise koji.GenericError("Cannot free/refuse task %i, state is %s" %

                                      (self.id, koji.TASK_STATES[oldstate]))

-         newstate = koji.TASK_STATES['FREE']

          newhost = None

          update = UpdateProcessor('task', clauses=['id=%(task_id)s'], values={'task_id': self.id},

                                   data={'state': newstate, 'host_id': newhost})

          update.execute()

-         self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])

+         self.runCallbacks('postTaskStateChange', info, 'state', newstate)

          self.runCallbacks('postTaskStateChange', info, 'host_id', None)

          return True

  
@@ -387,6 +404,15 @@ 

                                   data={'result': info['result'], 'state': state},

                                   rawdata={'completion_time': 'NOW()'})

          update.execute()

+         update = UpdateProcessor(table='scheduler_task_runs',

+                                  clauses=[

+                                      'id = (SELECT MAX(id) FROM scheduler_task_runs '

+                                      'WHERE task_id = %(task_id)i AND host_id = %(host_id)i)',

+                                  ],

+                                  data={'state': state},

+                                  rawdata={'end_time': 'NOW()'},

+                                  values={'task_id': self.id, 'host_id': info['host_id']})

+         update.execute()

          self.runCallbacks('postTaskStateChange', info, 'state', state)

          self.runCallbacks('postTaskStateChange', info, 'completion_ts', now)

  
@@ -741,6 +767,7 @@ 

      opts['id'] = task_id

      koji.plugin.run_callbacks(

          'postTaskStateChange', attribute='state', old=None, new='FREE', info=opts)

+     scheduler.schedule(task_id=task_id)

      return task_id

  

  
@@ -2537,44 +2564,6 @@ 

      update.execute()

  

  

- def get_ready_hosts():

-     """Return information about hosts that are ready to build.

- 

-     Hosts set the ready flag themselves

-     Note: We ignore hosts that are late checking in (even if a host

-         is busy with tasks, it should be checking in quite often).

-     """

-     query = QueryProcessor(

-         tables=['host'],

-         columns=['host.id', 'name', 'arches', 'task_load', 'capacity'],

-         aliases=['id', 'name', 'arches', 'task_load', 'capacity'],

-         clauses=[

-             'enabled IS TRUE',

-             'ready IS TRUE',

-             'expired IS FALSE',

-             'master IS NULL',

-             'active IS TRUE',

-             "update_time > NOW() - '5 minutes'::interval"

-         ],

-         joins=[

-             'sessions USING (user_id)',

-             'host_config ON host.id = host_config.host_id'

-         ]

-     )

-     hosts = query.execute()

-     for host in hosts:

-         query = QueryProcessor(

-             tables=['host_channels'],

-             columns=['channel_id'],

-             clauses=['host_id=%(id)s', 'active IS TRUE', 'enabled IS TRUE'],

-             joins=['channels ON host_channels.channel_id = channels.id'],

-             values=host

-         )

-         rows = query.execute()

-         host['channels'] = [row['channel_id'] for row in rows]

-     return hosts

- 

- 

  def get_all_arches():

      """Return a list of all (canonical) arches available from hosts"""

      ret = {}
@@ -2590,27 +2579,6 @@ 

      return list(ret.keys())

  

  

- def get_active_tasks(host=None):

-     """Return data on tasks that are yet to be run"""

-     fields = ['id', 'state', 'channel_id', 'host_id', 'arch', 'method', 'priority', 'create_time']

-     values = dslice(koji.TASK_STATES, ('FREE', 'ASSIGNED'))

-     if host:

-         values['arches'] = host['arches'].split() + ['noarch']

-         values['channels'] = host['channels']

-         values['host_id'] = host['id']

-         clause = '(state = %(ASSIGNED)i AND host_id = %(host_id)i)'

-         if values['channels']:

-             clause += ''' OR (state = %(FREE)i AND arch IN %(arches)s \

- AND channel_id IN %(channels)s)'''

-         clauses = [clause]

-     else:

-         clauses = ['state IN (%(FREE)i,%(ASSIGNED)i)']

-     queryOpts = {'limit': 100, 'order': 'priority,create_time'}

-     query = QueryProcessor(columns=fields, tables=['task'], clauses=clauses,

-                            values=values, opts=queryOpts)

-     return query.execute()

- 

- 

  def get_task_descendents(task, childMap=None, request=False):

      if childMap is None:

          childMap = {}
@@ -7405,7 +7373,6 @@ 

      :param str description: eg. "YAML Ain't Markup Language"

      :param str extensions: space-separated list of descriptions, eg. "yaml yml"

      """

-     print(context)

      context.session.assertPerm('admin')

      verify_name_internal(name)

      convert_value(description, cast=str, check_only=True)
@@ -14093,18 +14060,25 @@ 

      def getLoadData(self):

          """Get load balancing data

  

-         This data is relatively small and the necessary load analysis is

-         relatively complex, so we let the host machines crunch it."""

-         hosts = get_ready_hosts()

-         for host in hosts:

-             if host['id'] == self.id:

-                 break

+         Compatibility version for older builders.

+         """

+         host = get_host(self.id)

+         query = QueryProcessor(tables=['host_channels'], columns=['channel_id'],

+                                clauses=['host_id = %(id)s', 'active IS TRUE'],

+                                values={'id': self.id},

+                                opts={'asList': True})

+         host['channels'] = [x[0] for x in query.execute()]

+         task_runs = scheduler.get_task_runs(hostID=self.id, states=[koji.TASK_STATES['ASSIGNED'],

+                                                                     koji.TASK_STATES['SCHEDULED']])

+         if task_runs:

+             query = QueryProcessor(tables=['task'], clauses=['id = %(id)s'],

+                                    columns=['id', 'state', 'channel_id', 'host_id', 'arch',

+                                             'method', 'priority', 'create_time'],

+                                    values={'id': task_runs[0]['task_id']})

+             tasks = query.execute()

          else:

-             # this host not in ready list

-             return [[], []]

-         # host is the host making the call

-         tasks = get_active_tasks(host)

-         return [hosts, tasks]

+             tasks = []

+         return [[host], tasks]

  

      def getTask(self):

          """Open next available task and return it"""
@@ -14170,6 +14144,7 @@ 

          host = Host()

          host.verify()

          host.updateHost(task_load, ready)

+         scheduler.schedule()

  

      def getLoadData(self):

          host = Host()
@@ -14222,6 +14197,60 @@ 

          task.assertHost(host.id)

          return task.setWeight(weight)

  

+     def setHostData(self, hostdata):

+         """Builder will update all its resources

+ 

+         Initial implementation contains:

+           - available task methods

+           - maxjobs

+           - host readiness

+ 

+         obsoletes updateHost

+         """

+         host = Host()

+         host.verify()

+         if 'task_load' in hostdata and 'ready' in hostdata:

+             self.updateHost(hostdata['task_load'], hostdata['ready'])

+             # TODO: maybe obsolete it completely and put this data into scheduler_host_data

+         hostdata = hostdata

+         clauses = ['host_id = %(host_id)i']

+         values = {'host_id': host.id}

+         table = 'scheduler_host_data'

+         query = QueryProcessor(tables=[table], clauses=clauses, values=values,

+                                opts={'countOnly': True})

+         if query.singleValue() > 0:

+             update = UpdateProcessor(table=table, clauses=clauses, values=values,

+                                      data={'data': json.dumps(hostdata)})

+             update.execute()

+         else:

+             insert = InsertProcessor(table=table,

+                                      data={'host_id': host.id, 'data': json.dumps(hostdata)})

+             insert.execute()

+         sched_logger.debug(f"Updating host data with: {hostdata}",

+                            host_id=host.id, location='setHostData')

+ 

+     def getTasks(self):

+         host = Host()

+         host.verify()

+ 

+         tasks = scheduler.get_task_runs(hostID=host.id,

+                                         states=[koji.TASK_STATES['ASSIGNED'],

+                                                 koji.TASK_STATES['SCHEDULED']])

+         for task in tasks:

+             sched_logger.debug("Sending task", host_id=host.id, task_id=task['id'],

+                                location="getTasks")

+         return tasks

+ 

+     def refuseTask(self, task_id):

+         host = Host()

+         host.verify()

+ 

+         task = Task(task_id)

+         task.free(newstate=koji.TASK_STATES['REFUSED'])

+         sched_logger.warning("Refusing task", host_id=host.id, task_id=task_id,

+                              location="refuseTask")

+         return True

+ 

      def getHostTasks(self):

          host = Host()

          host.verify()

file modified
+5
@@ -42,6 +42,8 @@ 

  from koji.server import ServerError, BadRequest, RequestTimeout

  from koji.xmlrpcplus import ExtendedMarshaller, Fault, dumps, getparser

  

+ from . import scheduler

+ 

  

  class Marshaller(ExtendedMarshaller):

  
@@ -473,6 +475,7 @@ 

          ['LogFormat', 'string',

           '%(asctime)s [%(levelname)s] m=%(method)s u=%(user_name)s p=%(process)s r=%(remoteaddr)s '

           '%(name)s: %(message)s'],

+         ['SchedulerLogLevel', 'string', 'WARNING'],

  

          ['MissingPolicyOk', 'boolean', True],

          ['EnableMaven', 'boolean', False],
@@ -845,8 +848,10 @@ 

      registry = HandlerRegistry()

      functions = kojihub.RootExports()

      hostFunctions = kojihub.HostExports()

+     schedulerFunctions = scheduler.SchedulerExports()

      registry.register_instance(functions)

      registry.register_module(hostFunctions, "host")

+     registry.register_module(schedulerFunctions, "scheduler")

      registry.register_function(koji.auth.login)

      registry.register_function(koji.auth.sslLogin)

      registry.register_function(koji.auth.logout)

file added
+436
@@ -0,0 +1,436 @@ 

+ import functools

+ import inspect

+ import logging

+ 

+ import koji

+ from koji.context import context

+ from koji.db import (

+     BulkInsertProcessor,

+     DeleteProcessor,

+     InsertProcessor,

+     QueryProcessor,

+     UpdateProcessor,

+ )

+ 

+ logger = logging.getLogger('koji.scheduler')

+ 

+ 

class HostHashTable(object):
    """Multi-indexed host table for fast filtering.

    Hosts are indexed by architecture, channel and supported task method so
    that candidate builders for a task can be found by set intersection.
    """

    def __init__(self, hosts=None):
        # value -> set(host_id) indexes
        self.arches = {}
        self.channels = {}
        self.methods = {}
        self.hosts = {}        # host_id -> hostinfo dict
        self.host_ids = set()
        # builders which don't report scheduler data (methods/maxjobs)
        self.old_hosts = set()
        if hosts is None:
            hosts = get_ready_hosts()
        for hostinfo in hosts:
            self.add_host(hostinfo)

    def add_host(self, hostinfo):
        """Add one host to all indexes.

        :param dict hostinfo: host description (id, arches, channels,
            capacity, task_load and optionally 'data' with methods/maxjobs)
        """
        host_id = hostinfo['id']
        # priority is based on available capacity
        hostinfo['priority'] = hostinfo['capacity'] - hostinfo['task_load']
        # but builders running zero tasks should be always better fit
        if hostinfo['task_load'] == 0:
            # TODO: better heuristic?
            hostinfo['priority'] += 100
        # TODO: one query for all hosts
        # TODO: add only hosts which checked in in last X minutes
        query = QueryProcessor(tables=['task'], clauses=['host_id = %(host_id)i'],
                               values={'host_id': host_id}, opts={'countOnly': True})
        hostinfo['tasks'] = query.executeOne()

        self.hosts[host_id] = hostinfo
        self.host_ids.add(host_id)
        for arch in hostinfo['arches']:
            self.arches.setdefault(arch, set()).add(host_id)
        for channel in hostinfo['channels']:
            self.channels.setdefault(channel, set()).add(host_id)
        # .get() -- callers (and the unit tests) may pass hosts without 'data'
        if not hostinfo.get('data'):
            self.old_hosts.add(host_id)
        else:
            for method in hostinfo['data']['methods']:
                self.methods.setdefault(method, set()).add(host_id)

        # know about refused tasks
        query = QueryProcessor(tables=['scheduler_task_runs'], columns=['task_id'],
                               clauses=['host_id = %(host_id)i', 'state = %(state)i'],
                               values={'host_id': host_id, 'state': koji.TASK_STATES['REFUSED']},
                               opts={'asList': True})
        self.hosts[host_id]['refused_tasks'] = set([x[0] for x in query.execute()])

    def get(self, task):
        """Return the best host for given task and reserve its resources.

        :param dict task: task description; any of id/arch/method/channel_id/
            host_id may be present -- absent keys simply don't filter
        :returns dict|None: chosen hostinfo, or None if no host qualifies
        """
        # filter by requirements -- task.get() so partial task dicts work
        host_ids = set(self.host_ids)
        # assigned task
        if task.get('host_id'):
            host_ids &= {task['host_id']}
        # filter by architecture
        if task.get('arch') is not None:
            host_ids &= self.arches.get(task['arch'], set())
        # filter by method (unknown for old builders)
        if task.get('method') is not None:
            host_ids &= self.methods.get(task['method'], set()) | self.old_hosts
        # filter by channel
        if task.get('channel_id') is not None:
            host_ids &= self.channels.get(task['channel_id'], set())

        # select best from filtered and remove hosts which already refused this task
        candidates = []
        weights = {}  # host_id -> weight this task would cost on that host
        for host_id in host_ids:
            hostinfo = self.hosts[host_id]
            if task.get('id') in hostinfo['refused_tasks']:
                dblogger.debug("Task already refused", task_id=task.get('id'),
                               host_id=host_id)
                continue
            if host_id in self.old_hosts:
                # old builders don't report weights -- heuristic of 1.5
                task_weight = 1.5
            else:
                # fall back to the heuristic when the method is unreported
                task_weight = hostinfo['data']['methods'].get(task.get('method'), 1.5)
            if task_weight > hostinfo['capacity'] - hostinfo['task_load']:
                dblogger.debug(
                    f"Higher weight {task_weight} than available capacity {hostinfo['capacity']}",
                    task_id=task.get('id'), host_id=host_id)
                continue
            if host_id not in self.old_hosts and hostinfo['data']['maxjobs'] < 1:
                dblogger.debug("Host has no free job slot", task_id=task.get('id'),
                               host_id=host_id)
                continue
            weights[host_id] = task_weight
            candidates.append(hostinfo)

        if not candidates:
            return None

        candidates.sort(key=lambda x: -x['priority'])
        host = candidates[0]
        # reduce resources (reserved memory, cpus) by *this* host's weight,
        # not by whatever weight the last examined host happened to compute
        host['task_load'] += weights[host['id']]
        if host['id'] not in self.old_hosts:
            host['data']['maxjobs'] -= 1
        return host

+ 

+ 

def drop_from_queue(task_id):
    """Remove any scheduled runs for a task.

    No error is raised when the task has no queued runs.

    :param int task_id: task whose runs should be dropped
    """
    DeleteProcessor(
        table='scheduler_task_runs',
        clauses=['task_id = %(task_id)i'],
        values={'task_id': task_id},
    ).execute()

+ 

+ 

def get_host_data(hostID=None):
    """Return actual builder data

    :param int hostID: Return data for given host (otherwise for all)
    :returns list[dict]: list of host_id/data dicts
    """
    # restrict to one host only when requested
    if hostID is not None:
        clauses = ['host_id = %(hostID)i']
    else:
        clauses = []
    query = QueryProcessor(
        tables=['scheduler_host_data'],
        clauses=clauses,
        columns=['host_id', 'data'],
        values={'hostID': hostID},
        opts={'order': 'host_id'}
    )
    return query.execute()

+ 

+ 

def get_task_runs(taskID=None, hostID=None, states=None):
    """Return content of scheduler queue

    :param int taskID: filter by task
    :param int hostID: filter by host
    :param list[int] states: filter by states
    :returns list[dict]: list of dicts
    """
    # column expression -> result alias
    fields = [
        ('id', 'id'),
        ('task_id', 'task_id'),
        ('host_id', 'host_id'),
        ('state', 'state'),
        ('create_time', 'create_time'),
        ("date_part('epoch', create_time)", 'create_ts'),
        ('start_time', 'start_time'),
        ("date_part('epoch', start_time)", 'start_ts'),
        ('end_time', 'end_time'),
        ("date_part('epoch', end_time)", 'end_ts'),
    ]
    columns = [column for column, _ in fields]
    aliases = [alias for _, alias in fields]

    # only add clauses for the filters that were actually given
    clauses = []
    if taskID is not None:
        clauses.append('task_id = %(taskID)i')
    if hostID is not None:
        clauses.append('host_id = %(hostID)i')
    if states is not None:
        clauses.append('state IN %(states)s')
    values = {'taskID': taskID, 'hostID': hostID, 'states': states}

    query = QueryProcessor(
        tables=['scheduler_task_runs'], columns=columns, aliases=aliases,
        clauses=clauses, values=values, opts={'order': 'id'},
    )
    return query.execute()

+ 

+ 

def get_ready_hosts():
    """Return information about hosts that are ready to build.

    Hosts set the ready flag themselves
    Note: We ignore hosts that are late checking in (even if a host
        is busy with tasks, it should be checking in quite often).

    host dict contains:
      - id
      - name
      - list(arches)
      - task_load
      - capacity
      - list(channels) (ids)
      - [resources]
    """
    host_query = QueryProcessor(
        tables=['host'],
        columns=['host.id', 'name', 'arches', 'task_load', 'capacity', 'data'],
        aliases=['id', 'name', 'arches', 'task_load', 'capacity', 'data'],
        clauses=[
            'enabled IS TRUE',
            'ready IS TRUE',
            'expired IS FALSE',
            'master IS NULL',
            'active IS TRUE',
            # drop hosts that haven't checked in recently
            "update_time > NOW() - '5 minutes'::interval",
            'capacity > task_load',
        ],
        joins=[
            'sessions USING (user_id)',
            'host_config ON host.id = host_config.host_id',
            'LEFT JOIN scheduler_host_data ON host.id = scheduler_host_data.host_id',
        ]
    )
    hosts = host_query.execute()
    # fill in active channels and normalize arches per host
    for hostinfo in hosts:
        channel_query = QueryProcessor(
            tables=['host_channels'],
            columns=['channel_id'],
            clauses=['host_id=%(id)s', 'active IS TRUE', 'enabled IS TRUE'],
            joins=['channels ON host_channels.channel_id = channels.id'],
            values=hostinfo,
            opts={'asList': True},
        )
        hostinfo['channels'] = [channel_id for (channel_id,) in channel_query.execute()]
        # every builder can take noarch tasks
        hostinfo['arches'] = hostinfo['arches'].split() + ['noarch']
    return hosts

+ 

+ 

def clean_scheduler_queue():
    """Fail scheduler runs that are inconsistent or stuck (not their tasks)."""
    # FAIL inconsistent runs, but not tasks
    inconsistent_query = QueryProcessor(
        tables=['scheduler_task_runs', 'task'],
        columns=['scheduler_task_runs.id'],
        clauses=[
            'task.id = scheduler_task_runs.task_id',
            'scheduler_task_runs.state = %(state)s',
            'scheduler_task_runs.state != task.state',
        ],
        values={'state': koji.TASK_STATES['OPEN']},
        opts={'asList': True},
    )
    run_ids = [row[0] for row in inconsistent_query.execute()]

    # FAIL (timeout) also runs which are scheduled for too long and were not picked
    # by their respective workers, try to find new builders for them
    stale_query = QueryProcessor(
        tables=['scheduler_task_runs'],
        columns=['id'],
        clauses=[
            "create_time < NOW() - '5 minutes'::interval",
            "state = %(state)i",
        ],
        values={'state': koji.TASK_STATES['SCHEDULED']},
        opts={'asList': True},
    )
    # TODO: does it make sense to have TIMEOUTED state for runs?
    run_ids.extend(row[0] for row in stale_query.execute())

    if not run_ids:
        return
    update = UpdateProcessor(
        table='scheduler_task_runs',
        clauses=['id IN %(run_ids)s'],
        values={'run_ids': run_ids},
        data={'state': koji.TASK_STATES['FAILED']},
        rawdata={'end_time': 'NOW()'},
    )
    update.execute()

+ 

+ 

def schedule(task_id=None):
    """Run scheduler

    Find unscheduled (FREE/ASSIGNED) tasks and pair them with the best
    available builders, recording the pairing in scheduler_task_runs.

    :param int task_id: schedule only this task (otherwise the whole queue)
    """

    # TODO: locking so, only one scheduler runs in a time
    # TODO: don't run it too often (configurable)
    # TODO: run only reasonably, now we trigger it on every updateHost + makeTask

    # stupid for now, just add new task to random builder
    # debug level -- the original logger.error was leftover debugging noise
    logger.debug("SCHEDULER RUN")
    hosts = HostHashTable()
    if not hosts.hosts:
        # early fail if there is nothing available
        dblogger.debug("Hosts not found")
        return

    # find unscheduled tasks
    columns = ['id', 'arch', 'method', 'channel_id', 'priority', 'host_id']
    if not task_id:
        clean_scheduler_queue()
        query = QueryProcessor(
            tables=['task'], columns=columns,
            clauses=[
                'state IN %(states)s',
                # skip tasks which already have an active scheduled run
                # (named constant instead of the former magic literal 6)
                'id NOT IN (SELECT task_id FROM scheduler_task_runs'
                ' WHERE state = %(scheduled)s)'
            ],
            values={
                'states': [koji.TASK_STATES['FREE'], koji.TASK_STATES['ASSIGNED']],
                'scheduled': koji.TASK_STATES['SCHEDULED'],
            },
            opts={'order': '-priority'},
        )
    else:
        query = QueryProcessor(
            tables=['task'], columns=columns,
            clauses=['id = %(id)i'], values={'id': task_id},
            opts={'order': '-priority'},
        )
    tasks = list(query.execute())

    # assign them to builders fulfilling criteria in priority order
    runs = []
    for task in tasks:
        host = hosts.get(task)
        if not host:
            dblogger.warning("Can't find adequate builder", task_id=task['id'])
            continue
        runs.append({
            'host_id': host['id'],
            'task_id': task['id'],
            'state': koji.TASK_STATES['SCHEDULED'],
        })
        dblogger.info("Scheduling", task_id=task['id'], host_id=host['id'])
    # don't issue an empty bulk insert when nothing was schedulable
    if runs:
        insert = BulkInsertProcessor(table='scheduler_task_runs', data=runs)
        insert.execute()

+ 

+ 

class SchedulerExports():
    """Read-only scheduler calls exported under the "scheduler" namespace."""
    getTaskRuns = staticmethod(get_task_runs)
    getHostData = staticmethod(get_host_data)

    def getLogs(self, taskID=None, hostID=None, level=None,
                from_ts=None, to_ts=None, logger_name=None):
        """Return all related log messages

        :param int taskID: filter by task
        :param int hostID: filter by host
        :param str level: filter by message level
        :param float from_ts: filter from earliest time
        :param float to_ts: filter to latest time (from_ts < ts <= to_ts)
        :param str logger_name: filter by logger name
        :return [dict]: list of messages
        """
        fields = (
            ('scheduler_log_messages.id', 'id'),
            ('task_id', 'task_id'),
            ('host_id', 'host_id'),
            ('msg_time', 'msg_time'),
            ("date_part('epoch', msg_time)", 'msg_ts'),
            ('logger_name', 'logger_name'),
            ('level', 'level'),
            ('location', 'location'),
            ('msg', 'msg'),
            ('host.name', 'host_name'),
        )
        clauses = []
        values = {}
        if taskID is not None:
            clauses.append("task_id = %(taskID)i")
            values['taskID'] = taskID
        if hostID is not None:
            clauses.append("host_id = %(hostID)i")
            values['hostID'] = hostID
        if level is not None:
            clauses.append("level = %(level)s")
            values['level'] = level.upper()
        if from_ts is not None:
            # NOTE(review): compares msg_time (timestamp) with an epoch float --
            # confirm the DB layer converts, or wrap in to_timestamp()
            clauses.append("msg_time > %(from_ts)s")
            values['from_ts'] = float(from_ts)
        if to_ts is not None:
            clauses.append("msg_time <= %(to_ts)s")
            values['to_ts'] = float(to_ts)
        if logger_name is not None:
            # bugfix: placeholder previously referenced %(to_ts)s, so the
            # logger_name filter silently used the to_ts value
            clauses.append("logger_name = %(logger_name)s")
            values['logger_name'] = logger_name

        columns, aliases = zip(*fields)
        query = QueryProcessor(tables=['scheduler_log_messages'],
                               columns=columns, aliases=aliases,
                               joins=['LEFT JOIN host ON host_id = host.id'],
                               clauses=clauses, values=values,
                               opts={'order': 'msg_time'})
        return query.execute()

+ 

+ 

class DBLogger(object):
    """DBLogger class for encapsulating scheduler logging. It is thread-safe
    as both logging parts do this per se (logging + DB handler via context)"""

    def __init__(self, logger_name=None):
        # resolved lazily in log() -- hub config isn't loaded at construction time
        self.log_level = None
        if logger_name:
            self.logger = logger_name
        else:
            self.logger = 'koji.scheduler'

    def log(self, msg, logger_name=None, level=logging.NOTSET,
            task_id=None, host_id=None, location=None):
        """Emit a message to the regular logger and the scheduler DB table.

        :param str msg: message text
        :param str logger_name: override the instance's logger name
        :param int level: logging level (logging.DEBUG, ... CRITICAL)
        :param int task_id: related task, if any
        :param int host_id: related host, if any
        :param str location: code location; derived from the caller's
            frame when not given
        :raises koji.GenericError: if SchedulerLogLevel config is invalid
        """
        if self.log_level is None:
            # can't be done in constructor, as config is not loaded in that time
            log_level = context.opts.get('SchedulerLogLevel')
            valid_levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
            if log_level not in valid_levels:
                raise koji.GenericError(f"Invalid log level: {log_level}")
            self.log_level = logging.getLevelName(log_level)
        if level < self.log_level:
            return
        if not logger_name:
            logger_name = self.logger
        if location is None:
            # name of the calling function, one frame up
            frame = inspect.currentframe()
            frames = inspect.getouterframes(frame)
            frame = frames[1]
            location = frame.function
        # log to regular log
        text = f"task: {task_id}, host: {host_id}, location: {location}, message: {msg}"
        logging.getLogger(logger_name).log(level, text)
        # log to db
        insert = InsertProcessor(
            'scheduler_log_messages',
            data={
                'logger_name': logger_name,
                # public API (was private logging._levelToName); identical
                # output for standard levels, and doesn't KeyError on custom ones
                'level': logging.getLevelName(level),
                'task_id': task_id,
                'host_id': host_id,
                'location': location,
                'msg': msg,
            }
        )
        insert.execute()

    debug = functools.partialmethod(log, level=logging.DEBUG)
    info = functools.partialmethod(log, level=logging.INFO)
    warning = functools.partialmethod(log, level=logging.WARNING)
    error = functools.partialmethod(log, level=logging.ERROR)
    critical = functools.partialmethod(log, level=logging.CRITICAL)


dblogger = DBLogger()

@@ -0,0 +1,107 @@ 

+ import logging

+ import mock

+ import unittest

+ 

+ import scheduler

+ 

+ IP = scheduler.InsertProcessor

+ 

+ 

class TestDBLogger(unittest.TestCase):
    """Check that DBLogger writes the expected rows via InsertProcessor."""

    def setUp(self):
        self.inserts = []
        self.InsertProcessor = mock.patch('scheduler.InsertProcessor',
                                          side_effect=self.getInsert).start()

    def tearDown(self):
        mock.patch.stopall()

    def getInsert(self, *args, **kwargs):
        # real InsertProcessor with execute() stubbed out, recorded for asserts
        insert = IP(*args, **kwargs)
        insert.execute = mock.MagicMock()
        self.inserts.append(insert)
        return insert

    def test_defaults(self):
        dblog = scheduler.DBLogger()
        self.assertEqual(dblog.logger, 'koji.scheduler')
        self.assertEqual(len(self.inserts), 0)

    def test_basic(self):
        dblog = scheduler.DBLogger()
        dblog.log("text")
        self.assertEqual(len(self.inserts), 1)
        insert = self.inserts[0]
        self.assertEqual(insert.table, 'scheduler_log_messages')
        expected = {
            'task_id': None,
            'host_id': None,
            'logger_name': 'koji.scheduler',
            'level': 'NOTSET',
            'location': 'test_basic',
            'msg': 'text',
        }
        self.assertEqual(insert.data, expected)

    def test_all(self):
        dblog = scheduler.DBLogger()
        dblog.log("text", logger_name="logger_name", level=logging.ERROR,
                  task_id=123, host_id=456, location="location")
        self.assertEqual(len(self.inserts), 1)
        expected = {
            'task_id': 123,
            'host_id': 456,
            'logger_name': 'logger_name',
            'level': 'ERROR',
            'location': 'location',
            'msg': 'text',
        }
        self.assertEqual(self.inserts[0].data, expected)

    def test_levels(self):
        dblog = scheduler.DBLogger()
        for level_name in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
            getattr(dblog, level_name.lower())("")
            self.assertEqual(self.inserts[0].data['level'], level_name)
            self.inserts = []

+ 

+ 

class TestHostHashTable(unittest.TestCase):
    """Check that HostHashTable.get picks the best matching builder."""

    def test_get(self):
        host1 = {
            'id': 1,
            'arches': ['i386', 'x86_64', 'noarch'],
            'channels': [1],
            'capacity': 2.0,
            'task_load': 0.0,
        }
        host2 = {
            'id': 2,
            'arches': ['i386'],
            'channels': [1, 2],
            'capacity': 2.0,
            'task_load': 0.0,
        }
        host3 = {
            'id': 3,
            'arches': ['x86_64', 'noarch'],
            'channels': [2],
            'capacity': 3.0,
            'task_load': 0.0,
        }
        ht = scheduler.HostHashTable([host1, host2, host3])

        # host 3 has the most spare capacity for these arch filters
        self.assertEqual(ht.get({'arch': 'noarch'})['id'], 3)
        self.assertEqual(ht.get({'arch': 'x86_64'})['id'], 3)

        self.assertEqual(ht.get({'channel_id': 2})['id'], 3)

        # only host 2 serves channel 2 with the i386 arch
        self.assertEqual(ht.get({'channel_id': 2, 'arch': 'i386'})['id'], 2)

file modified
+19
@@ -147,6 +147,21 @@ 

              cursor.execute("VACUUM ANALYZE buildroot")

  

  

def clean_scheduler_logs(cursor, vacuum, test, age):
    """Delete scheduler log messages older than the given age.

    :param cursor: DB cursor (used only for VACUUM)
    :param bool vacuum: run VACUUM ANALYZE after deleting
    :param bool test: dry-run -- report, but delete nothing
    :param int age: delete messages older than this many days
    """
    # clauses must be a list for Query/DeleteProcessor (was a bare string)
    clauses = [f"(msg_time < NOW() - '{age:d} days'::interval)"]
    if options.verbose:
        query = QueryProcessor(tables=["scheduler_log_messages"],
                               clauses=clauses,
                               opts={'countOnly': True})
        rows = query.execute()
        print(f"Deleting {rows} scheduler log messages")
    if not test:
        # original had a stray line-continuation backslash here, which made
        # the function a syntax error
        delete = DeleteProcessor(table="scheduler_log_messages", clauses=clauses)
        delete.execute()
        if vacuum:
            cursor.execute("VACUUM ANALYZE scheduler_log_messages")

+ 

+ 

  if __name__ == "__main__":

      global options

      parser = OptionParser("%prog cleans koji database")
@@ -180,6 +195,9 @@ 

      parser.add_option('--scratch-builds-age', type=int, dest="scratch_age",

                        action="store", default=730, metavar="DAYS",

                        help="Delete scratch builds' tasks older than this (default: 2 years")

+     parser.add_option('--logs-age', type=int,

+                       action="store", default=7, metavar="DAYS",

+                       help="Delete scheduler log messages older than this (default: 7 days)")

      parser.add_option('--buildroots', action="store_true",

                        help="Delete unreferenced buildroots")

      parser.add_option('-f', '--force', action="store_true",
@@ -240,6 +258,7 @@ 

      clean_sessions(cursor, options.vacuum, options.test, options.sessions_age,

                     options.sessions_absolute_age)

      clean_reservations(cursor, options.vacuum, options.test, options.reservations_age)

+     clean_scheduler_logs(cursor, options.vacuum, options.test, options.logs_age)

      if options.tag_notifications:

          clean_notification_tasks(cursor, options.vacuum, options.test,

                                   age=options.tag_notifications_age)

3 new commits added

  • configurable log level for scheduler
  • make hostdata available to scheduler
  • upload host data
a year ago

10 new commits added

  • configurable log level for scheduler
  • make hostdata available to scheduler
  • upload host data
  • wip
  • wip
  • wip
  • wip
  • fix aftermerge issues
  • Merge branch 'scheduler-api' into scheduler-all
  • Merge branch 'scheduler2' into scheduler-all
a year ago

1 new commit added

  • test rich interface
a year ago

3 new commits added

  • filter hosts by methods
  • rich print for CLI
  • fix query
a year ago

1 new commit added

  • add method to log print
a year ago

15 new commits added

  • add method to log print
  • filter hosts by methods
  • rich print for CLI
  • fix query
  • test rich interface
  • configurable log level for scheduler
  • make hostdata available to scheduler
  • upload host data
  • wip
  • wip
  • wip
  • wip
  • fix aftermerge issues
  • Merge branch 'scheduler-api' into scheduler-all
  • Merge branch 'scheduler2' into scheduler-all
a year ago

2 new commits added

  • fix query
  • limit tasks by maxjobs
a year ago

1 new commit added

  • backward compatibility
a year ago

1 new commit added

  • fix task closing
a year ago

1 new commit added

  • fix query
a year ago

Pull-Request has been closed by tkopecek

9 months ago