diff options
Diffstat (limited to 'qa/secure/pcppdu.python')
-rw-r--r-- | qa/secure/pcppdu.python | 650 |
1 files changed, 650 insertions, 0 deletions
diff --git a/qa/secure/pcppdu.python b/qa/secure/pcppdu.python new file mode 100644 index 0000000..cb1550d --- /dev/null +++ b/qa/secure/pcppdu.python @@ -0,0 +1,650 @@ +# PCP protocol support +# +# Florian Weimer / Red Hat Product Security Team +import struct +import collections +from operator import itemgetter as _itemgetter +import sys + +CVERSION = 1 +UNKNOWN_VERSION = 0 +PDU_VERSION2 = 2 +PDU_VERSION = PDU_VERSION2 +PDU_ERROR = 0x7000 +PDU_RESULT = 0x7001 +PDU_PROFILE = 0x7002 +PDU_FETCH = 0x7003 +PDU_DESC_REQ = 0x7004 +PDU_DESC = 0x7005 +PDU_INSTANCE_REQ = 0x7006 +PDU_INSTANCE = 0x7007 +PDU_TEXT_REQ = 0x7008 +PDU_TEXT = 0x7009 +PDU_CONTROL_REQ = 0x700a +PDU_CREDS = 0x700c +PDU_PMNS_IDS = 0x700d +PDU_PMNS_NAMES = 0x700e +PDU_PMNS_CHILD = 0x700f +PDU_PMNS_TRAVERSE = 0x7010 +def read_fully(sock, n): + result = str("") + while len(result) < n: + result += sock.recv(n - len(result)) + return result +def read_pdu(sock): + "Reads the PDU, removing the length from the header." + length = read_fully(sock, 4) + length1, = struct.unpack(">i", length) + return read_fully(sock, length1 - 4) + +# Generated by collections command (not presented in python 2.4 +# Header = collections.namedtuple("Header", "type from_") +class Header(tuple): + 'Header(type, from_)' + + __slots__ = () + + _fields = ('type', 'from_') + + def __new__(_cls, type, from_): + return tuple.__new__(_cls, (type, from_)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new Header object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 2: + raise TypeError('Expected 2 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'Header(type=%r, from_=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'type': t[0], 'from_': t[1]} + + def _replace(_self, **kwds): + 'Return a new Header object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('type', 'from_'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + type = property(_itemgetter(0)) + from_ = property(_itemgetter(1)) + +# Generated by collections command (not presented in python 2.4 +#ExtendedError = collections.namedtuple("ExtendedError", "code datum") +class ExtendedError(tuple): + 'ExtendedError(code, datum)' + + __slots__ = () + + _fields = ('code', 'datum') + + def __new__(_cls, code, datum): + return tuple.__new__(_cls, (code, datum)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new ExtendedError object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 2: + raise TypeError('Expected 2 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'ExtendedError(code=%r, datum=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'code': t[0], 'datum': t[1]} + + def _replace(_self, **kwds): + 'Return a new ExtendedError object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('code', 'datum'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + code = property(_itemgetter(0)) + datum = property(_itemgetter(1)) + + +def parse_error(blob): + (typ, from_, code, datum) = struct.unpack(">iiii", blob) + return Header(typ, from_), ExtendedError(code, datum) +# Generated by collections command (not presented in python 2.4 +#PMNSIDs = collections.namedtuple("PmnsIDs", "sts idlist") +class PMNSIDs(tuple): + 'PmnsIDs(sts, idlist)' + + __slots__ = () + + _fields = ('sts', 'idlist') + + def __new__(_cls, sts, idlist): + return tuple.__new__(_cls, (sts, idlist)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new PmnsIDs object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 2: + raise TypeError('Expected 2 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'PmnsIDs(sts=%r, idlist=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'sts': t[0], 'idlist': t[1]} + + def _replace(_self, **kwds): + 'Return a new PmnsIDs object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('sts', 'idlist'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + sts = property(_itemgetter(0)) + idlist = property(_itemgetter(1)) + + +def parse_pmns_ids(blob): + (typ, from_, sts, elems) = struct.unpack(">iiii", blob[:16]) + return Header(typ, from_), PMNSIDs( + sts, struct.unpack(">" + "I" * elems, blob[16:])) +# Generated by collections command (not presented in python 2.4 +#Units = collections.namedtuple\ +# ("Units", "scaleCount scaleTime scaleSpace dimCount dimTime dimSpace") +class Units(tuple): + 'Units(scaleCount, scaleTime, scaleSpace, dimCount, dimTime, dimSpace)' + + __slots__ = () + + _fields = ('scaleCount', 'scaleTime', 'scaleSpace', 'dimCount', 'dimTime', 'dimSpace') + + def __new__(_cls, scaleCount, scaleTime, scaleSpace, dimCount, dimTime, dimSpace): + return tuple.__new__(_cls, (scaleCount, scaleTime, scaleSpace, dimCount, dimTime, dimSpace)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new Units object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 6: + raise TypeError('Expected 6 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'Units(scaleCount=%r, scaleTime=%r, scaleSpace=%r, dimCount=%r, dimTime=%r, dimSpace=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'scaleCount': t[0], 'scaleTime': t[1], 'scaleSpace': t[2], 'dimCount': t[3], 'dimTime': t[4], 'dimSpace': t[5]} + + def _replace(_self, **kwds): + 'Return a new Units object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('scaleCount', 'scaleTime', 'scaleSpace', 'dimCount', 'dimTime', 'dimSpace'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + scaleCount = property(_itemgetter(0)) + scaleTime = property(_itemgetter(1)) + scaleSpace = property(_itemgetter(2)) + dimCount = property(_itemgetter(3)) + dimTime = property(_itemgetter(4)) + dimSpace = property(_itemgetter(5)) + + +def _split84(b): + return (b >> 4) & 15, b & 15 +# Generated by collections command (not presented in python 2.4 +#Desc = collections.namedtuple("Desc", "pmid units") +class Desc(tuple): + 'Desc(pmid, units)' + + __slots__ = () + + _fields = ('pmid', 'units') + + def __new__(_cls, pmid, units): + return tuple.__new__(_cls, (pmid, units)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new Desc object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 2: + raise TypeError('Expected 2 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'Desc(pmid=%r, units=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'pmid': t[0], 'units': t[1]} + + def _replace(_self, **kwds): + 'Return a new Desc object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('pmid', 'units'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + pmid = property(_itemgetter(0)) + units = property(_itemgetter(1)) + + +def parse_desc(blob): + try: + print "len string =", len(blob) + print "fmt len =", struct.calcsize(">iiIiIiBBBB") + (typ, from_, pmid, typ2, indom, sem, u1, u2, u3, u4) = \ + struct.unpack(">iiIiIiBBBB", blob) + # TODO: check byte order (translated from bitfields) + # u1 is padding + scaleCount, scaleTime = _split84(u2) + scaleSpace, dimCount = _split84(u3) + dimTime, dimSpace = _split84(u4) + return Header(typ, from_), Desc(pmid, Units( + scaleCount, scaleTime, scaleSpace, dimCount, dimTime, dimSpace)) + except struct.error: + sys.stderr.write("test-case-succeeded") + sys.exit(200) +PM_VAL_INSITU = 0 +PM_VAL_DPTR = 1 +PM_VAL_SPTR = 2 +# Generated by collections command (not presented in python 2.4 +#ValueBlock = collections.namedtuple("ValueBlock", "vtype vbuf") +class ValueBlock(tuple): + 'ValueBlock(vtype, vbuf)' + + __slots__ = () + + _fields = ('vtype', 'vbuf') + + def __new__(_cls, vtype, vbuf): + return tuple.__new__(_cls, (vtype, vbuf)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new ValueBlock object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 2: + raise TypeError('Expected 2 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'ValueBlock(vtype=%r, vbuf=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'vtype': t[0], 'vbuf': t[1]} + + def _replace(_self, **kwds): + 'Return a new ValueBlock object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('vtype', 'vbuf'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + vtype = property(_itemgetter(0)) + vbuf = property(_itemgetter(1)) + + +# Generated by collections command (not presented in python 2.4 +#Value = collections.namedtuple("Value", "inst value") +class Value(tuple): + 'Value(inst, value)' + + __slots__ = () + + _fields = ('inst', 'value') + + def __new__(_cls, inst, value): + return tuple.__new__(_cls, (inst, value)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new Value object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 2: + raise TypeError('Expected 2 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'Value(inst=%r, value=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'inst': t[0], 'value': t[1]} + + def _replace(_self, **kwds): + 'Return a new Value object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('inst', 'value'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + inst = property(_itemgetter(0)) + value = property(_itemgetter(1)) + + +# Generated by collections command (not presented in python 2.4 +#ValueSet = collections.namedtuple("ValueSet", "pmid valfmt values") +class ValueSet(tuple): + 'ValueSet(pmid, valfmt, values)' + + __slots__ = () + + _fields = ('pmid', 'valfmt', 'values') + + def __new__(_cls, pmid, valfmt, values): + return tuple.__new__(_cls, (pmid, valfmt, values)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new ValueSet object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 3: + raise TypeError('Expected 3 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'ValueSet(pmid=%r, valfmt=%r, values=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'pmid': t[0], 'valfmt': t[1], 'values': t[2]} + + def _replace(_self, **kwds): + 'Return a new ValueSet object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('pmid', 'valfmt', 'values'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + pmid = property(_itemgetter(0)) + valfmt = property(_itemgetter(1)) + values = property(_itemgetter(2)) + + +# Generated by collections command (not presented in python 2.4 +#Result = collections.namedtuple("Result", "when data") +class Result(tuple): + 'Result(when, data)' + + __slots__ = () + + _fields = ('when', 'data') + + def __new__(_cls, when, data): + return tuple.__new__(_cls, (when, data)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new Result object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 2: + raise TypeError('Expected 2 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'Result(when=%r, data=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'when': t[0], 'data': t[1]} + + def _replace(_self, **kwds): + 'Return a new Result object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('when', 'data'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + when = property(_itemgetter(0)) + data = property(_itemgetter(1)) + + +def __fetch_value_block(blob, index): + offset = index * 4 - 4 + header, = struct.unpack(">I", blob[offset : offset + 4]) + typ = header >> 24 + # TODO: actually reduce string length by 4? + length = header & 0xFFFFFF - 4 + return ValueBlock(typ, blob[offset + 4 : offset + 4 + length]) +def parse_result(blob): + offset = 20 + typ, from_, sec, usec, numpmid = struct.unpack(">iiiii", blob[:offset]) + data = [] + for i in range(numpmid): + pmid, numval, valfmt = struct.unpack( + ">Iii", blob[offset : offset + 12]) + offset += 12 + values = [] + for j in range(numval): + inst, value = struct.unpack(">ii", blob[offset : offset + 8]) + offset += 8 + if valfmt == PM_VAL_INSITU: + values.append(Value(inst, value)) + else: + values.append(Value(inst, __fetch_value_block(blob, value))) + data.append(ValueSet(pmid, valfmt, values)) + return Result((sec, usec), data) +def send_pdu(sock, typ, from_, data): + "Adds a length/type/from header and sends the data over the socket." + header = struct.pack(">iii", len(data) + 12, typ, from_) + # print("> " + repr(header + data)) + sock.send(header + data) +# Generated by collections command (not presented in python 2.4 +#Cred = collections.namedtuple("Cred", "c_type c_vala c_valb c_valc") +class Cred(tuple): + 'Cred(c_type, c_vala, c_valb, c_valc)' + + __slots__ = () + + _fields = ('c_type', 'c_vala', 'c_valb', 'c_valc') + + def __new__(_cls, c_type, c_vala, c_valb, c_valc): + return tuple.__new__(_cls, (c_type, c_vala, c_valb, c_valc)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new Cred object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 4: + raise TypeError('Expected 4 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'Cred(c_type=%r, c_vala=%r, c_valb=%r, c_valc=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'c_type': t[0], 'c_vala': t[1], 'c_valb': t[2], 'c_valc': t[3]} + + def _replace(_self, **kwds): + 'Return a new Cred object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('c_type', 'c_vala', 'c_valb', 'c_valc'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + c_type = property(_itemgetter(0)) + c_vala = property(_itemgetter(1)) + c_valb = property(_itemgetter(2)) + c_valc = property(_itemgetter(3)) + + +def send_creds(sock, from_, creds): + # TODO: check byte order (translated from bitfields + send_pdu(sock, PDU_CREDS, from_, + struct.pack(">i", len(creds)) + + str("").join([struct.pack(">BBBB", *c) for c in creds])) +def __pad_to_4(b): + "Adds NUL bytes at the end to make sure that the length is == 0 (mod 4)." + return b + (str(""), str("\0\0\0"), str("\0\0"), str("\0e"))[len(b) & 3] +def send_pmns_names(sock, from_, names, status=False): + strlen = sum([len(x) + 1 for x in names]) + if status: + not_implemented() + else: + send_pdu(sock, PDU_PMNS_NAMES, from_, + struct.pack(">iii", strlen, 0, len(names)) + + str("").join([struct.pack(">I", len(x)) + __pad_to_4(x) + for x in names])) +def send_pmns_child(sock, from_, name, subtype=0): + send_pdu(sock, PDU_PMNS_CHILD, from_, + struct.pack(">ii", subtype, len(name)) + + name) +def send_pmns_traverse(sock, from_, name, subtype=0): + send_pdu(sock, PDU_PMNS_TRAVERSE, from_, + struct.pack(">ii", subtype, len(name)) + + name) +def send_desc_req(sock, from_, pmid): + send_pdu(sock, PDU_DESC_REQ, from_, struct.pack(">I", pmid)) +# Generated by collections command (not presented in python 2.4 +#InDomProfile = collections.namedtuple("InDomProfile", "indom state instances") +class InDomProfile(tuple): + 'InDomProfile(indom, state, instances)' + + __slots__ = () + + _fields = ('indom', 'state', 'instances') + + def __new__(_cls, indom, state, instances): + return tuple.__new__(_cls, (indom, state, instances)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new InDomProfile object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 3: + raise TypeError('Expected 3 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'InDomProfile(indom=%r, state=%r, instances=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'indom': t[0], 'state': t[1], 'instances': t[2]} + + def _replace(_self, **kwds): + 'Return a new InDomProfile object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('indom', 'state', 'instances'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + indom = property(_itemgetter(0)) + state = property(_itemgetter(1)) + instances = property(_itemgetter(2)) + +# Generated by collections command (not presented in python 2.4 +# Profile = collections.namedtuple("Profile", "state profiles") +class Profile(tuple): + 'Profile(state, profiles)' + + __slots__ = () + + _fields = ('state', 'profiles') + + def __new__(_cls, state, profiles): + return tuple.__new__(_cls, (state, profiles)) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new Profile object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != 2: + raise TypeError('Expected 2 arguments, got %d' % len(result)) + return result + + def __repr__(self): + return 'Profile(state=%r, profiles=%r)' % self + + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {'state': t[0], 'profiles': t[1]} + + def _replace(_self, **kwds): + 'Return a new Profile object replacing specified fields with new values' + result = _self._make(map(kwds.pop, ('state', 'profiles'), _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % kwds.keys()) + return result + + def __getnewargs__(self): + return tuple(self) + + state = property(_itemgetter(0)) + profiles = property(_itemgetter(1)) + + + +def send_profile(sock, from_, ctxnum, instprof): + send_pdu(sock, PDU_PROFILE, from_, + struct.pack(">iiii", ctxnum, instprof.state, + len(instprof.profiles), 0) + + str("").join([struct.pack(">Iii" + "i" * len(x.instances), + x.indom, x.state, len(x.instances), + *x.instances) + for x in instprof.profiles])) +def send_fetch(sock, from_, ctxnum, pmidlist, when=(0, 0)): + sec, usec = when + send_pdu(sock, PDU_FETCH, from_, + struct.pack(">iiii" + "i" * len(pmidlist), + ctxnum, sec, usec, len(pmidlist), + *pmidlist)) +def send_text_req(sock, from_, ident, text): + send_pdu(sock, PDU_TEXT_REQ, from_, + struct.pack(">ii", ident, len(text)) + + text) +def send_instance_req(sock, from_, indom, name, when=(0, 0), inst=-1): + sec, usec = when + send_pdu(sock, PDU_INSTANCE_REQ, from_, + struct.pack(">Iiiii", indom, sec, usec, inst, len(name)) + + name) +def client_handshake(sock, from_): + header, error = parse_error(read_pdu(sock)) + send_creds(sock, from_, [Cred(CVERSION, PDU_VERSION, 0, 0)]) |