| |
@@ -10,13 +10,13 @@
|
| |
import re
|
| |
import time
|
| |
import six
|
| |
-
|
| |
+ import json
|
| |
+ import datetime
|
| |
from lib389._constants import *
|
| |
from lib389.properties import *
|
| |
from lib389._entry import FormatDict
|
| |
- from lib389.utils import normalizeDN, ensure_bytes, ensure_str, ensure_dict_str
|
| |
+ from lib389.utils import normalizeDN, ensure_bytes, ensure_str, ensure_dict_str, ensure_list_str
|
| |
from lib389 import Entry, DirSrv, NoSuchEntryError, InvalidArgumentError
|
| |
-
|
| |
from lib389._mapped_object import DSLdapObject, DSLdapObjects
|
| |
|
| |
|
| |
@@ -33,16 +33,25 @@
|
| |
:type dn: str
|
| |
"""
|
| |
|
| |
- def __init__(self, instance, dn=None):
|
| |
+ csnpat = r'(.{8})(.{4})(.{4})(.{4})'
|
| |
+ csnre = re.compile(csnpat)
|
| |
+
|
| |
+ def __init__(self, instance, dn=None, winsync=False):
|
| |
super(Agreement, self).__init__(instance, dn)
|
| |
self._rdn_attribute = 'cn'
|
| |
self._must_attributes = [
|
| |
'cn',
|
| |
]
|
| |
- self._create_objectclasses = [
|
| |
- 'top',
|
| |
- 'nsds5replicationagreement',
|
| |
- ]
|
| |
+ if winsync:
|
| |
+ self._create_objectclasses = [
|
| |
+ 'top',
|
| |
+ 'nsDSWindowsReplicationAgreement',
|
| |
+ ]
|
| |
+ else:
|
| |
+ self._create_objectclasses = [
|
| |
+ 'top',
|
| |
+ 'nsds5replicationagreement',
|
| |
+ ]
|
| |
self._protected = False
|
| |
|
| |
def begin_reinit(self):
|
| |
@@ -59,6 +68,7 @@
|
| |
"""
|
| |
done = False
|
| |
error = False
|
| |
+ inprogress = False
|
| |
status = self.get_attr_val_utf8('nsds5ReplicaLastInitStatus')
|
| |
self._log.debug('agreement tot_init status: %s' % status)
|
| |
if not status:
|
| |
@@ -67,33 +77,300 @@
|
| |
error = True
|
| |
elif 'Total update succeeded' in status:
|
| |
done = True
|
| |
+ inprogress = False
|
| |
elif 'Replication error' in status:
|
| |
error = True
|
| |
+ elif 'Total update in progress' in status:
|
| |
+ inprogress = True
|
| |
+ elif 'LDAP error' in status:
|
| |
+ error = True
|
| |
|
| |
- return (done, error)
|
| |
+ return (done, inprogress, error)
|
| |
|
| |
def wait_reinit(self, timeout=300):
|
| |
"""Wait for a reinit to complete. Returns done and error. A correct
|
| |
reinit will return (True, False).
|
| |
-
|
| |
+ :param timeout: timeout value for how long to wait for the reinit
|
| |
+ :type timeout: int
|
| |
:returns: tuple(done, error), where done, error are bool.
|
| |
"""
|
| |
done = False
|
| |
error = False
|
| |
count = 0
|
| |
while done is False and error is False:
|
| |
- (done, error) = self.check_reinit()
|
| |
+ (done, inprogress, error) = self.check_reinit()
|
| |
if count > timeout and not done:
|
| |
error = True
|
| |
count = count + 2
|
| |
time.sleep(2)
|
| |
return (done, error)
|
| |
|
| |
+ def get_agmt_maxcsn(self):
|
| |
+ """Get the agreement maxcsn from the database RUV entry
|
| |
+ :returns: CSN string if found, otherwise None is returned
|
| |
+ """
|
| |
+ from lib389.replica import Replicas
|
| |
+ suffix = self.get_attr_val_utf8(REPL_ROOT)
|
| |
+ agmt_name = self.get_attr_val_utf8('cn')
|
| |
+ replicas = Replicas(self._instance)
|
| |
+ replica = replicas.get(suffix)
|
| |
+ maxcsns = replica.get_ruv_agmt_maxcsns()
|
| |
+
|
| |
+ if maxcsns is None or len(maxcsns) == 0:
|
| |
+ self._log.debug('get_agmt_maxcsn - Failed to get agmt maxcsn from RUV')
|
| |
+ return None
|
| |
+
|
| |
+ for csn in maxcsns:
|
| |
+ comps = csn.split(';')
|
| |
+ if agmt_name == comps[1]:
|
| |
+ # same replica, get maxcsn
|
| |
+ if len(comps) < 6:
|
| |
+ return None
|
| |
+ else:
|
| |
+ return comps[5]
|
| |
+
|
| |
+ self._log.debug('get_agmt_maxcsn - did not find matching agmt maxcsn from RUV')
|
| |
+ return None
|
| |
+
|
| |
+ def get_consumer_maxcsn(self, binddn=None, bindpw=None):
|
| |
+ """Attempt to get the consumer's maxcsn from its database RUV entry
|
| |
+ :param binddn: Specifies a specific bind DN to use when contacting the remote consumer
|
| |
+ :type binddn: str
|
| |
+ :param bindpw: Password for the bind DN
|
| |
+ :type bindpw: str
|
| |
+ :returns: CSN string if found, otherwise "Unavailable" is returned
|
| |
+ """
|
| |
+ host = self.get_attr_val_utf8(AGMT_HOST)
|
| |
+ port = self.get_attr_val_utf8(AGMT_PORT)
|
| |
+ suffix = self.get_attr_val_utf8(REPL_ROOT)
|
| |
+ protocol = self.get_attr_val_utf8('nsds5replicatransportinfo').lower()
|
| |
+
|
| |
+ result_msg = "Unavailable"
|
| |
+
|
| |
+ # If we are using LDAPI we need to provide the credentials, otherwise
|
| |
+ # use the existing credentials
|
| |
+ if binddn is None:
|
| |
+ binddn = self._instance.binddn
|
| |
+ if bindpw is None:
|
| |
+ bindpw = self._instance.bindpw
|
| |
+
|
| |
+ # Get the replica id from supplier to compare to the consumer's rid
|
| |
+ from lib389.replica import Replicas
|
| |
+ replicas = Replicas(self._instance)
|
| |
+ replica = replicas.get(suffix)
|
| |
+ rid = replica.get_attr_val_utf8(REPL_ID)
|
| |
+
|
| |
+ # Open a connection to the consumer
|
| |
+ consumer = DirSrv(verbose=self._instance.verbose)
|
| |
+ args_instance[SER_HOST] = host
|
| |
+ if protocol == "ssl" or protocol == "ldaps":
|
| |
+ args_instance[SER_SECURE_PORT] = int(port)
|
| |
+ else:
|
| |
+ args_instance[SER_PORT] = int(port)
|
| |
+ args_instance[SER_ROOT_DN] = binddn
|
| |
+ args_instance[SER_ROOT_PW] = bindpw
|
| |
+ args_standalone = args_instance.copy()
|
| |
+ consumer.allocate(args_standalone)
|
| |
+ try:
|
| |
+ consumer.open()
|
| |
+ except ldap.LDAPError as e:
|
| |
+ self._instance.log.debug('Connection to consumer ({}:{}) failed, error: {}'.format(host, port, e))
|
| |
+ return result_msg
|
| |
+
|
| |
+ # Search for the tombstone RUV entry
|
| |
+ try:
|
| |
+ entry = consumer.search_s(suffix, ldap.SCOPE_SUBTREE,
|
| |
+ REPLICA_RUV_FILTER, ['nsds50ruv'])
|
| |
+ if not entry:
|
| |
+ self.log.error("Failed to retrieve database RUV entry from consumer")
|
| |
+ else:
|
| |
+ elements = ensure_list_str(entry[0].getValues('nsds50ruv'))
|
| |
+ for ruv in elements:
|
| |
+ if ('replica %s ' % rid) in ruv:
|
| |
+ ruv_parts = ruv.split()
|
| |
+ if len(ruv_parts) == 5:
|
| |
+ result_msg = ruv_parts[4]
|
| |
+ break
|
| |
+ except ldap.LDAPError as e:
|
| |
+ self._instance.log.debug('Failed to search for the suffix ' +
|
| |
+ '({}) consumer ({}:{}) failed, error: {}'.format(
|
| |
+ suffix, host, port, e))
|
| |
+ consumer.close()
|
| |
+ return result_msg
|
| |
+
|
| |
+ def get_agmt_status(self, binddn=None, bindpw=None):
|
| |
+ """Return the status message
|
| |
+ :param binddn: Specifies a specific bind DN to use when contacting the remote consumer
|
| |
+ :type binddn: str
|
| |
+ :param bindpw: Password for the bind DN
|
| |
+ :type bindpw: str
|
| |
+ :returns: A status message about the replication agreement
|
| |
+ """
|
| |
+ status = "Unknown"
|
| |
+
|
| |
+ agmt_maxcsn = self.get_agmt_maxcsn()
|
| |
+ if agmt_maxcsn is not None:
|
| |
+ con_maxcsn = self.get_consumer_maxcsn(binddn=binddn, bindpw=bindpw)
|
| |
+ if con_maxcsn:
|
| |
+ if agmt_maxcsn == con_maxcsn:
|
| |
+ status = "In Synchronization"
|
| |
+ else:
|
| |
+ # Not in sync - attempt to discover the cause
|
| |
+ repl_msg = "Unknown"
|
| |
+ if self.get_attr_val_utf8(AGMT_UPDATE_IN_PROGRESS) == 'TRUE':
|
| |
+ # Replication is on going - this is normal
|
| |
+ repl_msg = "Replication still in progress"
|
| |
+ elif "Can't Contact LDAP" in \
|
| |
+ self.get_attr_val_utf8(AGMT_UPDATE_STATUS):
|
| |
+ # Consumer is down
|
| |
+ repl_msg = "Consumer can not be contacted"
|
| |
+
|
| |
+ status = ("Not in Synchronization: supplier " +
|
| |
+ "(%s) consumer (%s) Reason(%s)" %
|
| |
+ (agmt_maxcsn, con_maxcsn, repl_msg))
|
| |
+ return status
|
| |
+
|
| |
+ def get_lag_time(self, suffix, agmt_name, binddn=None, bindpw=None):
|
| |
+ """Get the lag time between the supplier and the consumer
|
| |
+ :param suffix: The replication suffix
|
| |
+ :type suffix: str
|
| |
+ :param agmt_name: The name of the agreement
|
| |
+ :type agmt_name: str
|
| |
+ :param binddn: Specifies a specific bind DN to use when contacting the remote consumer
|
| |
+ :type binddn: str
|
| |
+ :param bindpw: Password for the bind DN
|
| |
+ :type bindpw: str
|
| |
+ :returns: A time-formated string of the the replication lag (HH:MM:SS).
|
| |
+ :raises: ValueError - if unable to get consumer's maxcsn
|
| |
+ """
|
| |
+ agmt_maxcsn = self.get_agmt_maxcsn()
|
| |
+ con_maxcsn = self.get_consumer_maxcsn(binddn=binddn, bindpw=bindpw)
|
| |
+ if con_maxcsn is None:
|
| |
+ raise ValueError("Unable to get consumer's max csn")
|
| |
+ if con_maxcsn == "Unavailable":
|
| |
+ return con_maxcsn
|
| |
+
|
| |
+ # Extract the csn timstamps and compare them
|
| |
+ match = Agreement.csnre.match(agmt_maxcsn)
|
| |
+ if match:
|
| |
+ agmt_time = int(match.group(1), 16)
|
| |
+ match = Agreement.csnre.match(con_maxcsn)
|
| |
+ if match:
|
| |
+ con_time = int(match.group(1), 16)
|
| |
+ diff = con_time - agmt_time
|
| |
+ if diff < 0:
|
| |
+ lag = datetime.timedelta(seconds=-diff)
|
| |
+ else:
|
| |
+ lag = datetime.timedelta(seconds=diff)
|
| |
+
|
| |
+ # Return a nice formated timestamp
|
| |
+ return "{:0>8}".format(str(lag))
|
| |
+
|
| |
+ def status(self, winsync=False, just_status=False, use_json=False, binddn=None, bindpw=None):
|
| |
+ """Get the status of a replication agreement
|
| |
+ :param winsync: Specifies if the the agreement is a winsync replication agreement
|
| |
+ :type winsync: boolean
|
| |
+ :param just_status: Just return the status string and not all of the status attributes
|
| |
+ :type just_status: boolean
|
| |
+ :param use_json: Return the status in a JSON object
|
| |
+ :type use_json: boolean
|
| |
+ :param binddn: Specifies a specific bind DN to use when contacting the remote consumer
|
| |
+ :type binddn: str
|
| |
+ :param bindpw: Password for the bind DN
|
| |
+ :type bindpw: str
|
| |
+ :returns: A status message
|
| |
+ :raises: ValueError - if failing to get agmt status
|
| |
+ """
|
| |
+ status_attrs_dict = self.get_all_attrs()
|
| |
+ status_attrs_dict = dict((k.lower(), v) for k, v in list(status_attrs_dict.items()))
|
| |
+
|
| |
+ # We need a bind DN and passwd so we can query the consumer. If this is an LDAPI
|
| |
+ # connection, and the consumer does not allow anonymous access to the tombstone
|
| |
+ # RUV entry under the suffix, then we can't get the status. So in this case we
|
| |
+ # need to provide a DN and password.
|
| |
+ if not winsync:
|
| |
+ try:
|
| |
+ status = self.get_agmt_status(binddn=binddn, bindpw=bindpw)
|
| |
+ except ValueError as e:
|
| |
+ status = str(e)
|
| |
+ if just_status:
|
| |
+ if use_json:
|
| |
+ return (json.dumps(status))
|
| |
+ else:
|
| |
+ return status
|
| |
+
|
| |
+ # Get the lag time
|
| |
+ suffix = ensure_str(status_attrs_dict['nsds5replicaroot'][0])
|
| |
+ agmt_name = ensure_str(status_attrs_dict['cn'][0])
|
| |
+ lag_time = self.get_lag_time(suffix, agmt_name, binddn=binddn, bindpw=bindpw)
|
| |
+ else:
|
| |
+ status = "Not available for Winsync agreements"
|
| |
+
|
| |
+ # handle the attributes that are not always set in the agreement
|
| |
+ if 'nsds5replicaenabled' not in status_attrs_dict:
|
| |
+ status_attrs_dict['nsds5replicaenabled'] = ['on']
|
| |
+ if 'nsds5agmtmaxcsn' not in status_attrs_dict:
|
| |
+ status_attrs_dict['nsds5agmtmaxcsn'] = ["unavailable"]
|
| |
+ if 'nsds5replicachangesskippedsince' not in status_attrs_dict:
|
| |
+ status_attrs_dict['nsds5replicachangesskippedsince'] = ["unavailable"]
|
| |
+ if 'nsds5beginreplicarefresh' not in status_attrs_dict:
|
| |
+ status_attrs_dict['nsds5beginreplicarefresh'] = [""]
|
| |
+ if 'nsds5replicalastinitstatus' not in status_attrs_dict:
|
| |
+ status_attrs_dict['nsds5replicalastinitstatus'] = ["unavilable"]
|
| |
+ if 'nsds5replicachangessentsincestartup' not in status_attrs_dict:
|
| |
+ status_attrs_dict['nsds5replicachangessentsincestartup'] = ['0']
|
| |
+ if ensure_str(status_attrs_dict['nsds5replicachangessentsincestartup'][0]) == '':
|
| |
+ status_attrs_dict['nsds5replicachangessentsincestartup'] = ['0']
|
| |
+
|
| |
+ # Case sensitive?
|
| |
+ if use_json:
|
| |
+ result = {'replica-enabled': ensure_str(status_attrs_dict['nsds5replicaenabled'][0]),
|
| |
+ 'update-in-progress': ensure_str(status_attrs_dict['nsds5replicaupdateinprogress'][0]),
|
| |
+ 'last-update-start': ensure_str(status_attrs_dict['nsds5replicalastupdatestart'][0]),
|
| |
+ 'last-update-end': ensure_str(status_attrs_dict['nsds5replicalastupdateend'][0]),
|
| |
+ 'number-changes-sent': ensure_str(status_attrs_dict['nsds5replicachangessentsincestartup'][0]),
|
| |
+ 'number-changes-skipped:': ensure_str(status_attrs_dict['nsds5replicachangesskippedsince'][0]),
|
| |
+ 'last-update-status': ensure_str(status_attrs_dict['nsds5replicalastupdatestatus'][0]),
|
| |
+ 'init-in-progress': ensure_str(status_attrs_dict['nsds5beginreplicarefresh'][0]),
|
| |
+ 'last-init-start': ensure_str(status_attrs_dict['nsds5replicalastinitstart'][0]),
|
| |
+ 'last-init-end': ensure_str(status_attrs_dict['nsds5replicalastinitend'][0]),
|
| |
+ 'last-init-status': ensure_str(status_attrs_dict['nsds5replicalastinitstatus'][0]),
|
| |
+ 'reap-active': ensure_str(status_attrs_dict['nsds5replicareapactive'][0]),
|
| |
+ 'replication-status': status,
|
| |
+ 'replication-lag-time': lag_time
|
| |
+ }
|
| |
+ return (json.dumps(result))
|
| |
+ else:
|
| |
+ retstr = (
|
| |
+ "Status for %(cn)s agmt %(nsDS5ReplicaHost)s:"
|
| |
+ "%(nsDS5ReplicaPort)s" "\n"
|
| |
+ "Replica Enabled: %(nsds5ReplicaEnabled)s" "\n"
|
| |
+ "Update In Progress: %(nsds5replicaUpdateInProgress)s" "\n"
|
| |
+ "Last Update Start: %(nsds5replicaLastUpdateStart)s" "\n"
|
| |
+ "Last Update End: %(nsds5replicaLastUpdateEnd)s" "\n"
|
| |
+ "Number Of Changes Sent: %(nsds5replicaChangesSentSinceStartup)s"
|
| |
+ "\n"
|
| |
+ "Number Of Changes Skipped: %(nsds5replicaChangesSkippedSince"
|
| |
+ "Startup)s" "\n"
|
| |
+ "Last Update Status: %(nsds5replicaLastUpdateStatus)s" "\n"
|
| |
+ "Init In Progress: %(nsds5BeginReplicaRefresh)s" "\n"
|
| |
+ "Last Init Start: %(nsds5ReplicaLastInitStart)s" "\n"
|
| |
+ "Last Init End: %(nsds5ReplicaLastInitEnd)s" "\n"
|
| |
+ "Last Init Status: %(nsds5ReplicaLastInitStatus)s" "\n"
|
| |
+ "Reap Active: %(nsds5ReplicaReapActive)s" "\n"
|
| |
+ )
|
| |
+ # FormatDict manages missing fields in string formatting
|
| |
+ entry_data = ensure_dict_str(status_attrs_dict)
|
| |
+ result = retstr % FormatDict(entry_data)
|
| |
+ result += "Replication Status: {}\n".format(status)
|
| |
+ result += "Replication Lag Time: {}\n".format(lag_time)
|
| |
+ return result
|
| |
+
|
| |
def pause(self):
|
| |
"""Pause outgoing changes from this server to consumer. Note
|
| |
that this does not pause the consumer, only that changes will
|
| |
not be sent from this master to consumer: the consumer may still
|
| |
- recieve changes from other replication paths!
|
| |
+ receive changes from other replication paths!
|
| |
"""
|
| |
self.set('nsds5ReplicaEnabled', 'off')
|
| |
|
| |
@@ -122,6 +399,34 @@
|
| |
"""
|
| |
return self.get_attr_val_utf8('nsDS5ReplicaWaitForAsyncResults')
|
| |
|
| |
+
|
| |
+ class WinsyncAgreement(Agreement):
|
| |
+ """A replication agreement from this server instance to
|
| |
+ another instance of directory server.
|
| |
+
|
| |
+ - must attributes: [ 'cn' ]
|
| |
+ - RDN attribute: 'cn'
|
| |
+
|
| |
+ :param instance: An instance
|
| |
+ :type instance: lib389.DirSrv
|
| |
+ :param dn: Entry DN
|
| |
+ :type dn: str
|
| |
+ """
|
| |
+
|
| |
+ def __init__(self, instance, dn=None):
|
| |
+ super(Agreement, self).__init__(instance, dn)
|
| |
+ self._rdn_attribute = 'cn'
|
| |
+ self._must_attributes = [
|
| |
+ 'cn',
|
| |
+ ]
|
| |
+ self._create_objectclasses = [
|
| |
+ 'top',
|
| |
+ 'nsDSWindowsReplicationAgreement',
|
| |
+ ]
|
| |
+
|
| |
+ self._protected = False
|
| |
+
|
| |
+
|
| |
class Agreements(DSLdapObjects):
|
| |
"""Represents the set of agreements configured on this instance.
|
| |
There are two possible ways to use this interface.
|
| |
@@ -149,11 +454,15 @@
|
| |
:type rdn: str
|
| |
"""
|
| |
|
| |
- def __init__(self, instance, basedn=DN_MAPPING_TREE, rdn=None):
|
| |
+ def __init__(self, instance, basedn=DN_MAPPING_TREE, rdn=None, winsync=False):
|
| |
super(Agreements, self).__init__(instance)
|
| |
- self._childobject = Agreement
|
| |
- self._objectclasses = [ 'nsds5replicationagreement' ]
|
| |
- self._filterattrs = [ 'cn', 'nsDS5ReplicaRoot' ]
|
| |
+ if winsync:
|
| |
+ self._childobject = WinsyncAgreement
|
| |
+ self._objectclasses = ['nsDSWindowsReplicationAgreement']
|
| |
+ else:
|
| |
+ self._childobject = Agreement
|
| |
+ self._objectclasses = ['nsds5replicationagreement']
|
| |
+ self._filterattrs = ['cn', 'nsDS5ReplicaRoot']
|
| |
if rdn is None:
|
| |
self._basedn = basedn
|
| |
else:
|
| |
@@ -167,6 +476,7 @@
|
| |
raise ldap.UNWILLING_TO_PERFORM("Refusing to create agreement in %s" % DN_MAPPING_TREE)
|
| |
return super(Agreements, self)._validate(rdn, properties)
|
| |
|
| |
+
|
| |
class AgreementLegacy(object):
|
| |
"""An object that helps to work with agreement entry
|
| |
|
| |
@@ -194,7 +504,6 @@
|
| |
:type agreement_dn: str
|
| |
:param just_status: If True, returns just status
|
| |
:type just_status: bool
|
| |
-
|
| |
:returns: str -- See below
|
| |
:raises: NoSuchEntryError - if agreement_dn is an unknown entry
|
| |
|
| |
@@ -208,7 +517,7 @@
|
| |
Last Update End: 0
|
| |
Num. Changes Sent: 1:10/0
|
| |
Num. changes Skipped: None
|
| |
- Last update Status: 0 Replica acquired successfully:
|
| |
+ Last update Status: Error (0) Replica acquired successfully:
|
| |
Incremental update started
|
| |
Init in progress: None
|
| |
Last Init Start: 0
|
| |
First of all, I think, we really should get away from this legacy methods/objects and we should use existing structures for Replicas, RUV, Agreements.
This method can be put to Agreement(DSLdapObject) and we can benefit from it. The object already has the binddn, bindpw, consumer instance information, etc.
In the later comments, I'll point out what can be changed with what.