| |
@@ -669,13 +669,12 @@
|
| |
|
| |
# see Maximum RPM Appendix A: Format of the RPM File
|
| |
|
| |
- def __init__(self, data):
|
| |
- if rpm is None:
|
| |
- raise GenericError("rpm's python bindings are not installed")
|
| |
+ def __init__(self, data, decode=False):
|
| |
if data[0:3] != RPM_HEADER_MAGIC:
|
| |
raise GenericError("Invalid rpm header: bad magic: %r" % (data[0:3],))
|
| |
self.header = data
|
| |
self._index()
|
| |
+ self.decode = decode
|
| |
|
| |
def version(self):
|
| |
# fourth byte is the version
|
| |
@@ -703,7 +702,7 @@
|
| |
self.datalen = dl
|
| |
self.index = index
|
| |
|
| |
- def dump(self):
|
| |
+ def dump(self, sig=None):
|
| |
print("HEADER DUMP:")
|
| |
# calculate start of store
|
| |
il = len(self.index)
|
| |
@@ -714,35 +713,84 @@
|
| |
# sort entries by offset, dtype
|
| |
# also rearrange: tag, dtype, offset, count -> offset, dtype, tag, count
|
| |
order = sorted([(x[2], x[1], x[0], x[3]) for x in six.itervalues(self.index)])
|
| |
- next = store
|
| |
# map some rpmtag codes
|
| |
tags = {}
|
| |
- for name, code in six.iteritems(rpm.__dict__):
|
| |
- if name.startswith('RPMTAG_') and isinstance(code, int):
|
| |
- tags[code] = name[7:].lower()
|
| |
+ if rpm:
|
| |
+ for name, code in six.iteritems(rpm.__dict__):
|
| |
+ if name.startswith('RPMTAG_') and isinstance(code, int):
|
| |
+ tags[code] = name[7:].lower()
|
| |
+ else:
|
| |
+ print("rpm's python bindings are not installed. Unable to convert tag codes")
|
| |
+ if sig is None:
|
| |
+ # detect whether this is a signature header
|
| |
+ sig = bool(self.get(RPM_TAG_HEADERSIGNATURES))
|
| |
+ if sig:
|
| |
+ print("Parsing as a signature header")
|
| |
+ # signature headers have a few different values
|
| |
+ # the SIGTAG_* values are not exposed in the python api
|
| |
+ # see rpmtag.h
|
| |
+ tags[1000] = 'size'
|
| |
+ tags[1001] = 'lemd5_1'
|
| |
+ tags[1002] = 'pgp'
|
| |
+ tags[1003] = 'lemd5_2'
|
| |
+ tags[1004] = 'md5'
|
| |
+ tags[1005] = 'gpg'
|
| |
+ tags[1006] = 'pgp5'
|
| |
+ tags[1007] = 'payloadsize'
|
| |
+ tags[1008] = 'reservedspace'
|
| |
+ # expect first entry at start
|
| |
+ expected_ofs = store
|
| |
for entry in order:
|
| |
# tag, dtype, offset, count = entry
|
| |
offset, dtype, tag, count = entry
|
| |
pos = store + offset
|
| |
- if next is not None:
|
| |
- if pos > next:
|
| |
+ if expected_ofs is not None:
|
| |
+ # expected_ofs will be None after an unrecognized data type
|
| |
+ # integer types are byte aligned for their size
|
| |
+ align = None
|
| |
+ pad = 0
|
| |
+ if dtype == 3: # INT16
|
| |
+ align = 2
|
| |
+ elif dtype == 4: # INT32
|
| |
+ align = 4
|
| |
+ elif dtype == 5: # INT64
|
| |
+ align = 8
|
| |
+ if align:
|
| |
+ pad = (align - (expected_ofs % align)) % align
|
| |
+ expected_ofs += pad
|
| |
+ if pos > expected_ofs:
|
| |
print("** HOLE between entries")
|
| |
- print("Hex: %s" % hex_string(self.header[next:pos]))
|
| |
- print("Data: %r" % self.header[next:pos])
|
| |
- elif pos < next:
|
| |
+ print("Size: %d" % (pos - expected_ofs))
|
| |
+ print("Hex: %s" % hex_string(self.header[expected_ofs:pos]))
|
| |
+ print("Data: %r" % self.header[expected_ofs:pos])
|
| |
+ print("Padding: %i" % pad)
|
| |
+ print("Expected offset: 0x%x" % (expected_ofs - store))
|
| |
+ elif pad and pos == expected_ofs - pad:
|
| |
+ print("** Missing expected padding")
|
| |
+ print("Padding: %i" % pad)
|
| |
+ print("Expected offset: 0x%x" % (expected_ofs - store))
|
| |
+ elif pos < expected_ofs:
|
| |
print("** OVERLAPPING entries")
|
| |
- print("Tag: %d [%s], Type: %d, Offset: %x, Count: %d"
|
| |
+ print("Overlap size: %d" % (expected_ofs - pos))
|
| |
+ print("Expected offset: 0x%x" % (expected_ofs - store))
|
| |
+ elif pad:
|
| |
+ # pos == expected_ofs
|
| |
+ print("Alignment padding: %i" % pad)
|
| |
+ padbytes = self.header[pos - pad:pos]
|
| |
+ if padbytes != b'\0' * pad:
|
| |
+ print("NON-NULL padding bytes: %s" % hex_string(padbytes))
|
| |
+ print("Tag: %d [%s], Type: %d, Offset: 0x%x, Count: %d"
|
| |
% (tag, tags.get(tag, '?'), dtype, offset, count))
|
| |
if dtype == 0:
|
| |
# null
|
| |
print("[NULL entry]")
|
| |
- next = pos
|
| |
+ expected_ofs = pos
|
| |
elif dtype == 1:
|
| |
# char
|
| |
for i in range(count):
|
| |
print("Char: %r" % self.header[pos])
|
| |
pos += 1
|
| |
- next = pos
|
| |
+ expected_ofs = pos
|
| |
elif dtype >= 2 and dtype <= 5:
|
| |
# integer
|
| |
n = 1 << (dtype - 2)
|
| |
@@ -752,98 +800,145 @@
|
| |
num = multibyte(data)
|
| |
print("Int(%d): %d" % (n, num))
|
| |
pos += n
|
| |
- next = pos
|
| |
+ expected_ofs = pos
|
| |
elif dtype == 6:
|
| |
# string (null terminated)
|
| |
end = self.header.find(six.b('\0'), pos)
|
| |
+ value = self.header[pos:end]
|
| |
try:
|
| |
- print("String(%d): %r" % (end - pos, _decode_item(self.header[pos:end])))
|
| |
- except ValueError:
|
| |
+ value = self.decode_bytes(value)
|
| |
+ except Exception:
|
| |
print('INVALID STRING')
|
| |
- print("String(%d): %r" % (end - pos, self.header[pos:end]))
|
| |
- raise
|
| |
- next = end + 1
|
| |
+ print("String(%d): %r" % (end - pos, value))
|
| |
+ expected_ofs = end + 1
|
| |
elif dtype == 7:
|
| |
print("Data: %s" % hex_string(self.header[pos:pos + count]))
|
| |
- next = pos + count
|
| |
+ expected_ofs = pos + count
|
| |
elif dtype == 8:
|
| |
# string array
|
| |
for i in range(count):
|
| |
end = self.header.find(six.b('\0'), pos)
|
| |
- print("String(%d): %r" % (end - pos, self.header[pos:end]))
|
| |
+ value = self.header[pos:end]
|
| |
+ try:
|
| |
+ value = self.decode_bytes(value)
|
| |
+ except Exception:
|
| |
+ print('INVALID STRING')
|
| |
+ print("String(%d): %r" % (end - pos, value))
|
| |
pos = end + 1
|
| |
- next = pos
|
| |
+ expected_ofs = pos
|
| |
elif dtype == 9:
|
| |
- # unicode string array
|
| |
+ # i18n string array
|
| |
for i in range(count):
|
| |
end = self.header.find(six.b('\0'), pos)
|
| |
+ value = self.header[pos:end]
|
| |
try:
|
| |
- print("i18n(%d): %r" % (end - pos, _decode_item(self.header[pos:end])))
|
| |
+ value = self.decode_bytes(value)
|
| |
except Exception:
|
| |
print('INVALID STRING')
|
| |
- print("i18n(%d): %r" % (end - pos, self.header[pos:end]))
|
| |
+ print("i18n(%d): %r" % (end - pos, value))
|
| |
pos = end + 1
|
| |
- next = pos
|
| |
+ expected_ofs = pos
|
| |
else:
|
| |
- print("Skipping data type %x" % dtype)
|
| |
- next = None
|
| |
- if next is not None:
|
| |
+ print("Skipping data type 0x%x" % dtype)
|
| |
+ expected_ofs = None
|
| |
+ if expected_ofs is not None:
|
| |
pos = store + self.datalen
|
| |
- if next < pos:
|
| |
+ if expected_ofs < pos:
|
| |
print("** HOLE at end of data block")
|
| |
- print("Hex: %s" % hex_string(self.header[next:pos]))
|
| |
- print("Data: %r" % self.header[next:pos])
|
| |
- elif pos > next:
|
| |
+ print("Size: %d" % (pos - expected_ofs))
|
| |
+ print("Hex: %s" % hex_string(self.header[expected_ofs:pos]))
|
| |
+ print("Data: %r" % self.header[expected_ofs:pos])
|
| |
+ print("Offset: 0x%x" % self.datalen)
|
| |
+ elif pos > expected_ofs:
|
| |
print("** OVERFLOW in data block")
|
| |
+ print("Overflow size: %d" % (expected_ofs - pos))
|
| |
+ print("Offset: 0x%x" % self.datalen)
|
| |
+
|
| |
+ def decode_bytes(self, value):
|
| |
+ if six.PY2:
|
| |
+ return value
|
| |
+ else:
|
| |
+ return value.decode(errors='surrogateescape')
|
| |
|
| |
def __getitem__(self, key):
|
| |
tag, dtype, offset, count = self.index[key]
|
| |
assert tag == key
|
| |
return self._getitem(dtype, offset, count)
|
| |
|
| |
- def _getitem(self, dtype, offset, count):
|
| |
+ def _getitem(self, dtype, offset, count, decode=None):
|
| |
+ if decode is None:
|
| |
+ decode = self.decode
|
| |
# calculate start of store
|
| |
il = len(self.index)
|
| |
store = 16 + il * 16
|
| |
pos = store + offset
|
| |
if dtype >= 2 and dtype <= 5:
|
| |
- n = 1 << (dtype - 2)
|
| |
- # n-byte integer
|
| |
- data = [_ord(x) for x in self.header[pos:pos + n]]
|
| |
- return multibyte(data)
|
| |
+ values = []
|
| |
+ for _ in range(count):
|
| |
+ n = 1 << (dtype - 2)
|
| |
+ # n-byte integer
|
| |
+ data = [_ord(x) for x in self.header[pos:pos + n]]
|
| |
+ values.append(multibyte(data))
|
| |
+ pos += n
|
| |
+ return values
|
| |
+ elif dtype == 1:
|
| |
+ # char treated like int8
|
| |
+ return [_ord(c) for c in self.header[pos:pos + count]]
|
| |
elif dtype == 6:
|
| |
# string (null terminated)
|
| |
- end = self.header.find('\0', pos)
|
| |
- return self.header[pos:end]
|
| |
+ end = self.header.find(six.b('\0'), pos)
|
| |
+ value = self.header[pos:end]
|
| |
+ if decode:
|
| |
+ value = self.decode_bytes(value)
|
| |
+ return value
|
| |
elif dtype == 7:
|
| |
# raw data
|
| |
return self.header[pos:pos + count]
|
| |
elif dtype == 8:
|
| |
# string array
|
| |
result = []
|
| |
- for i in range(count):
|
| |
+ for _ in range(count):
|
| |
end = self.header.find(six.b('\0'), pos)
|
| |
- result.append(self.header[pos:end])
|
| |
+ value = self.header[pos:end]
|
| |
+ if decode:
|
| |
+ value = self.decode_bytes(value)
|
| |
+ result.append(value)
|
| |
pos = end + 1
|
| |
return result
|
| |
elif dtype == 9:
|
| |
- # unicode string array
|
| |
+ # i18n string array
|
| |
+ # note that we do not apply localization
|
| |
result = []
|
| |
- for i in range(count):
|
| |
+ for _ in range(count):
|
| |
end = self.header.find(six.b('\0'), pos)
|
| |
- result.append(_decode_item(self.header[pos:end]))
|
| |
+ value = self.header[pos:end]
|
| |
+ if decode:
|
| |
+ value = self.decode_bytes(value)
|
| |
+ result.append(value)
|
| |
pos = end + 1
|
| |
return result
|
| |
else:
|
| |
- # XXX - not all valid data types are handled
|
| |
- raise GenericError("Unable to read header data type: %x" % dtype)
|
| |
-
|
| |
- def get(self, key, default=None):
|
| |
+ raise GenericError("Unknown header data type: %x" % dtype)
|
| |
+
|
| |
+ def get(self, key, default=None, decode=None, single=False):
|
| |
+ # With decode on, we will _mostly_ return the same value that rpmlib will.
|
| |
+ # There are exceptions where rpmlib will automatically translate or update values, e.g.
|
| |
+ # * fields that rpm treats as scalars
|
| |
+ # * special tags like Headerimmutable
|
| |
+ # * i18n string translations
|
| |
+ # * the Fileclass extension tag that overlaps a concrete tag
|
| |
+ # * auto converting PREINPROG/POSTINPROG/etc to string arrays for older rpms
|
| |
entry = self.index.get(key)
|
| |
if entry is None:
|
| |
return default
|
| |
else:
|
| |
- return self._getitem(*entry[1:])
|
| |
+ value = self._getitem(*entry[1:], decode=decode)
|
| |
+ if single and isinstance(value, list):
|
| |
+ if len(value) == 1:
|
| |
+ return value[0]
|
| |
+ else:
|
| |
+ raise ValueError('single value requested for array at key %s' % key)
|
| |
+ return value
|
| |
|
| |
|
| |
def rip_rpm_sighdr(src):
|
| |
While looking at #3690 I ended up using
RawHeader.dump()
to do some analysis, and this prompted me to fix a few minor issues there.This also includes fixes from #3714
get
: char, string array, i18n arrayFixes https://pagure.io/koji/issue/3713