Commit 63dba01 Run monitor and runner as separate processes.

6 files Authored and Committed by davidcarlos a year ago
Run monitor and runner as separate processes.

    - close #35

    
  1 @@ -4,47 +4,26 @@
  2   analyzed. This module provides such capabilities.
  3   """
  4   import threading
  5 - from multiprocessing import Process
  6 + from multiprocessing import Process, Queue
  7   import time
  8 + import os
  9   
 10   import kiskadee.database
 11 - import kiskadee.runner
 12 + from kiskadee.runner import Runner
 13   import kiskadee.queue
 14   from kiskadee.model import Package, Plugin, Version
 15 - from kiskadee.util import _plugin_name
 16   
 17   RUNNING = True
 18   
 19 - 
 20   class Monitor:
 21       """Provide kiskadee monitoring objects."""
 22   
 23       def __init__(self, _session):
 24           """Return a non initialized Monitor."""
 25           self.session = _session
 26 +         self.kiskadee_queue = None
 27   
 28 -     def initialize(self):
 29 -         """Start all threads related to the monitoring process.
 30 - 
 31 -         This includes all plugins that enqueue packages in the packages_queue,
 32 -         and the monitor() method, which retrieves packages from packages_queue,
 33 -         and makes the necessary database operations
 34 - 
 35 -         .. warning::
 36 - 
 37 -             If a plugin does not enqueue the packages in the packages_queue,
 38 -             the analysis will never be performed. You can use thee decorator
 39 -             `@kiskadee.queue.package_enqueuer` to easiliy enqueue a package.
 40 -         """
 41 -         _start(self.monitor)
 42 -         plugins = kiskadee.load_plugins()
 43 -         for plugin in plugins:
 44 -             self._save_plugin(plugin)
 45 -             _start(plugin.Plugin().watch)
 46 -             time.sleep(1)
 47 -         _start(kiskadee.runner.runner, True)
 48 - 
 49 -     def monitor(self):
 50 +     def monitor(self, kiskadee_queue):
 51           """Dequeue packages and check if they need to be analyzed.
 52   
 53           The packages are dequeued from the `package_queue`. When a package
 54 @@ -52,7 +31,17 @@
 55           queue so the runner component can trigger an analysis. Each plugin must
 56           enqueue its packages in the `packages_queue`.
 57           """
 58 + 
 59 +         kiskadee.logger.debug('Kiskadee PID: {}'.format(os.getppid()))
 60 +         kiskadee.logger.debug('Starting monitor subprocess')
 61 +         kiskadee.logger.debug('monitor PID: {}'.format(os.getpid()))
 62 +         plugins = kiskadee.load_plugins()
 63 +         for plugin in plugins:
 64 +             self._save_plugin(plugin)
 65 +             _start_plugin(plugin.Plugin().watch)
 66 + 
 67           while RUNNING:
 68 +             self.kiskadee_queue = kiskadee_queue
 69               pkg = self.dequeue_package()
 70               if pkg:
 71                   self._send_to_runner(pkg)
 72 @@ -63,8 +52,7 @@
 73       def dequeue_package(self):
 74           """Dequeue packages from packages_queue."""
 75           if not kiskadee.queue.packages_queue.empty():
 76 -             pkg = kiskadee.queue.dequeue_package()
 77 -             kiskadee.queue.package_done()
 78 +             pkg = kiskadee.queue.packages_queue.get()
 79               kiskadee.logger.debug(
 80                       "MONITOR: Dequed Package: {}_{}"
 81                       .format(pkg["name"], pkg["version"])
 82 @@ -74,8 +62,8 @@
 83   
 84       def dequeue_result(self):
 85           """Dequeue analyzed packages from result_queue."""
 86 -         if not kiskadee.queue.result_queue.empty():
 87 -             pkg = kiskadee.queue.dequeue_result()
 88 +         if not self.kiskadee_queue.results_empty():
 89 +             pkg = self.kiskadee_queue.dequeue_result()
 90               kiskadee.logger.debug(
 91                       "MONITOR: Dequed result for package : {}_{}"
 92                       .format(pkg["name"], pkg["version"])
 93 @@ -84,7 +72,7 @@
 94           return {}
 95   
 96       def _send_to_runner(self, pkg):
 97 -         _name = _plugin_name(pkg['plugin'])
 98 +         _name = pkg['plugin'].name
 99           _plugin = self._query(Plugin).filter(Plugin.name == _name).first()
100           _package = (
101                   self._query(Package)
102 @@ -98,13 +86,13 @@
103                           "MONITOR: Sending package {}_{} "
104                           " for analysis".format(pkg['name'], pkg['version'])
105                   )
106 -                 kiskadee.queue.enqueue_analysis(pkg)
107 +                 self.kiskadee_queue.enqueue_analysis(pkg)
108               else:
109                   new_version = pkg['version']
110                   analysed_version = _package.versions[-1].number
111 -                 plugin = pkg['plugin'].Plugin()
112 +                 plugin = pkg['plugin']
113                   if (plugin.compare_versions(new_version, analysed_version)):
114 -                     kiskadee.queue.enqueue_analysis(pkg)
115 +                     self.kiskadee_queue.enqueue_analysis(pkg)
116   
117       def _save_analyzed_pkg(self, pkg):
118           if not pkg:
119 @@ -178,8 +166,8 @@
120   
121   
122       def _save_plugin(self, plugin):
123 -         name = _plugin_name(plugin)
124 -         plugin = plugin.Plugin()
125 +         kiskadee.logger.debug(plugin)
126 +         name = plugin.Plugin().name
127           kiskadee.logger.debug(
128                   "MONITOR: Saving {} plugin in database".format(name)
129               )
130 @@ -194,7 +182,7 @@
131           return self.session.query(arg)
132   
133   
134 - def _start(module, joinable=False, timeout=None):
135 + def _start_plugin(module, joinable=False, timeout=None):
136       module_as_a_thread = threading.Thread(target=module)
137       module_as_a_thread.daemon = True
138       module_as_a_thread.start()
139 @@ -205,9 +193,15 @@
140   def daemon():
141       """Entry point to the monitor module."""
142       # TODO: improve with start/stop system
143 +     _kiskadee_queue = kiskadee.queue.KiskadeeQueue()
144       session = kiskadee.database.Database().session
145       monitor = Monitor(session)
146 -     p = Process(target=monitor.initialize())
147 -     p.daemon = True
148 -     p.start()
149 -     p.join()
150 +     runner = Runner(_kiskadee_queue)
151 +     monitor_process = Process(
152 +             target=monitor.monitor,
153 +             args=(_kiskadee_queue,)
154 +         )
155 +     runner_process = Process(target=runner.runner)
156 +     monitor_process.start()
157 +     runner_process.start()
158 +     runner_process.join()
1 @@ -78,7 +78,7 @@
2       def _create_package_dict(self, src):
3           return {'name': src["Package"],
4                   'version': self._parse_version(src["Version"]),
5 -                 'plugin': kiskadee.plugins.debian,
6 +                 'plugin': kiskadee.plugins.debian.Plugin(),
7                   'meta': {'directory': src['Directory']}
8                   }
9   
1 @@ -18,7 +18,7 @@
2           It should not matter, since example will not receive updates.
3           """
4           example = {}
5 -         example['plugin'] = sys.modules[__name__]
6 +         example['plugin'] = kiskadee.plugins.example.Plugin()
7           example['version'] = '0.1'
8           example['name'] = 'example'
9           return example
  1 @@ -1,75 +1,32 @@
  2   """Provide kiskadee queues and operations on them."""
  3   import time
  4 - 
  5   import queue
  6 + from multiprocessing import Queue
  7 + 
  8   import kiskadee
  9 - from kiskadee.util import _plugin_name
 10   
 11 - analyses_queue = queue.Queue()
 12   packages_queue = queue.Queue()
 13 - result_queue = queue.Queue()
 14 - 
 15 - def enqueue_analysis(package):
 16 -     """Enqueue a package for analysis.
 17 - 
 18 -     A package abstraction is a dict with the following key, value pairs:
 19 -         - plugin: the module object of a kiskadee plugin.
 20 -         - name: package name.
 21 -         - version: package version.
 22 -     """
 23 -     analyses_queue.put(package)
 24 - 
 25 - 
 26 - def dequeue_analysis():
 27 -     """Dequeue a package for analysis.
 28 - 
 29 -     A package abstraction is a dict with the following key, value pairs:
 30 -         - plugin: the module object of a kiskadee plugin.
 31 -         - name: package name.
 32 -         - version: package version.
 33 -     """
 34 -     return analyses_queue.get()
 35 - 
 36 - 
 37 - def analysis_done():
 38 -     """Anounce analysis is finished."""
 39 -     analyses_queue.task_done()
 40 - 
 41   
 42 - def is_empty():
 43 -     """Check if `analyses_queue is empty`.
 44 + class KiskadeeQueue():
 45   
 46 -     Returns True if the queue is empty and False otherwise.
 47 -     """
 48 -     return analyses_queue.empty()
 49 +     def __init__(self):
 50 +         self.analysis = Queue()
 51 +         self.results = Queue()
 52   
 53 +     def enqueue_analysis(self, pkg):
 54 +         return self.analysis.put(pkg)
 55   
 56 - def enqueue_package(package):
 57 -     """Enqueue a package for monitoring purposes.
 58 +     def dequeue_analysis(self):
 59 +         return self.analysis.get()
 60   
 61 -     A package abstraction is a dict with the following key, value pairs:
 62 -         - plugin: the module object of a kiskadee plugin.
 63 -         - name: package name.
 64 -         - version: package version.
 65 -     """
 66 -     packages_queue.put(package)
 67 +     def dequeue_result(self):
 68 +         return self.results.get()
 69   
 70 +     def enqueue_result(self, pkg):
 71 +         return self.results.put(pkg)
 72   
 73 - def dequeue_package():
 74 -     """Dequeue a package for monitoring purposes.
 75 - 
 76 -     A package abstraction is a dict with the following key, value pairs:
 77 -         - plugin: the module object of a kiskadee plugin.
 78 -         - name: package name.
 79 -         - version: package version.
 80 -     """
 81 -     return packages_queue.get()
 82 - 
 83 - 
 84 - def package_done():
 85 -     """Anounce package was verified."""
 86 -     packages_queue.task_done()
 87 - 
 88 +     def results_empty(self):
 89 +         return self.results.empty()
 90   
 91   def source_enqueuer(func):
 92       """Decorate functions to queue return values with enqueue_analysis."""
 93 @@ -78,26 +35,15 @@
 94           enqueue_analysis(source)
 95       return wrapper
 96   
 97 - 
 98   def package_enqueuer(func):
 99       """Decorate functions to queue return values with enqueue_package."""
