#3703 RawHeader improvements
Merged a year ago by tkopecek. Opened a year ago by mikem.
mikem/koji sig-header-fields  into  master

file modified
+148 -53
@@ -669,13 +669,12 @@ 

  

      # see Maximum RPM Appendix A: Format of the RPM File

  

-     def __init__(self, data):

-         if rpm is None:

-             raise GenericError("rpm's python bindings are not installed")

+     def __init__(self, data, decode=False):

          if data[0:3] != RPM_HEADER_MAGIC:

              raise GenericError("Invalid rpm header: bad magic: %r" % (data[0:3],))

          self.header = data

          self._index()

+         self.decode = decode

  

      def version(self):

          # fourth byte is the version
@@ -703,7 +702,7 @@ 

          self.datalen = dl

          self.index = index

  

-     def dump(self):

+     def dump(self, sig=None):

          print("HEADER DUMP:")

          # calculate start of store

          il = len(self.index)
@@ -714,35 +713,84 @@ 

          # sort entries by offset, dtype

          # also rearrange: tag, dtype, offset, count -> offset, dtype, tag, count

          order = sorted([(x[2], x[1], x[0], x[3]) for x in six.itervalues(self.index)])

-         next = store

          # map some rpmtag codes

          tags = {}

-         for name, code in six.iteritems(rpm.__dict__):

-             if name.startswith('RPMTAG_') and isinstance(code, int):

-                 tags[code] = name[7:].lower()

+         if rpm:

+             for name, code in six.iteritems(rpm.__dict__):

+                 if name.startswith('RPMTAG_') and isinstance(code, int):

+                     tags[code] = name[7:].lower()

+         else:

+             print("rpm's python bindings are not installed. Unable to convert tag codes")

+         if sig is None:

+             # detect whether this is a signature header

+             sig = bool(self.get(RPM_TAG_HEADERSIGNATURES))

+         if sig:

+             print("Parsing as a signature header")

+             # signature headers have a few different values

+             # the SIGTAG_* values are not exposed in the python api

+             # see rpmtag.h

+             tags[1000] = 'size'

+             tags[1001] = 'lemd5_1'

+             tags[1002] = 'pgp'

+             tags[1003] = 'lemd5_2'

+             tags[1004] = 'md5'

+             tags[1005] = 'gpg'

+             tags[1006] = 'pgp5'

+             tags[1007] = 'payloadsize'

+             tags[1008] = 'reservedspace'

+         # expect first entry at start

+         expected_ofs = store

          for entry in order:

              # tag, dtype, offset, count = entry

              offset, dtype, tag, count = entry

              pos = store + offset

-             if next is not None:

-                 if pos > next:

+             if expected_ofs is not None:

+                 # expected_ofs will be None after an unrecognized data type

+                 # integer types are byte aligned for their size

+                 align = None

+                 pad = 0

+                 if dtype == 3:  # INT16

+                     align = 2

+                 elif dtype == 4:  # INT32

+                     align = 4

+                 elif dtype == 5:  # INT64

+                     align = 8

+                 if align:

+                     pad = (align - (expected_ofs % align)) % align

+                     expected_ofs += pad

+                 if pos > expected_ofs:

                      print("** HOLE between entries")

-                     print("Hex: %s" % hex_string(self.header[next:pos]))

-                     print("Data: %r" % self.header[next:pos])

-                 elif pos < next:

+                     print("Size: %d" % (pos - expected_ofs))

+                     print("Hex: %s" % hex_string(self.header[expected_ofs:pos]))

+                     print("Data: %r" % self.header[expected_ofs:pos])

+                     print("Padding: %i" % pad)

+                     print("Expected offset: 0x%x" % (expected_ofs - store))

+                 elif pad and pos == expected_ofs - pad:

+                     print("** Missing expected padding")

+                     print("Padding: %i" % pad)

+                     print("Expected offset: 0x%x" % (expected_ofs - store))

+                 elif pos < expected_ofs:

                      print("** OVERLAPPING entries")

