#1358 Allow running sub-pipelines in Weaver.
Closed 4 years ago by jkaluza. Opened 4 years ago by jkaluza.
jkaluza/pungi subpipelines  into  master

file modified
+15 -13
@@ -11,7 +11,7 @@ 

     inkscape:export-xdpi="90"

     inkscape:export-filename="/home/lsedlar/repos/pungi/doc/_static/phases.png"

     sodipodi:docname="phases.svg"

-    inkscape:version="1.0beta2 (2b71d25, 2019-12-03)"

+    inkscape:version="0.92.4 (unknown)"

     version="1.1"

     id="svg2"

     viewBox="0 0 771.66458 221.50019"
@@ -27,16 +27,16 @@ 

       inkscape:document-rotation="0"

       units="px"

       inkscape:window-maximized="1"

-      inkscape:window-y="23"

+      inkscape:window-y="0"

       inkscape:window-x="0"

-      inkscape:window-height="1035"

+      inkscape:window-height="1015"

       inkscape:window-width="1920"

       showgrid="false"

-      inkscape:current-layer="layer1"

+      inkscape:current-layer="g3668"

       inkscape:document-units="px"

-      inkscape:cy="97.894202"

-      inkscape:cx="396.63448"

-      inkscape:zoom="1.169022"

+      inkscape:cy="121.72161"

+      inkscape:cx="22.855676"

+      inkscape:zoom="1.6532468"

       inkscape:pageshadow="2"

       inkscape:pageopacity="1"

       borderopacity="1.0"
@@ -223,7 +223,8 @@ 

               style="font-size:13.1479px;line-height:1.25">Gather</tspan></text>

        </g>

        <g

-          id="g3647">

+          id="g3647"

+          transform="translate(-59.528387,63.348992)">

          <g

             id="g3644">

            <rect
@@ -245,10 +246,11 @@ 

               id="tspan3374"

               x="165.23042"

               y="923.25934"

-              style="font-size:13.1479px;line-height:1.25">ExtraFiles</tspan></text>

+              style="font-size:13.14789963px;line-height:1.25">ExtraFiles</tspan></text>

        </g>

        <g

-          id="g3658">

+          id="g3658"

+          transform="translate(-78.74996)">

          <rect

             style="fill:#e9b96e;fill-rule:evenodd;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"

             id="rect3348"
@@ -267,10 +269,10 @@ 

               id="tspan3378"

               x="243.95874"

               y="921.86945"

-              style="font-size:13.1479px;line-height:1.25">Createrepo</tspan></text>

+              style="font-size:13.14789963px;line-height:1.25">Createrepo</tspan></text>

        </g>

        <g

-          transform="translate(-150.564,114.11662)"

+          transform="translate(-151.16546,113.6154)"

           id="g3408">

          <rect

             style="fill:#729fcf;fill-rule:evenodd;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
@@ -290,7 +292,7 @@ 

               x="256.90588"

               sodipodi:role="line"

               id="tspan3406"

-              style="font-size:13.1479px;line-height:1.25">OSTree</tspan></text>

+              style="font-size:13.14789963px;line-height:1.25">OSTree</tspan></text>

        </g>

        <g

           id="g288"

file modified
+38 -8
@@ -14,8 +14,9 @@ 

      Otherwise the whole process will hang.

  

      :param compose: it is needed for logging

-     :param phases_schema: two-dimensional array of phases. Top dimension

-         denotes particular pipelines. Second dimension contains phases.

+     :param phases_schema: multi-dimensional array of phases. Top dimension

+         denotes particular pipelines. Second dimension can contain sub-pipelines

+         or phases.

      """

  

      name = "weaver"
@@ -45,7 +46,7 @@ 

  

      def run(self):

          for pipeline in shortcuts.force_list(self._phases_schema):

-             self.pool.add(PipelineThread(self.pool))

+             self.pool.add(PipelineThread(self.pool, self.compose._logger))

              self.pool.queue_put(shortcuts.force_list(pipeline))

  

          self.pool.start()
@@ -64,18 +65,47 @@ 

      Launches phases in pipeline sequentially

      """

  

+     def __init__(self, pool, logger, subpipeline=False, *args, **kwargs):

+         super(PipelineThread, self).__init__(pool, *args, **kwargs)

+         self._logger = logger

+         self._subpipeline = subpipeline

+ 

+     def get_phases_names(self, pipeline):

+         phases_names = []

+         if isinstance(pipeline, list) or isinstance(pipeline, tuple):

+             for phase in pipeline:

+                 subpipeline = self.get_phases_names(phase)

+                 if len(subpipeline) > 1:

+                     subpipeline_phases_names = "(%s)" % ", ".join(subpipeline)

+                 else:

+                     subpipeline_phases_names = subpipeline[0]

+                 phases_names.append(subpipeline_phases_names)

+         else:

+             phases_names.append(pipeline.name)

+         return phases_names

+ 

      def process(self, item, num):

          pipeline = shortcuts.force_list(item)

-         phases_names = ", ".join(phase.name for phase in pipeline)

-         msg = "Running pipeline (%d/%d). Phases: %s" % (

+         phases_names = ", ".join(self.get_phases_names(pipeline))

+         msg = "Running %spipeline (%d/%d). Phases: %s" % (

+             "sub-" if self._subpipeline else "",

              num,

              self.pool.queue_total,

              phases_names,

          )

          self.pool.log_info("[BEGIN] %s" % (msg,))

  

-         for phase in pipeline:

-             phase.start()

-             phase.stop()

+         for p in pipeline:

+             phases = shortcuts.force_list(p)

+             if len(phases) > 1:

+                 pool = ThreadPool(logger=self._logger)

+                 for phase in phases:

+                     pool.add(PipelineThread(pool, self._logger, True))

+                     pool.queue_put(phase)

+                 pool.start()

+                 pool.stop()

+             else:

+                 p.start()

+                 p.stop()

  

          self.pool.log_info("[DONE ] %s" % (msg,))

file modified
+1 -1
@@ -426,7 +426,7 @@ 

      # WEAVER phase - launches other phases which can safely run in parallel

      essentials_schema = (

          buildinstall_phase,

-         (gather_phase, extrafiles_phase, createrepo_phase),

+         ((gather_phase, extrafiles_phase), createrepo_phase),

          (ostree_phase, ostree_installer_phase),

      )

      essentials_phase = pungi.phases.WeaverPhase(compose, essentials_schema)

file modified
+35
@@ -68,6 +68,41 @@ 

          self.assertFinalized(self.p1)

          self.assertFinalized(self.p2)

  

+     def test_subpipeline(self):

+         phases_schema = (((self.p1, self.p2), self.p3),)

+         weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)

+         weaver_phase.start()

+         weaver_phase.stop()

+ 

+         self.assertFinalized(self.p1)

+         self.assertFinalized(self.p2)

+         self.assertFinalized(self.p3)

+ 

+     def test_subpipeline_stop_on_failure(self):

+         self.p2.start.side_effect = self.method_with_exception

+ 

+         phases_schema = (((self.p1, self.p2), self.p3),)  # one pipeline

+         weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)

+         with self.assertRaises(Exception) as ctx:

+             weaver_phase.start()

+             weaver_phase.stop()

+ 

+         self.assertEqual("BOOM", str(ctx.exception))

+         self.assertFinalized(self.p1)

+         self.assertInterrupted(self.p2)

+         self.assertMissed(self.p3)

+ 

+     def test_subsubpipeline(self):

+         phases_schema = (((self.p1, ((self.p2, self.p3),)), self.p4),)  # one pipeline

+         weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)

+         weaver_phase.start()

+         weaver_phase.stop()

+ 

+         self.assertFinalized(self.p1)

+         self.assertFinalized(self.p2)

+         self.assertFinalized(self.p3)

+         self.assertFinalized(self.p4)

+ 

      def test_stop_on_failure(self):

          self.p2.start.side_effect = self.method_with_exception

  

Some composes recently started using scm_dict in extra_files, which makes
this phase take longer (around 3 minutes). However, the extra files phase
can be executed in parallel with the gather phase.

This commit allows that by introducing "sub-pipelines" in the Weaver
phase.

The time saved matters mainly for partial composes that do not run the
buildinstall phase, because the buildinstall phase takes much more time
than the others. It might still be useful with buildinstall if we start
caching boot.iso.

Signed-off-by: Jan Kaluza <jkaluza@redhat.com>
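The ordering that sub-pipelines introduce can be illustrated with a small standalone sketch. It is not the Weaver code from this PR: `Phase`, `run_sequence` and `run_parallel` are hypothetical names, plain `threading.Thread` objects stand in for Pungi's thread pool, and error propagation is left out. It only shows the intended alternation, where items of a pipeline run one after another and a nested tuple runs its members concurrently.

```python
import threading


class Phase:
    """Hypothetical stand-in for a Pungi phase; it only records calls."""

    def __init__(self, name, log):
        self.name = name
        self._log = log

    def start(self):
        self._log.append(("start", self.name))

    def stop(self):
        self._log.append(("stop", self.name))


def run_sequence(items):
    """Run items one after another; a nested list/tuple is run in parallel."""
    for item in items:
        if isinstance(item, (list, tuple)):
            run_parallel(item)
        else:
            item.start()
            item.stop()


def run_parallel(items):
    """Run every item in its own thread and wait until all have finished."""
    threads = []
    for item in items:
        # A bare phase is treated as a one-phase sequence.
        sequence = item if isinstance(item, (list, tuple)) else [item]
        threads.append(threading.Thread(target=run_sequence, args=(sequence,)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


log = []
gather = Phase("gather", log)
extra_files = Phase("extra_files", log)
createrepo = Phase("createrepo", log)

# One pipeline: gather and extra_files in parallel, then createrepo.
schema = (((gather, extra_files), createrepo),)
run_parallel(schema)
```

In this model createrepo starts only after both gather and extra_files have stopped, whichever of the two finishes first.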

I was also playing with the idea of simply defining dependencies between phases and executing pungi entirely in threads constructed from this metadata. But that would be a much bigger change, and I don't have a clear reason to do it.
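For illustration only, the dependency-based idea could look roughly like the sketch below. It uses `graphlib.TopologicalSorter` from the Python standard library (3.9+) to group phases into waves that could run concurrently. The dependency map is an assumption made up for the example and is not taken from Pungi's code; `waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: phase name -> phases it has to wait for.
deps = {
    "gather": {"pkgset"},
    "extra_files": {"pkgset"},
    "createrepo": {"gather"},
}


def waves(dependencies):
    """Group phases into waves; phases in one wave do not depend on each other."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    result = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        result.append(ready)
        sorter.done(*ready)
    return result


schedule = waves(deps)
# schedule == [["pkgset"], ["extra_files", "gather"], ["createrepo"]]
```

A real scheduler would start each phase as soon as its own dependencies finish instead of waiting for a whole wave, which is part of why this would be a much bigger change.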

Looks fine to me. Extra files only need pkgset, not full gather.

Can you update the logs to show the hierarchy as well? Just keeping the parentheses would do the trick. Currently it's not at all obvious there is this extra parallelism.

2020-03-05 12:36:06 [INFO    ] [BEGIN] Running pipeline (2/3). Phases: gather, extra_files, createrepo
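A minimal standalone sketch of how the parentheses could be kept in the log message. It mirrors the recursive `get_phases_names` helper from the diff above, written here as a free function; the `Phase` class is a hypothetical stand-in that only carries a `name`.

```python
class Phase:
    """Hypothetical stand-in for a Pungi phase; only the name is needed."""

    def __init__(self, name):
        self.name = name


def get_phases_names(pipeline):
    """Return phase names, wrapping each multi-phase sub-pipeline in parentheses."""
    if not isinstance(pipeline, (list, tuple)):
        return [pipeline.name]
    names = []
    for item in pipeline:
        sub = get_phases_names(item)
        if len(sub) > 1:
            names.append("(%s)" % ", ".join(sub))
        else:
            names.append(sub[0])
    return names


pipeline = [
    (Phase("gather"), Phase("extra_files")),
    Phase("createrepo"),
]
message = "Phases: %s" % ", ".join(get_phases_names(pipeline))
# message == "Phases: (gather, extra_files), createrepo"
```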

Yeah, but probably tomorrow :).

Looks good.
The picture in the docs at https://docs.pagure.org/pungi/phases.html#phases should be updated as well.

rebased onto 80aeb39

4 years ago

Better logging in the second commit.

1 new commit added

  • Update phases SVG and PNG to show ExtraFiles running in parallel to other phases.
4 years ago

Also updated the phases.svg and phases.png in the third commit.

Is the diagram correct? It shows createrepo starting right after gather. But from my understanding of the code it will also wait for extra_files.

@lsedlar, you are right, I've updated the diagram.

Hi Jan,
originally, there was one pipeline containing "gather_phase", "extrafiles_phase" and "createrepo_phase", and the phases were executed one by one. I saw the .png diagram you made, in which "extrafiles_phase" is placed in a separate pipeline. I don't think that corresponds to the code, but it gave me an idea: why not create a new single-item pipeline for "extrafiles_phase"? Does "extrafiles_phase" have any dependencies? If not, we can safely do this. "gather_phase" and "createrepo_phase" have to run one after the other, so they have to stay together in one pipeline.
The phases would look like this:

essentials_schema = (
    buildinstall_phase, 
    (gather_phase, createrepo_phase),
    extrafiles_phase,
    (ostree_phase, ostree_installer_phase),
)

In this case, the new sub-pipeline functionality is not necessary and the code could remain simpler. What do you think? Did I miss something?
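The difference between the two schemas can be checked with a small model of the ordering rules. This is a sketch under assumptions, not Pungi code: phases are plain strings, `prerequisites` is a hypothetical helper, and the rules are the ones described in this PR (top-level entries run in parallel, entries of a pipeline run in sequence, a nested tuple inside a pipeline runs its members in parallel).

```python
def _sequence(items, before, out):
    """Items run one after another; return the set of phases they contain."""
    done = set()
    for item in items:
        if isinstance(item, (list, tuple)):
            done |= _parallel(item, before | done, out)
        else:
            out[item] = before | done
            done.add(item)
    return done


def _parallel(items, before, out):
    """Items run concurrently; each one waits only for `before`."""
    inside = set()
    for item in items:
        sequence = item if isinstance(item, (list, tuple)) else [item]
        inside |= _sequence(sequence, set(before), out)
    return inside


def prerequisites(schema):
    """Map each phase to the set of phases that must finish before it starts."""
    out = {}
    _parallel(schema, set(), out)
    return out


# Schema from this PR, using a sub-pipeline.
nested = prerequisites((
    "buildinstall",
    (("gather", "extra_files"), "createrepo"),
    ("ostree", "ostree_installer"),
))

# Schema proposed in the comment above, without sub-pipelines.
flat = prerequisites((
    "buildinstall",
    ("gather", "createrepo"),
    "extra_files",
    ("ostree", "ostree_installer"),
))
```

In this model, createrepo waits for both gather and extra_files under the nested schema, but only for gather under the flat one, where extra_files waits for nothing.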

Hm, I was not 100% sure whether createrepo_phase needs extrafiles_phase or not. After checking createrepo_phase more closely, it seems it does not need extrafiles_phase, and therefore the change you proposed should be possible.

I will give it a try, and if it still works I will submit a new PR.

@onosek, I think you are right. Based on my tests, your proposed change works correctly too. Thanks :).

I've opened https://pagure.io/pungi/pull-request/1363 and I'm closing this PR.

Pull-Request has been closed by jkaluza

4 years ago