100       def wrapper(*args, **kwargs):
101           package = func(*args, **kwargs)
102 -         enqueue_package(package)
103 -         plugin = _plugin_name(package['plugin'])
104 +         packages_queue.put(package)
105 +         plugin = package['plugin'].name
106           kiskadee.logger.debug(
107                   "{} plugin: Sending package {}_{} for monitor"
108                   .format(plugin, package['name'], package['version'])
109               )
110           time.sleep(2)
111       return wrapper
112 - 
113 - 
114 - def enqueue_result(package):
115 -     """Enqueue a package analyzed by the runner component."""
116 -     result_queue.put(package)
117 - 
118 - 
119 - def dequeue_result():
120 -     """Dequeue a analyzed package by the runner component."""
121 -     return result_queue.get()
  1 @@ -14,144 +14,148 @@
  2   
  3   RUNNING = True
  4   
  5 - 
  6 - def runner():
  7 -     """Run static analyzers.
  8 - 
  9 -     Continuously dequeue packages from `analyses_queue` and call the
 10 -     :func:`analyze` method, passing the dequeued package. After the analysis,
 11 -     updates the status of this package on the database.
 12 -     """
 13 -     kiskadee.logger.debug('Starting runner component')
 14 -     session = kiskadee.database.Database().session
 15 -     create_analyzers(session)
 16 -     while RUNNING:
 17 -         if not kiskadee.queue.is_empty():
 18 + class Runner:
 19 + 
 20 +     def __init__(self, kiskadee_queue):
 21 +         self.kiskadee_queue = kiskadee_queue
 22 + 
 23 +     def runner(self):
 24 +         """Run static analyzers.
 25 + 
 26 +         Continuously dequeue packages from the analysis queue and pass each
 27 +         one to :meth:`call_analyzers`. Analysis results are sent back to the
 28 +         monitor through the results queue.
 29 +         """
 30 +         kiskadee.logger.debug('Starting runner subprocess')
 31 +         kiskadee.logger.debug('runner PID: {}'.format(os.getpid()))
 32 +         session = kiskadee.database.Database().session
 33 +         self.create_analyzers(session)
 34 +         while RUNNING:
 35               kiskadee.logger.debug('RUNNER: dequeuing...')
 36 -             source_to_analysis = kiskadee.queue.dequeue_analysis()
 37 +             source_to_analysis = self.kiskadee_queue.dequeue_analysis()
 38               kiskadee.logger.debug('RUNNER: dequeued %s-%s from %s'
 39 -                                   % (source_to_analysis['name'],
 40 -                                      source_to_analysis['version'],
 41 -                                      source_to_analysis['plugin'].__name__))
 42 - 
 43 -             call_analyzers(source_to_analysis)
 44 - 
 45 - 
 46 - def call_analyzers(source_to_analysis):
 47 -     """Iterate over the package analyzers.
 48 - 
 49 -     For each analyzer defined to analysis the source, call
 50 -     the function :func:`analyze`, passing the source dict, the analyzer
 51 -     to run the analysis, and the path to a compressed source.
 52 -     """
 53 -     plugin = source_to_analysis['plugin'].Plugin()
 54 -     source_path = _path_to_uncompressed_source(
 55 -             source_to_analysis, plugin
 56 -     )
 57 -     if not source_path:
 58 -         return None
 59 - 
 60 -     analyzers = plugin.analyzers()
 61 -     source_to_analysis['results'] = {}
 62 -     for analyzer in analyzers:
 63 -         firehose_report = analyze(
 64 -                 source_to_analysis, analyzer, source_path
 65 -         )
 66 -         if firehose_report:
 67 -             source_to_analysis['results'][analyzer] = firehose_report
 68 +                                     % (source_to_analysis['name'],
 69 +                                         source_to_analysis['version'],
 70 +                                         source_to_analysis['plugin'].name))
 71   
 72 -     if source_to_analysis['results']:
 73 -         kiskadee.logger.debug(
 74 -                 "RUNNER: Sending {}-{} to Monitor"
 75 -                 .format(source_to_analysis["name"],
 76 -                         source_to_analysis["version"])
 77 +             self.call_analyzers(source_to_analysis)
 78 + 
 79 + 
 80 +     def call_analyzers(self, source_to_analysis):
 81 +         """Iterate over the package analyzers.
 82 + 
 83 +         For each analyzer defined to analyze the source, call
 84 +         the method :meth:`analyze`, passing the source dict, the analyzer
 85 +         to run the analysis, and the path to the uncompressed source.
 86 +         """
 87 +         plugin = source_to_analysis['plugin']
 88 +         source_path = self._path_to_uncompressed_source(
 89 +                 source_to_analysis, plugin
 90 +         )
 91 +         if not source_path:
 92 +             return None
 93 + 
 94 +         analyzers = plugin.analyzers()
 95 +         source_to_analysis['results'] = {}
 96 +         for analyzer in analyzers:
 97 +             firehose_report = self.analyze(
 98 +                     source_to_analysis, analyzer, source_path
 99               )
100 -         kiskadee.queue.enqueue_result(source_to_analysis)
101 -     shutil.rmtree(source_path)
102 - 
103 - 
104 - def analyze(source_to_analysis, analyzer, source_path):
105 -     """Run each analyzer on a source_to_analysis.
106 - 
107 -     The `source_to_analysis` dict is in the queue. The keys are:
108 -         - plugin: the plugin module itself
109 -         - name: the package name
110 -         - version: the package version
111 -         - path: plugin default path for packages
112 -         - return: list with firehose reports
113 -     The `analyzer` is the name of a static analyzer already created on the
114 -     database.
115 -     The `source_path` is the directory to a uncompressed source, returned
116 -     by the :func:`_path_to_uncompressed_source`.
117 -     """
118 -     if source_path is None:
119 -         return None
120 - 
121 -     kiskadee.logger.debug('ANALYSIS: running {} ...'.format(analyzer))
122 -     try:
123 -         analysis = kiskadee.analyzers.run(analyzer, source_path)
124 -         firehose_report = kiskadee.converter.to_firehose(analysis,
125 -                                                          analyzer)
126 -         kiskadee.logger.debug('ANALYSIS: DONE {} analysis'.format(analyzer))
127 -         return firehose_report
128 -     except Exception as err:
129 -         kiskadee.logger.debug('RUNNER: could not generate analysis')
130 -         kiskadee.logger.debug(err)
131 -         return None
132 - # TODO: remove compressed/uncompressed files after the analysis
133 - 
134 - 
135 - def _path_to_uncompressed_source(package, plugin):
136 - 
137 -     if not plugin or not package:
138 -         return None
139 - 
140 -     kiskadee.logger.debug(
141 -             'ANALYSIS: Downloading {} '
142 -             'source...'.format(package['name'])
143 -     )
144 - 
145 -     compressed_source = plugin.get_sources(package)
146 - 
147 -     if compressed_source:
148 -         kiskadee.logger.debug(
149 -                 'ANALYSIS: Downloaded {} source in {} path'
150 -                 .format(package['name'], os.path.dirname(compressed_source))
151 +             if firehose_report:
152 +                 source_to_analysis['results'][analyzer] = firehose_report
153 + 
154 +         if source_to_analysis['results']:
155 +             kiskadee.logger.debug(
156 +                     "RUNNER: Sending {}-{} to Monitor"
157 +                     .format(source_to_analysis["name"],
158 +                             source_to_analysis["version"])
159                   )
160 -         uncompressed_source_path = tempfile.mkdtemp()
161 -         shutil.unpack_archive(compressed_source, uncompressed_source_path)
162 -         kiskadee.logger.debug(
163 -                 'ANALYSIS: Unpacking {} source in {} path'
164 -                 .format(package['name'], uncompressed_source_path)
165 -                 )
166 -         if not compressed_source.find("kiskadee/tests") > -1:
167 -             shutil.rmtree(os.path.dirname(compressed_source))
168 +             self.kiskadee_queue.enqueue_result(source_to_analysis)
169 +         shutil.rmtree(source_path)
170 + 
171 + 
172 +     def analyze(self, source_to_analysis, analyzer, source_path):
173 +         """Run one analyzer on a source_to_analysis.
174 + 
175 +         The `source_to_analysis` dict is in the queue. The keys are:
176 +             - plugin: the kiskadee plugin instance
177 +             - name: the package name
178 +             - version: the package version
179 +             - path: plugin default path for packages
180 +             - results: dict mapping each analyzer to its firehose report
181 +         The `analyzer` is the name of a static analyzer already created on the
182 +         database.
183 +         The `source_path` is the directory of an uncompressed source, returned
184 +         by :meth:`_path_to_uncompressed_source`.
185 +         """
186 +         if source_path is None:
187 +             return None
188 + 
189 +         kiskadee.logger.debug('ANALYSIS: running {} ...'.format(analyzer))
190 +         try:
191 +             analysis = kiskadee.analyzers.run(analyzer, source_path)
192 +             firehose_report = kiskadee.converter.to_firehose(analysis,
193 +                                                             analyzer)
194 +             kiskadee.logger.debug('ANALYSIS: DONE {} analysis'.format(analyzer))
195 +             return firehose_report
196 +         except Exception as err:
197 +             kiskadee.logger.debug('RUNNER: could not generate analysis')
198 +             kiskadee.logger.debug(err)
199 +             return None
200 +     # TODO: remove compressed/uncompressed files after the analysis
201 + 
202 + 
203 +     def _path_to_uncompressed_source(self, package, plugin):
204 + 
205 +         if not plugin or not package:
206 +             return None
207 + 
208           kiskadee.logger.debug(
209 -                 'ANALYSIS: Unpacked {} source'.format(package['name'])
210 +                 'ANALYSIS: Downloading {} '
211 +                 'source...'.format(package['name'])
212 +         )
213 + 
214 +         compressed_source = plugin.get_sources(package)
215 + 
216 +         if compressed_source:
217 +             kiskadee.logger.debug(
218 +                     'ANALYSIS: Downloaded {} source in {} path'
219 +                     .format(package['name'], os.path.dirname(compressed_source))
220 +                     )
221 +             uncompressed_source_path = tempfile.mkdtemp()
222 +             shutil.unpack_archive(compressed_source, uncompressed_source_path)
223 +             kiskadee.logger.debug(
224 +                     'ANALYSIS: Unpacking {} source in {} path'
225 +                     .format(package['name'], uncompressed_source_path)
226 +                     )
227 +             if not compressed_source.find("kiskadee/tests") > -1:
228 +                 shutil.rmtree(os.path.dirname(compressed_source))
229 +             kiskadee.logger.debug(
230 +                     'ANALYSIS: Unpacked {} source'.format(package['name'])
231 +                     )
232 +             kiskadee.logger.debug(
233 +                     'ANALYSIS: Remove {} temp directory'
234 +                     .format(os.path.dirname(compressed_source))
235                   )
236 -         kiskadee.logger.debug(
237 -                 'ANALYSIS: Remove {} temp directory'
238 -                 .format(os.path.dirname(compressed_source))
239 -             )
240 -         return uncompressed_source_path
241 -     else:
242 -         kiskadee.logger.debug('RUNNER: invalid compressed source')
243 -         return None
244 - 
245 - 
246 - def create_analyzers(_session):
247 -     """Create the analyzers on database.
248 - 
249 -     The kiskadee analyzers are defined on the section `analyzers` of the
250 -     kiskadee.conf file. The `_session` argument represents a sqlalchemy
251 -     session.
252 -     """
253 -     list_of_analyzers = dict(kiskadee.config._sections["analyzers"])
254 -     for name, version in list_of_analyzers.items():
255 -         if not (_session.query(Analyzer).filter(Analyzer.name == name).
256 -                 filter(Analyzer.version == version).first()):
257 -             new_analyzer = kiskadee.model.Analyzer()
258 -             new_analyzer.name = name
259 -             new_analyzer.version = version
260 -             _session.add(new_analyzer)
261 -     _session.commit()
262 +             return uncompressed_source_path
263 +         else:
264 +             kiskadee.logger.debug('RUNNER: invalid compressed source')
265 +             return None
266 + 
267 + 
268 +     def create_analyzers(self, _session):
269 +         """Create the analyzers on database.
270 + 
271 +         The kiskadee analyzers are defined on the section `analyzers` of the
272 +         kiskadee.conf file. The `_session` argument represents a sqlalchemy
273 +         session.
274 +         """
275 +         list_of_analyzers = dict(kiskadee.config._sections["analyzers"])
276 +         for name, version in list_of_analyzers.items():
277 +             if not (_session.query(Analyzer).filter(Analyzer.name == name).
278 +                     filter(Analyzer.version == version).first()):
279 +                 new_analyzer = kiskadee.model.Analyzer()
280 +                 new_analyzer.name = name
281 +                 new_analyzer.version = version
282 +                 _session.add(new_analyzer)
283 +         _session.commit()
 1 @@ -22,7 +22,6 @@
 2       yield
 3       os.chdir(initial_dir)
 4   
 5 - 
 6   def download(path, url, file_name):
 7       """Download something from the internet.
 8   
 9 @@ -45,6 +44,3 @@
10           )
11           kiskadee.logger.debug(err)
12           return {}
13 - 
14 - def _plugin_name(plugin):
15 -     return plugin.__name__.split('.')[len(plugin.__name__.split('.')) - 1]