| |
@@ -1381,10 +1381,62 @@
|
| |
"""Return the set of agreements related to this suffix replica
|
| |
:param: winsync: If True then return winsync replication agreements,
|
| |
otherwise return the standard replication agreements.
|
| |
- :returns: Agreements object
|
| |
+ :returns: Agreements object
|
| |
"""
|
| |
return Agreements(self._instance, self.dn, winsync=winsync)
|
| |
|
| |
def get_consumer_replicas(self, get_credentials):
    """Return the consumer replicas related to this suffix replica through its agreements

    For every replication agreement of this replica a connection to the
    consumer is opened and wrapped in a Replicas object. Agreements for
    which the callback supplies no bind DN are skipped.

    :param get_credentials: A user-defined callback function called as
                            get_credentials(host, port) which returns the
                            binding credentials as a dictionary -
                            {"binddn": "cn=Directory Manager",
                            "bindpw": "password"}
    :type get_credentials: function
    :returns: A list of Replicas objects, one per reachable consumer
    :raises: ldap.LDAPError - if a consumer connection can not be opened;
             every connection opened so far is closed before re-raising
    """

    agmts = self.get_agreements()
    result_replicas = []
    connections = []

    try:
        # get_agreements() returns a collection object, list() gives its entries
        for agmt in agmts.list():
            host = agmt.get_attr_val_utf8("nsDS5ReplicaHost")
            port = agmt.get_attr_val_utf8("nsDS5ReplicaPort")
            # The transport attribute may be absent - treat that as plain LDAP
            protocol = (agmt.get_attr_val_utf8("nsDS5ReplicaTransportInfo") or "").lower()

            # The function should be defined outside and
            # it should have all the logic for figuring out the credentials
            credentials = get_credentials(host, port)
            if not credentials["binddn"]:
                # Without a bind DN we can not connect, so there is no replica to return
                self._log.debug(f"Skipping consumer ({host}:{port}), Bind DN was not specified")
                continue

            # Open a connection to the consumer.
            # Fill in a private copy so the shared args_instance template is
            # left untouched and no port setting leaks into the next consumer.
            consumer = DirSrv(verbose=self._instance.verbose)
            args_standalone = args_instance.copy()
            args_standalone[SER_HOST] = host
            if protocol in ("ssl", "ldaps"):
                args_standalone[SER_SECURE_PORT] = int(port)
            else:
                args_standalone[SER_PORT] = int(port)
            args_standalone[SER_ROOT_DN] = credentials["binddn"]
            args_standalone[SER_ROOT_PW] = credentials["bindpw"]
            consumer.allocate(args_standalone)
            try:
                consumer.open()
            except ldap.LDAPError as e:
                self._log.debug(f"Connection to consumer ({host}:{port}) failed, error: {e}")
                raise
            connections.append(consumer)
            result_replicas.append(Replicas(consumer))
    except BaseException:
        # Do not leak the connections which were already opened
        for conn in connections:
            conn.close()
        raise

    return result_replicas
