#50466 Issue: 50443 - Create a module in lib389 to Convert a byte sequence to a properly escaped for LDAP
Closed 3 months ago by aborah. Opened 8 months ago by aborah.
aborah/389-ds-base utils  into  master

file modified
+97 -37

@@ -31,6 +31,7 @@ 

  import ldap

  import socket

  import time

+ from datetime import datetime

  import sys

  import filecmp

  import pwd

@@ -38,6 +39,8 @@ 

  import shlex

  import operator

  import subprocess

+ import math

+ from pkg_resources.extern.packaging.version import LegacyVersion

  from socket import getfqdn

  from ldapurl import LDAPUrl

  from contextlib import closing

@@ -196,13 +199,6 @@ 

          # No python module, so who knows what state we are in.

          log.error('selinux python module not found, will not relabel files.' )

  

-     try:

-         if status:

-             # Only if we know it's enabled, check if we can manage ports too.

-             import sepolicy

-     except ImportError:

-         log.error('sepolicy python module not found, will not relabel ports.' )

- 

      return status

  

  

@@ -230,11 +226,35 @@ 

          log.debug("Failed to run restorecon on: " + path)

  

  

+ def _get_selinux_port_policies(port):

+     """Get a list of selinux port policies for the specified port, 'tcp' protocol and

+     excluding 'unreserved_port_t', 'reserved_port_t', 'ephemeral_port_t' labels"""

+ 

+     # [2:] - cut first lines containing the headers. [:-1] - empty line

+     policy_lines = subprocess.check_output(["semanage", "port", "-l"], encoding='utf-8').split("\n")[2:][:-1]

+     policies = []

+     for line in policy_lines:

+         data = line.split()

+         ports_list = []

+         for p in data[2:]:

+             if "," in p:

+                 p = p[:-1]

+             if "-" in p:

+                 p = range(int(p.split("-")[0]), int(p.split("-")[1]))

+             else:

+                 p = [int(p)]

+             ports_list.extend(p)

+         if data[1] == 'tcp' and port in ports_list and \

+            data[0] not in ['unreserved_port_t', 'reserved_port_t', 'ephemeral_port_t']:

+             policies.append({'protocol': data[1], 'type': data[0], 'ports': ports_list})

+     return policies

+ 

+ 

  def selinux_label_port(port, remove_label=False):

      """

      Either set or remove an SELinux label(ldap_port_t) for a TCP port

  

-     :param port: The TCP port to be labelled

+     :param port: The TCP port to be labeled

      :type port: str

      :param remove_label: Set True if the port label should be removed

      :type remove_label: boolean

@@ -246,53 +266,44 @@ 

          log.debug('selinux python module not found, skipping port labeling.')

          return

  

-     try:

-         import sepolicy

-     except ImportError:

-         log.debug('sepolicy python module not found, skipping port labeling.')

-         return

- 

      if not selinux.is_selinux_enabled():

          log.debug('selinux is disabled, skipping port relabel')

          return

  

      # We only label ports that ARE NOT in the default policy that comes with

      # a RH based system.

+     port = int(port)

      selinux_default_ports = [389, 636, 3268, 3269, 7389]

      if port in selinux_default_ports:

-         log.debug('port %s already in %s, skipping port relabel' % (port, selinux_default_ports))

+         log.debug('port {} already in {}, skipping port relabel'.format(port, selinux_default_ports))

          return

- 

      label_set = False

      label_ex = None

  

-     policies = [p for p in sepolicy.info(sepolicy.PORT)

-                 if p['protocol'] == 'tcp'

-                 if port in range(p['low'], p['high'] + 1)

-                 if p['type'] not in ['unreserved_port_t', 'reserved_port_t', 'ephemeral_port_t']]

+     policies = _get_selinux_port_policies(port)

  

      for policy in policies:

          if "ldap_port_t" == policy['type']:

              label_set = True  # Port already has our label

-             if policy['low'] != policy['high']:

-                 # We have a range

-                 if port in range(policy['low'], policy['high'] + 1):

-                     # The port is within the range, just return

-                     return

+             if port in policy['ports']:

+                 # The port is within the range, just return

+                 return

              break

          elif not remove_label:

              # Port belongs to someone else (bad)

              # This is only an issue during setting a label, not removing a label

-             raise ValueError("Port {} was already labelled with: ({})  Please choose a different port number".format(port, policy['type']))

+             raise ValueError("Port {} was already labeled with: ({})  Please choose a different port number".format(port, policy['type']))

  

      if (remove_label and label_set) or (not remove_label and not label_set):

          for i in range(5):

- 

              try:

-                 subprocess.check_call(["semanage", "port",

-                                        "-d" if remove_label else "-a",

-                                        "-t", "ldap_port_t",

-                                        "-p", "tcp", str(port)])

+                 result = subprocess.run(["semanage", "port",

+                                          "-d" if remove_label else "-a",

+                                          "-t", "ldap_port_t",

+                                          "-p", "tcp", str(port)],

+                                          stdout=subprocess.PIPE,

+                                          stderr=subprocess.PIPE)

+                 log.debug(f"CMD: {' '.join(result.args)} ; STDOUT: {result.stdout} ; STDERR: {result.stderr}")

                  return

              except (OSError, subprocess.CalledProcessError) as e:

                  label_ex = e

@@ -1056,33 +1067,57 @@ 

      return p.version

  

  

- def ds_is_related(relation, *ver):

+ def ds_is_related(relation, ds_ver, *ver):

      """

      Return a result of a comparison between the current version of ns-slapd and a provided version.

      """

      ops = {'older': operator.lt,

             'newer': operator.ge}

-     ds_ver = get_ds_version()

      if len(ver) > 1:

          for cmp_ver in ver:

              if cmp_ver.startswith(ds_ver[:3]):

-                 return ops[relation](ds_ver,cmp_ver)

+                 return ops[relation](LegacyVersion(ds_ver),LegacyVersion(cmp_ver))

      else:

-         return ops[relation](ds_ver, ver[0])

+         return ops[relation](LegacyVersion(ds_ver), LegacyVersion(ver[0]))

  

  

  def ds_is_older(*ver):

      """

      Return True if the current version of ns-slapd is older than a provided version

      """

-     return ds_is_related('older', *ver)

+     ds_ver = get_ds_version()

+     return ds_is_related('older', ds_ver, *ver)

  

  

  def ds_is_newer(*ver):

      """

      Return True if the current version of ns-slapd is newer than a provided version

      """

-     return ds_is_related('newer', *ver)

+     ds_ver = get_ds_version()

+     return ds_is_related('newer', ds_ver, *ver)

+ 

+ 

+ def gentime_to_datetime(gentime):

+     """Convert Generalized time to datetime object