-             print("Tag: %d [%s], Type: %d, Offset: %x, Count: %d"

+                     print("Overlap size: %d" % (expected_ofs - pos))

+                     print("Expected offset: 0x%x" % (expected_ofs - store))

+                 elif pad:

+                     # pos == expected_ofs

+                     print("Alignment padding: %i" % pad)

+                     padbytes = self.header[pos - pad:pos]

+                     if padbytes != b'\0' * pad:

+                         print("NON-NULL padding bytes: %s" % hex_string(padbytes))

+             print("Tag: %d [%s], Type: %d, Offset: 0x%x, Count: %d"

                    % (tag, tags.get(tag, '?'), dtype, offset, count))

              if dtype == 0:

                  # null

                  print("[NULL entry]")

-                 next = pos

+                 expected_ofs = pos

              elif dtype == 1:

                  # char

                  for i in range(count):

                      print("Char: %r" % self.header[pos])

                      pos += 1

-                 next = pos

+                 expected_ofs = pos

              elif dtype >= 2 and dtype <= 5:

                  # integer

                  n = 1 << (dtype - 2)
@@ -752,98 +800,145 @@ 

                      num = multibyte(data)

                      print("Int(%d): %d" % (n, num))

                      pos += n

-                 next = pos

+                 expected_ofs = pos

              elif dtype == 6:

                  # string (null terminated)

                  end = self.header.find(six.b('\0'), pos)

+                 value = self.header[pos:end]

                  try:

-                     print("String(%d): %r" % (end - pos, _decode_item(self.header[pos:end])))

-                 except ValueError:

+                     value = self.decode_bytes(value)

+                 except Exception:

                      print('INVALID STRING')

-                     print("String(%d): %r" % (end - pos, self.header[pos:end]))

-                     raise

-                 next = end + 1

+                 print("String(%d): %r" % (end - pos, value))

+                 expected_ofs = end + 1

              elif dtype == 7:

                  print("Data: %s" % hex_string(self.header[pos:pos + count]))

-                 next = pos + count

+                 expected_ofs = pos + count

              elif dtype == 8:

                  # string array

                  for i in range(count):

                      end = self.header.find(six.b('\0'), pos)

-                     print("String(%d): %r" % (end - pos, self.header[pos:end]))

+                     value = self.header[pos:end]

+                     try:

+                         value = self.decode_bytes(value)

+                     except Exception:

+                         print('INVALID STRING')

+                     print("String(%d): %r" % (end - pos, value))

                      pos = end + 1

-                 next = pos

+                 expected_ofs = pos

              elif dtype == 9:

-                 # unicode string array

+                 # i18n string array

                  for i in range(count):

                      end = self.header.find(six.b('\0'), pos)

+                     value = self.header[pos:end]

                      try:

-                         print("i18n(%d): %r" % (end - pos, _decode_item(self.header[pos:end])))

+                         value = self.decode_bytes(value)

                      except Exception:

                          print('INVALID STRING')

-                         print("i18n(%d): %r" % (end - pos, self.header[pos:end]))

+                     print("i18n(%d): %r" % (end - pos, value))

                      pos = end + 1

-                 next = pos

+                 expected_ofs = pos

              else:

-                 print("Skipping data type %x" % dtype)

-                 next = None

-         if next is not None:

+                 print("Skipping data type 0x%x" % dtype)

+                 expected_ofs = None

+         if expected_ofs is not None:

              pos = store + self.datalen

-             if next < pos:

+             if expected_ofs < pos:

                  print("** HOLE at end of data block")

-                 print("Hex: %s" % hex_string(self.header[next:pos]))

-                 print("Data: %r" % self.header[next:pos])

-             elif pos > next:

+                 print("Size: %d" % (pos - expected_ofs))

+                 print("Hex: %s" % hex_string(self.header[expected_ofs:pos]))

+                 print("Data: %r" % self.header[expected_ofs:pos])

+                 print("Offset: 0x%x" % self.datalen)

+             elif pos > expected_ofs:

                  print("** OVERFLOW in data block")

+                 print("Overflow size: %d" % (expected_ofs - pos))

+                 print("Offset: 0x%x" % self.datalen)

+ 

+     def decode_bytes(self, value):

+         if six.PY2:

+             return value

+         else:

+             return value.decode(errors='surrogateescape')

  

      def __getitem__(self, key):

          tag, dtype, offset, count = self.index[key]

          assert tag == key

          return self._getitem(dtype, offset, count)

  

-     def _getitem(self, dtype, offset, count):

+     def _getitem(self, dtype, offset, count, decode=None):

+         if decode is None:

+             decode = self.decode

          # calculate start of store

          il = len(self.index)

          store = 16 + il * 16

          pos = store + offset

          if dtype >= 2 and dtype <= 5:

-             n = 1 << (dtype - 2)

-             # n-byte integer

-             data = [_ord(x) for x in self.header[pos:pos + n]]

-             return multibyte(data)

+             values = []

+             for _ in range(count):

+                 n = 1 << (dtype - 2)

+                 # n-byte integer

+                 data = [_ord(x) for x in self.header[pos:pos + n]]

+                 values.append(multibyte(data))

+                 pos += n

+             return values

+         elif dtype == 1:

+             # char treated like int8

+             return [_ord(c) for c in self.header[pos:pos + count]]

          elif dtype == 6:

              # string (null terminated)

-             end = self.header.find('\0', pos)

-             return self.header[pos:end]

+             end = self.header.find(six.b('\0'), pos)

+             value = self.header[pos:end]

+             if decode:

+                 value = self.decode_bytes(value)

+             return value

          elif dtype == 7:

              # raw data

              return self.header[pos:pos + count]

          elif dtype == 8:

              # string array

              result = []

-             for i in range(count):

+             for _ in range(count):

                  end = self.header.find(six.b('\0'), pos)

-                 result.append(self.header[pos:end])

+                 value = self.header[pos:end]

+                 if decode:

+                     value = self.decode_bytes(value)

+                 result.append(value)

                  pos = end + 1

              return result

          elif dtype == 9:

-             # unicode string array

+             # i18n string array

+             # note that we do not apply localization

              result = []

-             for i in range(count):

+             for _ in range(count):

                  end = self.header.find(six.b('\0'), pos)

-                 result.append(_decode_item(self.header[pos:end]))

+                 value = self.header[pos:end]

+                 if decode:

+                     value = self.decode_bytes(value)

+                 result.append(value)

                  pos = end + 1

              return result

          else:

-             # XXX - not all valid data types are handled

-             raise GenericError("Unable to read header data type: %x" % dtype)

- 

-     def get(self, key, default=None):

+             raise GenericError("Unknown header data type: %x" % dtype)

+ 

+     def get(self, key, default=None, decode=None, single=False):

+         # With decode on, we will _mostly_ return the same value that rpmlib will.

+         # There are exceptions where rpmlib will automatically translate or update values, e.g.

+         # * fields that rpm treats as scalars

+         # * special tags like Headerimmutable

+         # * i18n string translations

+         # * the Fileclass extension tag that overlaps a concrete tag

+         # * auto converting PREINPROG/POSTINPROG/etc to string arrays for older rpms

          entry = self.index.get(key)

          if entry is None:

              return default

          else:

-             return self._getitem(*entry[1:])

+             value = self._getitem(*entry[1:], decode=decode)

+             if single and isinstance(value, list):

+                 if len(value) == 1:

+                     return value[0]

+                 else:

+                     raise ValueError('single value requested for array at key %s' % key)

+             return value

  

  

  def rip_rpm_sighdr(src):

@@ -45,11 +45,11 @@ 

              size = None

              try:

                  tag = rpm.RPMTAG_LONGSIGSIZE

-                 size = rh.get(tag)

+                 size = rh.get(tag, single=True)

              except NameError:

                  pass

              if size is None:

-                 size = rh.get(SIGTAG_SIZE)

+                 size = rh.get(SIGTAG_SIZE, single=True)

  

              # Expected file size

              calc_size = s_lead + s_sig + size

@@ -0,0 +1,37 @@ 

+ # coding=utf-8

+ from __future__ import absolute_import

+ import os.path

+ import unittest

+ 

+ import koji

+ 

+ 

+ class TestRawHeaderFields(unittest.TestCase):

+ 

+     RPMFILES = [

+         "test-deps-1-1.fc24.x86_64.rpm",

+         "test-files-1-1.fc27.noarch.rpm",

+         "test-nosrc-1-1.fc24.nosrc.rpm",

+         "test-deps-1-1.fc24.x86_64.rpm.signed",

+         "test-nopatch-1-1.fc24.nosrc.rpm",

+         "test-src-1-1.fc24.src.rpm",

+     ]

+ 

+     def test_header_sizes(self):

+         for basename in self.RPMFILES:

+             fn = os.path.join(os.path.dirname(__file__), 'data/rpms', basename)

+ 

+             rh = koji.RawHeader(koji.rip_rpm_hdr(fn))

+             hdr = koji.get_rpm_header(fn)

+ 

+             for key in rh.index:

+                 if key in (63, 1141):

+                     continue

+                 ours = rh.get(key, decode=True)

+                 theirs = hdr[key]

+                 if type(ours) != type(theirs):

+                     if isinstance(ours, list) and len(ours) == 1 and ours[0] == theirs:

+                         # rpm is presenting as a scalar

+                         continue

+                 # otherwise

+                 self.assertEqual(ours, theirs)

While looking at #3690 I ended up using RawHeader.dump() to do some analysis, and this prompted me to fix a few minor issues there.

  • display signature header tags correctly
  • handle int alignment padding
  • avoid masking built-in next function
  • somewhat more informative output
  • remove strict rpmlib dependency

This also includes fixes from #3714

  • decode option
  • support addition datatypes in get: char, string array, i18n array
  • fix handling for integer types

Fixes https://pagure.io/koji/issue/3713

7 new commits added

  • flake8 fix
  • use surrogateescape to decode in RawHeader
  • decode option in RawHeader, handle integer counts
  • flake8 fix
  • attempt to decode strings in dump
  • also support get for unicode string arrays
  • RawHeader.get can return also string lists
a year ago

Note: this conflicts with #3690 because RawHeader.dump() will now return a list of ints for RPMTAG_LONGSIGSIZE.

1 new commit added

  • single opt for get, plus simple unit test
a year ago

So, I'm a little puzzled about what to do for scalar header values. The RawHeader class is deliberately a lower level tool for parsing rpm header structure, whereas rpm itself applies several smarter layers of interpretation. In particular, rpm treats some header fields as scalars and others as arrays without any indication in the header apart from the header tag. E.g. buildtime is a scalar int32 and changelogtime is an int32 array, both both are represented as RPM_INT32_TYPE in the actual header.

We could have RawHeader vary its behavior here based on key, but that would require embedding a bunch of tag data from rpmlib. It's doable, but we'd need to keep it up to date (and it doesn't look like this aspect of the data is exposed in the python api).

To further complicate things, the RawHeader code before this PR had a bug for int values. It only returned the first int one, effectively treating even the int arrays as scalars.

The compromise solution I have here is to add a single option to RawHeader.get as a convenience for callers that want to fetch a value they expect to be singular.

It's also worth noting that while the rpm code treats RPM_I18NSTRING_TYPE as an array type like RPM_STRING_ARRAY_TYPE, all actual i18n tags are scalar (summary, description, group).

@tkopecek @puiterwijk any thoughts on the above?

I think that single option is ok for usecases we have.

rebased onto e88639195c9e235b1f8646d86ed232f1b2b74fa6

a year ago

rebased and added a fix for a unit test that is broken by the behavior change in get().

unit tests failing locally due to (unrelated) #3733

rebased onto 1b02bfc

a year ago

rebased to pull in unit test fix

Metadata Update from @tkopecek:
- Pull-request tagged with: no_qe

a year ago

Commit c0d537c fixes this pull-request

Pull-Request has been merged by tkopecek

a year ago