|
| |
+
|
| |
def get_rid(self):
|
| |
"""Return the current replicas RID for this suffix
|
| |
|
| |
@@ -1412,6 +1464,15 @@
|
| |
|
| |
return RUV(data)
|
| |
|
| |
def get_maxcsn(self):
    """Return the maxcsn recorded in the RUV for this replica's own RID

    The replica ID is looked up in the per-RID maxcsn mapping of the RUV;
    when the RUV holds no entry for it, an all-zero CSN is returned.

    :returns: str
    """
    own_rid = self.get_rid()
    maxcsn_by_rid = self.get_ruv()._rid_maxcsn
    # Placeholder CSN used when the RUV has no element for our RID
    zero_csn = '00000000000000000000'
    return maxcsn_by_rid.get(own_rid, zero_csn)
|
| |
+
|
| |
def get_ruv_agmt_maxcsns(self):
|
| |
"""Return the in memory ruv of this replica suffix.
|
| |
|
| |
@@ -2232,3 +2293,126 @@
|
| |
replicas = Replicas(instance)
|
| |
replica = replicas.get(self._suffix)
|
| |
return replica.get_rid()
|
| |
+
|
| |
+
|
| |
class ReplicationMonitor(object):
    """The lib389 replication monitor. This is used to check the status
    of many instances at once.
    It also allows to monitor independent topologies and get them into
    the one combined report.

    :param instance: A supplier or hub for replication topology monitoring
    :type instance: DirSrv
    :param logger: A logging interface
    :type logger: python logging
    """

    def __init__(self, instance, logger=None):
        self._instance = instance
        if logger is not None:
            self._log = logger
        else:
            self._log = logging.getLogger(__name__)

    @staticmethod
    def _split_consumer_key(key):
        """Split a "host:port:protocol" report key into its three parts.

        The split is done from the right so that a host which itself
        contains colons (an IPv6 address) is kept in one piece.
        """
        return tuple(key.rsplit(":", 2))

    @staticmethod
    def _error_reason(error):
        """Return the 'desc' text of an LDAP error, or str(error) if it has none."""
        try:
            return error.args[0]['desc']
        except (IndexError, KeyError, TypeError):
            return str(error)

    def _open_connection(self, host, port, protocol, credentials):
        """Allocate and open a DirSrv connection to host:port with the given credentials.

        The caller is responsible for closing the returned instance.
        """
        inst = DirSrv(verbose=self._instance.verbose)
        # Fill in a private copy so the shared args_instance template is left
        # untouched and no secure port setting leaks into the next connection
        args_standalone = args_instance.copy()
        args_standalone[SER_HOST] = host
        if protocol in ("ssl", "ldaps"):
            args_standalone[SER_SECURE_PORT] = int(port)
        else:
            args_standalone[SER_PORT] = int(port)
        args_standalone[SER_ROOT_DN] = credentials["binddn"]
        args_standalone[SER_ROOT_PW] = credentials["bindpw"]
        inst.allocate(args_standalone)
        inst.open()
        return inst

    def _get_replica_status(self, instance, report_data, use_json):
        """Load all of the status data to report
        and add new hostname:port pairs for future processing

        :param instance: An open connection to the instance to inspect
        :param report_data: The report being built; every consumer found in
                            an agreement and not yet present is added to it
                            with a None value (meaning "not processed yet")
        :param use_json: Passed through to the agreement status call
        :returns: A list with one dictionary per replica - replica_id,
                  replica_root, maxcsn and agmts_status
        """

        replicas_status = []
        replicas = Replicas(instance)
        for replica in replicas.list():
            replica_id = replica.get_rid()
            replica_root = replica.get_suffix()
            replica_maxcsn = replica.get_maxcsn()
            agmts_status = []
            agmts = replica.get_agreements()
            for agmt in agmts.list():
                host = agmt.get_attr_val_utf8_l("nsds5replicahost")
                port = agmt.get_attr_val_utf8_l("nsds5replicaport")
                protocol = agmt.get_attr_val_utf8_l('nsds5replicatransportinfo')
                # Supply protocol here because we need it only for connection
                # and agreement status is already preformatted for the user output
                consumer = f"{host}:{port}:{protocol}"
                if consumer not in report_data:
                    report_data[consumer] = None
                agmts_status.append(agmt.status(use_json))
            replicas_status.append({"replica_id": replica_id,
                                    "replica_root": replica_root,
                                    "maxcsn": replica_maxcsn,
                                    "agmts_status": agmts_status})
        return replicas_status

    def generate_report(self, get_credentials, use_json=False):
        """Generate a replication report for each supplier or hub and the instances
        that are connected with it by agreements.

        :param get_credentials: A user-defined callback function with parameters (host, port) which returns
                                a dictionary with binddn and bindpw keys -
                                example values "cn=Directory Manager" and "password"
        :type get_credentials: function
        :param use_json: Passed through to the agreement status call
        :type use_json: bool
        :returns: dict - keys are "host:port", values are either the list of
                  replica status dictionaries or a dictionary with "status"
                  and "reason" keys when the instance could not be inspected
        """
        report_data = {}

        initial_inst_key = f"{self._instance.host.lower()}:{str(self._instance.port).lower()}"
        # Do this on an initial instance to get the agreements to other instances
        report_data[initial_inst_key] = self._get_replica_status(self._instance, report_data, use_json)

        # Check if at least some replica report on other instances was generated
        repl_exists = False

        # While we have unprocessed instances - continue
        while True:
            # An entry with a None value was discovered but not processed yet
            supplier = next((key for key, data in report_data.items() if data is None), None)
            if supplier is None:
                break

            supplier_hostname, supplier_port, supplier_protocol = self._split_consumer_key(supplier)

            # The function should be defined outside and
            # it should have all the logic for figuring out the credentials.
            # It is done for flexibility purposes between CLI, WebUI and lib389 API applications
            credentials = get_credentials(supplier_hostname, supplier_port)
            if not credentials["binddn"]:
                report_data[supplier] = {"status": "Unavailable",
                                         "reason": "Bind DN was not specified"}
                continue

            # Open a connection to the consumer
            try:
                supplier_inst = self._open_connection(supplier_hostname, supplier_port,
                                                      supplier_protocol, credentials)
            except ldap.LDAPError as e:
                self._log.debug(f"Connection to consumer ({supplier_hostname}:{supplier_port}) failed, error: {e}")
                report_data[supplier] = {"status": "Unavailable",
                                         "reason": self._error_reason(e)}
                continue

            try:
                report_data[supplier] = self._get_replica_status(supplier_inst, report_data, use_json)
            finally:
                # The collected status is plain data, the connection is not needed anymore
                supplier_inst.close()
            repl_exists = True

        # Now remove the protocol from the name
        report_data_final = {}
        for key, value in report_data.items():
            # We take the initial instance only if it is the only existing part of the report
            if key != initial_inst_key or not repl_exists:
                if not value:
                    value = {"status": "Unavailable",
                             "reason": "No replicas were found"}
                # The initial key has no protocol part, all the others end with ":protocol"
                name = key if key == initial_inst_key else key.rsplit(":", 1)[0]
                report_data_final[name] = value

        return report_data_final
|
| |
Description: Add a new command to the 'dsconf replication' CLI.
'dsconf replication monitor' generates a report which
shows the replication topology to which the instance belongs.
Additional arguments:
-c or --connection [CONNECTION [CONNECTION ...]]
The connection values for monitoring other topologies
that are not connected. The format:
'host:port:binddn:bindpwd'. You can use regex for host
and port. You can set bindpwd to * and it will be
requested at runtime.
-a or --alias [ALIAS [ALIAS ...]]
If a host:port is assigned an alias, then the alias
instead of host:port will be displayed in the output.
The format: alias=host:port
https://pagure.io/389-ds-base/issue/50545
Reviewed by: ?