+ 

+     :param gentime: Time in the format - YYYYMMDDHHMMSSZ (20170126120000Z)

+     :type password: str

+     :returns: datetime.datetime object

+     """

+ 

+     return datetime.strptime(gentime, '%Y%m%d%H%M%SZ')

+ 

+ 

+ def gentime_to_posix_time(gentime):

+     """Convert Generalized time to POSIX time format

+ 

+     :param gentime: Time in the format - YYYYMMDDHHMMSSZ (20170126120000Z)

+     :type password: str

+     :returns: Epoch time int

+     """

+ 

+     target_timestamp = gentime_to_datetime(gentime)

+     return datetime.timestamp(target_timestamp)

  

  

  def getDateTime():

@@ -1269,3 +1304,28 @@ 

  def display_log_data(data, hide_sensitive=True):

      # Take a dict and mask all the sensitive data

      return {a: display_log_value(a, v, hide_sensitive) for a, v in data.items()}

+ 

+ 

+ def convert_bytes(bytes):

+     bytes = int(bytes)

+     if bytes == 0:

+         return "0 B"

+     size_name = ["B", "KB", "MB", "GB", "TB", "PB"]

+     i = int(math.floor(math.log(bytes, 1024)))

+     pow = math.pow(1024, i)

+     siz = round(bytes / pow, 2)

+     return "{} {}".format(siz, size_name[i])

+ 

+ 

+     

+ def escape_bytes(bytes_value):

+     """ Convert a byte sequence to a properly escaped for 

+     LDAP (format BACKSLASH HEX HEX) string"""

+     # copied from https://github.com/cannatag/ldap3/blob/master/ldap3/utils/conv.py

+     if str is not bytes:

+         if isinstance(bytes_value, str):

+             bytes_value = bytearray(bytes_value, encoding='utf-8')

+         escaped = '\\'.join([('%02x' % int(b)) for b in bytes_value])

+     else:

+         escaped = ''

+     return ('\\' + escaped) if escaped else '' 

\ No newline at end of file

Create a module in lib389 to Convert a byte sequence to a properly escaped for LDAP

Fixes: https://pagure.io/389-ds-base/issue/50443

Author: aborah

Reviewed by: ???

rebased onto 6c75fbecf8ea0ab3f42ee5d8a0d21d4c013665a9

7 months ago

rebased onto 0519e99

3 months ago

Pull-Request has been closed by aborah

3 months ago
Metadata