| |
@@ -197,7 +197,7 @@
|
| |
instead of triggering and waiting for new one(s) to finish.
|
| |
Intended to be used while developing the tests.
|
| |
:return: list of Build objects for the MBS builds created
|
| |
- :rtype: list of Build objects
|
| |
+ :rtype: list[Build]
|
| |
"""
|
| |
build_ids = []
|
| |
|
| |
@@ -321,7 +321,7 @@
|
| |
:param int batch: the number of the batch the components should be in
|
| |
:param string package: name of the component (package)
|
| |
:return: List of filtered components
|
| |
- :rtype: list of dict
|
| |
+ :rtype: list[dict]
|
| |
"""
|
| |
filtered = self.component_data["items"]
|
| |
if batch is not None:
|
| |
@@ -484,6 +484,15 @@
|
| |
def __init__(self, mbs_api):
    """Store the MBS API location used by the query methods.

    :param mbs_api: base URL of the MBS API; endpoint paths are appended
        to it directly when request URLs are built, so it presumably
        has to end with a trailing slash (confirm against callers)
    """
    self._mbs_api = mbs_api
|
| |
|
| |
@staticmethod
def _get_build_id(build_data):
    """Normalize a build reference to a build ID.

    :param int|Build build_data: build object or build ID
    :return: ``build_data`` itself when it is an int, otherwise the
        ``id`` attribute of the Build object
    :raises TypeError: if build_data is neither an int nor a Build
    """
    # bool is a subclass of int, but True/False are not meaningful build
    # IDs; keep rejecting them as the exact-type check used to do.
    if isinstance(build_data, int) and not isinstance(build_data, bool):
        return build_data
    # isinstance (instead of an exact type() match) also accepts subclasses.
    if isinstance(build_data, Build):
        return build_data.id
    raise TypeError(
        f"build_data must be an int or a Build, got {type(build_data).__name__}"
    )
|
| |
+
|
| |
def get_builds(self, module, stream, order_desc_by=None):
|
| |
"""Get list of Builds objects via mbs api.
|
| |
|
| |
@@ -543,13 +552,14 @@
|
| |
r.raise_for_status()
|
| |
return [Build(self._mbs_api, build["id"]) for build in r.json()["items"]]
|
| |
|
| |
- def get_module_build(self, build_id, **kwargs):
|
| |
+ def get_module_build(self, build_data, **kwargs):
|
| |
"""Query MBS API on module-builds endpoint for a specific build
|
| |
|
| |
- :attribute build_id (int): build ID
|
| |
+ :attribute build_data (int|Build): build ID
|
| |
:return: module build object
|
| |
:rtype: Build
|
| |
"""
|
| |
+ build_id = self._get_build_id(build_data)
|
| |
url = f"{self._mbs_api}module-builds/{build_id}"
|
| |
r = requests.get(url, params=kwargs)
|
| |
|
| |
@@ -565,11 +575,7 @@
|
| |
:param int interval: scan interval in seconds
|
| |
"""
|
| |
start = time.time()
|
| |
-
|
| |
- if type(build_data) is not int:
|
| |
- build_id = build_data.id
|
| |
- else:
|
| |
- build_id = build_data
|
| |
+ build_id = self._get_build_id(build_data)
|
| |
|
| |
while (time.time() - start) < timeout:
|
| |
build = self.get_module_build(build_id)
|
| |
@@ -593,3 +599,21 @@
|
| |
else:
|
| |
return False
|
| |
return self.wait_for_module_build(build_data, predicate, timeout, interval)
|
| |
+
|
| |
def get_final_mmds(self, build_data):
    """Fetch the final modulemd documents of a build and parse them.

    :param int|Build build_data: build object or build ID
    :return: parsed modulemd YAML keyed the same way as the API response
        (one entry per available arch)
    :rtype: dict
    :raises TypeError: if build_data is neither an int nor a Build
    :raises requests.HTTPError: if the API answers with an error status
    """
    build_id = self._get_build_id(build_data)
    response = requests.get(f"{self._mbs_api}final-modulemd/{build_id}")
    response.raise_for_status()

    # Each value of the JSON payload is a YAML document; parse them one by one.
    return {
        arch: yaml.safe_load(raw_mmd)
        for arch, raw_mmd in response.json().items()
    }
|
| |