#360 Add an RPM wrapper that always has unicode headers
Merged 4 years ago by ngompa. Opened 4 years ago by churchyard.
churchyard/FedoraReview unicode_rpm  into  master

file modified
+2 -2
@@ -160,7 +160,7 @@ 

  

      def run_on_applicable(self):

          """ Run the check """

-         cur_version = self.spec.expand_tag("Version").decode("utf-8")

+         cur_version = self.spec.expand_tag("Version")

          up_version = self.get_upstream_r_package_version()

          if up_version is None:

              self.set_passed(
@@ -168,7 +168,7 @@ 

                  "The package does not come from one of the standard sources",

              )

              return

-         up_version = up_version.decode("utf-8").replace("-", ".")

+         up_version = up_version.replace("-", ".")

  

          self.set_passed(

              up_version == cur_version,

file modified
+1 -1
@@ -16,7 +16,7 @@ 

          if self.is_user_enabled():

              return self.user_enabled_value()

          archs = self.checks.spec.expand_tag("BuildArchs")

-         if len(archs) == 1 and archs[0].decode("utf-8").lower() == "noarch":

+         if len(archs) == 1 and archs[0].lower() == "noarch":

              return False

          if self.checks.buildsrc.is_available:

              src = self.checks.buildsrc

file modified
+5 -9
@@ -150,7 +150,7 @@ 

  

      def run(self):

          archs = self.checks.spec.expand_tag("BuildArchs")

-         if len(archs) == 1 and archs[0].decode("utf-8").lower() == "noarch":

+         if len(archs) == 1 and archs[0].lower() == "noarch":

              self.set_passed(self.NA)

              return

          self.set_passed(self.PENDING)
@@ -344,9 +344,9 @@ 

      def run(self):

          bad_tags = []

          for pkg_name in self.spec.packages:

-             if "%" in self.spec.expand_tag("Summary", pkg_name).decode("utf-8"):

+             if "%" in self.spec.expand_tag("Summary", pkg_name):

                  bad_tags.append(pkg_name + " (summary)")

-             if "%" in self.spec.expand_tag("Description", pkg_name).decode("utf-8"):

+             if "%" in self.spec.expand_tag("Description", pkg_name):

                  bad_tags.append(pkg_name + " (description)")

          if bad_tags:

              self.set_passed(self.PENDING, "Macros in: " + ", ".join(bad_tags))
@@ -677,11 +677,7 @@ 

              return "UNKNOWN" not in _license and "GENERATED" not in _license

  

          files_by_license = {}

-         raw_file = BytesIO(raw_text.encode("utf-8"))

-         while True:

-             line = raw_file.readline().decode("utf-8")

-             if not line:

-                 break

+         for line in raw_text.splitlines():

              try:

                  file_, license_ = line.split(":")

              except ValueError:
@@ -937,7 +933,7 @@ 

          self.type = "MUST"

  

      def is_applicable(self):

-         license_ = self.spec.expand_tag("License").decode("utf-8").lower().split()

+         license_ = self.spec.expand_tag("License").lower().split()

          return "and" in license_ or "or" in license_

  

  

file modified
+3 -3
@@ -263,7 +263,7 @@ 

          if len(self.spec.packages) == 1:

              self.set_passed(self.NA)

              return

-         if len(archs) == 1 and archs[0].decode("utf-8").lower() == "noarch":

+         if len(archs) == 1 and archs[0].lower() == "noarch":

              isa = ""

          else:

              isa = Mock.get_macro("%_isa", self.spec, self.flags)
@@ -305,7 +305,7 @@ 

          for tag in ("Packager", "Vendor", "PreReq", "Copyright"):

              if not self.spec.find_re(r"^\s*" + tag + r"\s*:"):

                  continue

-             value = self.spec.expand_tag(tag).decode("utf-8")

+             value = self.spec.expand_tag(tag)

              if value:

                  passed = False

                  output += "Found : {}: {}\n".format(tag, value)
@@ -639,7 +639,7 @@ 

          build_ok = self.checks.checkdict["CheckBuild"].is_passed

  

          arch = self.spec.expand_tag("BuildArch")

-         noarch = arch and arch[0].decode("utf-8").lower() == "noarch"

+         noarch = arch and arch[0].lower() == "noarch"

          one_arch = self.spec.expand_tag("ExclusiveArch")

          if build_ok and (one_arch or noarch):

              self.set_passed(self.PASS)

file modified
+1 -1
@@ -136,7 +136,7 @@ 

          self.automatic = True

  

      def run_on_applicable(self):

-         arch = self.spec.expand_tag("arch").decode("utf-8")

+         arch = self.spec.expand_tag("arch")

          if _has_extension(self):

              self.set_passed(

                  "noarch" not in arch,

file modified
+1 -1
@@ -250,7 +250,7 @@ 

      """ Substitute a spec tag into env. """

      value = spec.expand_tag(tag.upper())

      if value:

-         env = env.replace("@" + tag + "@", value.decode("utf-8"))

+         env = env.replace("@" + tag + "@", value)

      else:

          env = env.replace("@" + tag + "@", '""')

      return env

@@ -343,19 +343,19 @@ 

  

      def retrieve_description(self):

          """ Retrieve the description tag from a spec file. """

-         description = self.spec.packages[0].header[1005].decode("utf-8")

+         description = self.spec.packages[0].header[1005]

          self.log.debug("Description: %s", description)

          return description

  

      def retrieve_name(self):

          """ Retrieve the name tag from a spec file. """

-         name = self.spec.packages[0].header[1000].decode("utf-8")

+         name = self.spec.packages[0].header[1000]

          self.log.debug("Name: %s", name)

          return name

  

      def retrieve_summary(self):

          """ Retrieve the summary tag from a spec file. """

-         summary = self.spec.packages[0].header[1004].decode("utf-8")

+         summary = self.spec.packages[0].header[1004]

          self.log.debug("Summary: %s", summary)

          return summary

  

@@ -0,0 +1,82 @@ 

+ import rpm

+ import six

+ 

+ __all__ = ["spec", "hdr"]

+ 

+ 

+ def _rpm_is_sane():

+     s = "aaa"

+     h = rpm.hdr()

+     h["name"] = s

+     return h["name"] == s

+ 

+ 

+ if _rpm_is_sane():

+ 

+     spec = rpm.spec

+     hdr = rpm.hdr

+ 

+ else:

+ 

+     def surrogate(thing):

+         """

+         Always return surrogate escaped unicode instead of bytes

+         """

+         if isinstance(thing, (six.text_type, int, type(None))):

+             return thing

+         if isinstance(thing, six.binary_type):

+             return thing.decode("utf-8", errors="surrogateescape")

+         if isinstance(thing, list):

+             return [surrogate(i) for i in thing]

+         if isinstance(thing, tuple):

+             return tuple(surrogate(i) for i in thing)

+         raise TypeError("{} is not supported".format(type(thing).__name__))

+ 

+     class SurrogateHdr(rpm.hdr):

+         """

+         Wrapper around rpm.hdr that always gives unicode when we need it.

+ 

+         Since Fedora 31, this is the default behavior, but we need to support older.

+         Only wraps the parts FedoraReview actually uses.

+         """

+ 

+         def __getitem__(self, key):

+             return surrogate(super(SurrogateHdr, self).__getitem__(key))

+ 

+     class SurrogatePackage(object):

+         """

+         Wrapper around rpm package that always gives unicode when we need it.

+         An rpm package cannot be inited, so this has the package in self._pkg.

+ 

+         Since Fedora 31, this is the default behavior, but we need to support older.

+         Only wraps the parts FedoraReview actually uses.

+         """

+ 

+         def __init__(self, pkg):

+             self._pkg = pkg

+ 

+         def __getattr__(self, name):

+             if name == "header":

+                 return SurrogateHdr(self._pkg.header)

+             if name == "fileList":

+                 return surrogate(self._pkg.fileList)

+             return getattr(self._pkg, name)

+ 

+     class SurrogateSpec(rpm.spec):

+         """

+         Wrapper around rpm.spec that always gives unicode when we need it.

+ 

+         Since Fedora 31, this is the default behavior, but we need to support older.

+         Only wraps the parts FedoraReview actually uses.

+         """

+ 

+         @property

+         def sourceHeader(self):

+             return SurrogateHdr(super(SurrogateSpec, self).sourceHeader)

+ 

+         @property

+         def packages(self):

+             return [SurrogatePackage(p) for p in super(SurrogateSpec, self).packages]

+ 

+     spec = SurrogateSpec

+     hdr = SurrogateHdr

file modified
+10 -9
@@ -23,6 +23,7 @@ 

  import rpm

  

  from .mock import Mock

+ from .rpm_compat import hdr

  from .settings import Settings

  

  
@@ -53,9 +54,9 @@ 

          eq = rpm.RPMSENSE_EQUAL

          for ix in range(0, len(names) - 1):

              if not versions[ix]:

-                 dp.append(names[ix].decode("utf-8"))

+                 dp.append(names[ix])

                  continue

-             s = names[ix].decode("utf-8") + " "

+             s = names[ix] + " "

              if sense[ix] and eq:

                  s += "= "

              elif sense[ix] and lt:
@@ -65,7 +66,7 @@ 

              else:

                  s += "What? : " + bin(sense[ix])

                  self.log.warning("Bad RPMSENSE: %s", bin(sense[ix]))

-             s += versions[ix].decode("utf-8")

+             s += versions[ix]

              dp.append(s)

          return dp

  
@@ -79,7 +80,7 @@ 

  

          self.filename = Mock.get_package_rpm_path(self)

          fd = os.open(self.filename, os.O_RDONLY)

-         self.header = ts.hdrFromFdno(fd)

+         self.header = hdr(ts.hdrFromFdno(fd))

          os.close(fd)

          self._inited = True

  
@@ -87,9 +88,9 @@ 

      def header_to_str(self, tag):

          """ Convert header in a string, to cope with API changes in F18"""

          if isinstance(self.header[tag], (dict, list)):

-             return " ".join(i.decode("utf-8") for i in self.header[tag])

+             return " ".join(self.header[tag])

          elif self.header[tag]:

-             return self.header[tag].decode("utf-8")

+             return self.header[tag]

          else:

              return None

  
@@ -138,19 +139,19 @@ 

      def filelist(self):

          """ List of files in this rpm (expanded). """

          self.init()

-         return [x.decode("utf-8") for x in self.header[rpm.RPMTAG_FILENAMES]]

+         return self.header[rpm.RPMTAG_FILENAMES]

  

      @property

      def requires(self):

          """ List of requires, also auto-generated for rpm. """

          self.init()

-         return [x.decode("utf-8") for x in self.header[rpm.RPMTAG_REQUIRES]]

+         return self.header[rpm.RPMTAG_REQUIRES]

  

      @property

      def provides(self):

          """ List of provides, also auto-generated for rpm. """

          self.init()

-         return [x.decode("utf-8") for x in self.header[rpm.RPMTAG_PROVIDES]]

+         return self.header[rpm.RPMTAG_PROVIDES]

  

      @property

      def format_requires(self):

file modified
+13 -12
@@ -26,6 +26,7 @@ 

  import rpm

  

  from .review_error import ReviewError, SpecParseReviewError

+ from .rpm_compat import spec

  from .settings import Settings

  from .mock import Mock

  
@@ -71,7 +72,7 @@ 

          def parse_spec():

              """ Let rpm parse the spec and build spec.spec (sic!). """

              try:

-                 self.spec = rpm.TransactionSet().parseSpec(self.filename)

+                 self.spec = spec(self.filename)

              except Exception as ex:

                  raise SpecParseReviewError("Can't parse specfile: " + ex.__str__())

  
@@ -86,13 +87,13 @@ 

          sys.stdout = stdout

          self._packages = None

          self.name_vers_rel = [

-             self.expand_tag(rpm.RPMTAG_NAME).decode("utf-8"),

-             self.expand_tag(rpm.RPMTAG_VERSION).decode("utf-8"),

+             self.expand_tag(rpm.RPMTAG_NAME),

+             self.expand_tag(rpm.RPMTAG_VERSION),

              "*",

          ]

          update_macros()

          parse_spec()

-         self.name_vers_rel[2] = self.expand_tag(rpm.RPMTAG_RELEASE).decode("utf-8")

+         self.name_vers_rel[2] = self.expand_tag(rpm.RPMTAG_RELEASE)

  

      name = property(lambda self: self.name_vers_rel[0])

      version = property(lambda self: self.name_vers_rel[1])
@@ -112,7 +113,7 @@ 

                  )

                  return None

  

-         pkgs = [p.header[rpm.RPMTAG_NAME].decode("utf-8") for p in self.spec.packages]

+         pkgs = [p.header[rpm.RPMTAG_NAME] for p in self.spec.packages]

          pkgs = [p for p in pkgs if not self.get_files(p) is None]

          return [p for p in pkgs if check_pkg_path(p)]

  
@@ -152,7 +153,7 @@ 

          if not pkg_name:

              return self.spec.packages[0]

          for p in self.spec.packages:

-             if p.header[rpm.RPMTAG_NAME].decode("utf-8") == pkg_name:

+             if p.header[rpm.RPMTAG_NAME] == pkg_name:

                  return p

          raise KeyError(pkg_name + ": no such package")

  
@@ -274,12 +275,12 @@ 

      @property

      def build_requires(self):

          """ Return the list of build requirements. """

-         return [x.decode("utf-8") for x in self.spec.sourceHeader[rpm.RPMTAG_REQUIRES]]

+         return self.spec.sourceHeader[rpm.RPMTAG_REQUIRES]

  

      def get_requires(self, pkg_name=None):

          """ Return list of requirements i. e., Requires: """

          package = self._get_pkg_by_name(pkg_name)

-         return [x.decode("utf-8") for x in package.header[rpm.RPMTAG_REQUIRES]]

+         return package.header[rpm.RPMTAG_REQUIRES]

  

      def get_package_nvr(self, pkg_name=None):

          """ Return object with name, version, release for a package. """
@@ -292,9 +293,9 @@ 

  

          package = self._get_pkg_by_name(pkg_name)

          nvr = NVR()

-         nvr.version = package.header[rpm.RPMTAG_VERSION].decode("utf-8")

-         nvr.release = package.header[rpm.RPMTAG_RELEASE].decode("utf-8")

-         nvr.name = package.header[rpm.RPMTAG_NAME].decode("utf-8")

+         nvr.version = package.header[rpm.RPMTAG_VERSION]

+         nvr.release = package.header[rpm.RPMTAG_RELEASE]

+         nvr.name = package.header[rpm.RPMTAG_NAME]

          return nvr

  

      def get_files(self, pkg_name=None):
@@ -309,7 +310,7 @@ 

              return self._parse_files(pkg_name)

          if files is None:

              return None

-         return [l for l in [f.strip() for f in files.decode("utf-8").split("\n")] if l]

+         return [l for l in [f.strip() for f in files.split("\n")] if l]

  

      def get_section(self, section, raw=False):

          """

file modified
+1 -1
@@ -36,7 +36,7 @@ 

  

  def _proper_dist_os():

      """ Return true if we can create a dist on this OS. """

-     uname_r = check_output(["uname", "-r"]).decode("utf-8")

+     uname_r = check_output(["uname", "-r"], encoding="utf-8")

      if "el6" in uname_r:

          return False

      return True

file modified
+4 -6
@@ -515,13 +515,11 @@ 

          self.assertEqual(spec.version, "1.0")

          dist = Mock.get_macro("%dist", None, {})

          self.assertEqual(spec.release, "1" + dist)

-         self.assertEqual(spec.expand_tag("Release").decode("utf-8"), "1" + dist)

-         self.assertEqual(spec.expand_tag("License").decode("utf-8"), "GPLv2+")

-         self.assertEqual(

-             spec.expand_tag("Group").decode("utf-8"), "Development/Languages"

-         )

+         self.assertEqual(spec.expand_tag("Release"), "1" + dist)

+         self.assertEqual(spec.expand_tag("License"), "GPLv2+")

+         self.assertEqual(spec.expand_tag("Group"), "Development/Languages")

          # Test rpm value not there

-         self.assertEqual(spec.expand_tag("PreReq").decode("utf-8"), None)

+         self.assertEqual(spec.expand_tag("PreReq"), None)

          # Test get sections

          expected = ["rm -rf $RPM_BUILD_ROOT"]

          self.assertEqual(spec.get_section("%clean"), expected)

file modified
+1 -2
@@ -163,8 +163,7 @@ 

      def test_version(self):

          """ test --version option. """

          cmd = srcpath.REVIEW_PATH + " --version"

-         output = check_output(cmd, shell=True)

-         output = output.decode("utf-8")

+         output = check_output(cmd, shell=True, encoding="utf-8")

          self.assertIn("fedora-review", output)

          self.assertIn(VERSION, output)