diff options
Diffstat (limited to 'qa/secure/pcppdu.python')
| -rw-r--r-- | qa/secure/pcppdu.python | 650 | 
1 files changed, 650 insertions, 0 deletions
| diff --git a/qa/secure/pcppdu.python b/qa/secure/pcppdu.python new file mode 100644 index 0000000..cb1550d --- /dev/null +++ b/qa/secure/pcppdu.python @@ -0,0 +1,650 @@ +# PCP protocol support +# +# Florian Weimer / Red Hat Product Security Team +import struct +import collections +from operator import itemgetter as _itemgetter +import sys + +CVERSION           = 1 +UNKNOWN_VERSION    = 0 +PDU_VERSION2       = 2 +PDU_VERSION        = PDU_VERSION2 +PDU_ERROR          = 0x7000 +PDU_RESULT         = 0x7001 +PDU_PROFILE        = 0x7002 +PDU_FETCH          = 0x7003 +PDU_DESC_REQ       = 0x7004 +PDU_DESC           = 0x7005 +PDU_INSTANCE_REQ   = 0x7006 +PDU_INSTANCE       = 0x7007 +PDU_TEXT_REQ       = 0x7008 +PDU_TEXT           = 0x7009 +PDU_CONTROL_REQ    = 0x700a +PDU_CREDS          = 0x700c +PDU_PMNS_IDS       = 0x700d +PDU_PMNS_NAMES     = 0x700e +PDU_PMNS_CHILD     = 0x700f +PDU_PMNS_TRAVERSE  = 0x7010 +def read_fully(sock, n): +    result = str("") +    while len(result) < n: +        result += sock.recv(n - len(result)) +    return result +def read_pdu(sock): +    "Reads the PDU, removing the length from the header." +    length = read_fully(sock, 4) +    length1, = struct.unpack(">i", length) +    return read_fully(sock, length1 - 4) + +# Generated by collections command (not presented in python 2.4 +# Header = collections.namedtuple("Header", "type from_") +class Header(tuple): +        'Header(type, from_)'  + +        __slots__ = ()  + +        _fields = ('type', 'from_')  + +        def __new__(_cls, type, from_): +            return tuple.__new__(_cls, (type, from_))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new Header object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 2: +                raise TypeError('Expected 2 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'Header(type=%r, from_=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'type': t[0], 'from_': t[1]}  + +        def _replace(_self, **kwds): +            'Return a new Header object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('type', 'from_'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        type = property(_itemgetter(0)) +        from_ = property(_itemgetter(1)) + +# Generated by collections command (not presented in python 2.4 +#ExtendedError = collections.namedtuple("ExtendedError", "code datum") +class ExtendedError(tuple): +        'ExtendedError(code, datum)'  + +        __slots__ = ()  + +        _fields = ('code', 'datum')  + +        def __new__(_cls, code, datum): +            return tuple.__new__(_cls, (code, datum))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new ExtendedError object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 2: +                raise TypeError('Expected 2 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'ExtendedError(code=%r, datum=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'code': t[0], 'datum': t[1]}  + +        def _replace(_self, **kwds): +            'Return a new ExtendedError object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('code', 'datum'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        code = property(_itemgetter(0)) +        datum = property(_itemgetter(1)) + + +def parse_error(blob): +    (typ, from_, code, datum) = struct.unpack(">iiii", blob) +    return Header(typ, from_), ExtendedError(code, datum) +# Generated by collections command (not presented in python 2.4 +#PMNSIDs = collections.namedtuple("PmnsIDs", "sts idlist") +class PMNSIDs(tuple): +        'PmnsIDs(sts, idlist)'  + +        __slots__ = ()  + +        _fields = ('sts', 'idlist')  + +        def __new__(_cls, sts, idlist): +            return tuple.__new__(_cls, (sts, idlist))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new PmnsIDs object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 2: +                raise TypeError('Expected 2 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'PmnsIDs(sts=%r, idlist=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'sts': t[0], 'idlist': t[1]}  + +        def _replace(_self, **kwds): +            'Return a new PmnsIDs object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('sts', 'idlist'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        sts = property(_itemgetter(0)) +        idlist = property(_itemgetter(1)) + + +def parse_pmns_ids(blob): +    (typ, from_, sts, elems) = struct.unpack(">iiii", blob[:16]) +    return Header(typ, from_), PMNSIDs( +        sts, struct.unpack(">" + "I" * elems, blob[16:])) +# Generated by collections command (not presented in python 2.4 +#Units = collections.namedtuple\ +#    ("Units", "scaleCount scaleTime scaleSpace dimCount dimTime dimSpace") +class Units(tuple): +        'Units(scaleCount, scaleTime, scaleSpace, dimCount, dimTime, dimSpace)'  + +        __slots__ = ()  + +        _fields = ('scaleCount', 'scaleTime', 'scaleSpace', 'dimCount', 'dimTime', 'dimSpace')  + +        def __new__(_cls, scaleCount, scaleTime, scaleSpace, dimCount, dimTime, dimSpace): +            return tuple.__new__(_cls, (scaleCount, scaleTime, scaleSpace, dimCount, dimTime, dimSpace))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new Units object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 6: +                raise TypeError('Expected 6 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'Units(scaleCount=%r, scaleTime=%r, scaleSpace=%r, dimCount=%r, dimTime=%r, dimSpace=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'scaleCount': t[0], 'scaleTime': t[1], 'scaleSpace': t[2], 'dimCount': t[3], 'dimTime': t[4], 'dimSpace': t[5]}  + +        def _replace(_self, **kwds): +            'Return a new Units object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('scaleCount', 'scaleTime', 'scaleSpace', 'dimCount', 'dimTime', 'dimSpace'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        scaleCount = property(_itemgetter(0)) +        scaleTime = property(_itemgetter(1)) +        scaleSpace = property(_itemgetter(2)) +        dimCount = property(_itemgetter(3)) +        dimTime = property(_itemgetter(4)) +        dimSpace = property(_itemgetter(5)) + + +def _split84(b): +    return (b >> 4) & 15, b & 15 +# Generated by collections command (not presented in python 2.4 +#Desc = collections.namedtuple("Desc", "pmid units") +class Desc(tuple): +        'Desc(pmid, units)'  + +        __slots__ = ()  + +        _fields = ('pmid', 'units')  + +        def __new__(_cls, pmid, units): +            return tuple.__new__(_cls, (pmid, units))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new Desc object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 2: +                raise TypeError('Expected 2 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'Desc(pmid=%r, units=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'pmid': t[0], 'units': t[1]}  + +        def _replace(_self, **kwds): +            'Return a new Desc object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('pmid', 'units'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        pmid = property(_itemgetter(0)) +        units = property(_itemgetter(1)) + + +def parse_desc(blob): +    try: +        print "len string =", len(blob) +        print "fmt len =", struct.calcsize(">iiIiIiBBBB") +        (typ, from_, pmid, typ2, indom, sem, u1, u2, u3, u4) = \ +            struct.unpack(">iiIiIiBBBB", blob) +        # TODO: check byte order (translated from bitfields) +        # u1 is padding +        scaleCount, scaleTime = _split84(u2) +        scaleSpace, dimCount = _split84(u3) +        dimTime, dimSpace = _split84(u4) +        return Header(typ, from_), Desc(pmid, Units( +                scaleCount, scaleTime, scaleSpace, dimCount, dimTime, dimSpace)) +    except struct.error: +        sys.stderr.write("test-case-succeeded") +        sys.exit(200) +PM_VAL_INSITU   = 0 +PM_VAL_DPTR     = 1 +PM_VAL_SPTR     = 2 +# Generated by collections command (not presented in python 2.4 +#ValueBlock = collections.namedtuple("ValueBlock", "vtype vbuf") +class ValueBlock(tuple): +        'ValueBlock(vtype, vbuf)'  + +        __slots__ = ()  + +        _fields = ('vtype', 'vbuf')  + +        def __new__(_cls, vtype, vbuf): +            return tuple.__new__(_cls, (vtype, vbuf))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new ValueBlock object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 2: +                raise TypeError('Expected 2 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'ValueBlock(vtype=%r, vbuf=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'vtype': t[0], 'vbuf': t[1]}  + +        def _replace(_self, **kwds): +            'Return a new ValueBlock object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('vtype', 'vbuf'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        vtype = property(_itemgetter(0)) +        vbuf = property(_itemgetter(1)) + + +# Generated by collections command (not presented in python 2.4 +#Value = collections.namedtuple("Value", "inst value") +class Value(tuple): +        'Value(inst, value)'  + +        __slots__ = ()  + +        _fields = ('inst', 'value')  + +        def __new__(_cls, inst, value): +            return tuple.__new__(_cls, (inst, value))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new Value object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 2: +                raise TypeError('Expected 2 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'Value(inst=%r, value=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'inst': t[0], 'value': t[1]}  + +        def _replace(_self, **kwds): +            'Return a new Value object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('inst', 'value'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        inst = property(_itemgetter(0)) +        value = property(_itemgetter(1)) + + +# Generated by collections command (not presented in python 2.4 +#ValueSet = collections.namedtuple("ValueSet", "pmid valfmt values") +class ValueSet(tuple): +        'ValueSet(pmid, valfmt, values)'  + +        __slots__ = ()  + +        _fields = ('pmid', 'valfmt', 'values')  + +        def __new__(_cls, pmid, valfmt, values): +            return tuple.__new__(_cls, (pmid, valfmt, values))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new ValueSet object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 3: +                raise TypeError('Expected 3 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'ValueSet(pmid=%r, valfmt=%r, values=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'pmid': t[0], 'valfmt': t[1], 'values': t[2]}  + +        def _replace(_self, **kwds): +            'Return a new ValueSet object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('pmid', 'valfmt', 'values'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        pmid = property(_itemgetter(0)) +        valfmt = property(_itemgetter(1)) +        values = property(_itemgetter(2)) + + +# Generated by collections command (not presented in python 2.4 +#Result = collections.namedtuple("Result", "when data") +class Result(tuple): +        'Result(when, data)'  + +        __slots__ = ()  + +        _fields = ('when', 'data')  + +        def __new__(_cls, when, data): +            return tuple.__new__(_cls, (when, data))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new Result object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 2: +                raise TypeError('Expected 2 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'Result(when=%r, data=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'when': t[0], 'data': t[1]}  + +        def _replace(_self, **kwds): +            'Return a new Result object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('when', 'data'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        when = property(_itemgetter(0)) +        data = property(_itemgetter(1)) + + +def __fetch_value_block(blob, index): +    offset  = index * 4 - 4 +    header, = struct.unpack(">I", blob[offset : offset + 4]) +    typ = header >> 24 +    # TODO: actually reduce string length by 4? +    length = header & 0xFFFFFF - 4 +    return ValueBlock(typ, blob[offset + 4 : offset + 4 + length]) +def parse_result(blob): +    offset = 20 +    typ, from_, sec, usec, numpmid = struct.unpack(">iiiii", blob[:offset]) +    data = [] +    for i in range(numpmid): +        pmid, numval, valfmt = struct.unpack( +            ">Iii", blob[offset : offset + 12]) +        offset += 12 +        values = [] +        for j in range(numval): +            inst, value = struct.unpack(">ii", blob[offset : offset + 8]) +            offset += 8 +            if valfmt == PM_VAL_INSITU: +                values.append(Value(inst, value)) +            else: +                values.append(Value(inst, __fetch_value_block(blob, value))) +        data.append(ValueSet(pmid, valfmt, values)) +    return Result((sec, usec), data) +def send_pdu(sock, typ, from_, data): +    "Adds a length/type/from header and sends the data over the socket." +    header = struct.pack(">iii", len(data) + 12, typ, from_) +    # print("> " + repr(header + data)) +    sock.send(header + data) +# Generated by collections command (not presented in python 2.4 +#Cred = collections.namedtuple("Cred", "c_type c_vala c_valb c_valc") +class Cred(tuple): +        'Cred(c_type, c_vala, c_valb, c_valc)'  + +        __slots__ = ()  + +        _fields = ('c_type', 'c_vala', 'c_valb', 'c_valc')  + +        def __new__(_cls, c_type, c_vala, c_valb, c_valc): +            return tuple.__new__(_cls, (c_type, c_vala, c_valb, c_valc))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new Cred object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 4: +                raise TypeError('Expected 4 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'Cred(c_type=%r, c_vala=%r, c_valb=%r, c_valc=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'c_type': t[0], 'c_vala': t[1], 'c_valb': t[2], 'c_valc': t[3]}  + +        def _replace(_self, **kwds): +            'Return a new Cred object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('c_type', 'c_vala', 'c_valb', 'c_valc'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        c_type = property(_itemgetter(0)) +        c_vala = property(_itemgetter(1)) +        c_valb = property(_itemgetter(2)) +        c_valc = property(_itemgetter(3)) + + +def send_creds(sock, from_, creds): +    # TODO: check byte order (translated from bitfields +    send_pdu(sock, PDU_CREDS, from_, +             struct.pack(">i", len(creds)) +             + str("").join([struct.pack(">BBBB", *c) for c in creds])) +def __pad_to_4(b): +    "Adds NUL bytes at the end to make sure that the length is == 0 (mod 4)." +    return b + (str(""), str("\0\0\0"), str("\0\0"), str("\0e"))[len(b) & 3] +def send_pmns_names(sock, from_, names, status=False): +    strlen = sum([len(x) + 1 for x in names]) +    if status: +        not_implemented() +    else: +        send_pdu(sock, PDU_PMNS_NAMES, from_, +                 struct.pack(">iii", strlen, 0, len(names)) +                 + str("").join([struct.pack(">I", len(x)) + __pad_to_4(x) +                             for x in names])) +def send_pmns_child(sock, from_, name, subtype=0): +    send_pdu(sock, PDU_PMNS_CHILD, from_, +             struct.pack(">ii", subtype, len(name)) +             + name) +def send_pmns_traverse(sock, from_, name, subtype=0): +    send_pdu(sock, PDU_PMNS_TRAVERSE, from_, +             struct.pack(">ii", subtype, len(name)) +             + name) +def send_desc_req(sock, from_, pmid): +    send_pdu(sock, PDU_DESC_REQ, from_, struct.pack(">I", pmid)) +# Generated by collections command (not presented in python 2.4 +#InDomProfile = collections.namedtuple("InDomProfile", "indom state instances") +class InDomProfile(tuple): +        'InDomProfile(indom, state, instances)'  + +        __slots__ = ()  + +        _fields = ('indom', 'state', 'instances')  + +        def __new__(_cls, indom, state, instances): +            return tuple.__new__(_cls, (indom, state, instances))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new InDomProfile object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 3: +                raise TypeError('Expected 3 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'InDomProfile(indom=%r, state=%r, instances=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'indom': t[0], 'state': t[1], 'instances': t[2]}  + +        def _replace(_self, **kwds): +            'Return a new InDomProfile object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('indom', 'state', 'instances'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        indom = property(_itemgetter(0)) +        state = property(_itemgetter(1)) +        instances = property(_itemgetter(2)) + +# Generated by collections command (not presented in python 2.4 +# Profile = collections.namedtuple("Profile", "state profiles") +class Profile(tuple): +        'Profile(state, profiles)'  + +        __slots__ = ()  + +        _fields = ('state', 'profiles')  + +        def __new__(_cls, state, profiles): +            return tuple.__new__(_cls, (state, profiles))  + +        @classmethod +        def _make(cls, iterable, new=tuple.__new__, len=len): +            'Make a new Profile object from a sequence or iterable' +            result = new(cls, iterable) +            if len(result) != 2: +                raise TypeError('Expected 2 arguments, got %d' % len(result)) +            return result  + +        def __repr__(self): +            return 'Profile(state=%r, profiles=%r)' % self  + +        def _asdict(t): +            'Return a new dict which maps field names to their values' +            return {'state': t[0], 'profiles': t[1]}  + +        def _replace(_self, **kwds): +            'Return a new Profile object replacing specified fields with new values' +            result = _self._make(map(kwds.pop, ('state', 'profiles'), _self)) +            if kwds: +                raise ValueError('Got unexpected field names: %r' % kwds.keys()) +            return result  + +        def __getnewargs__(self): +            return tuple(self)  + +        state = property(_itemgetter(0)) +        profiles = property(_itemgetter(1)) + + + +def send_profile(sock, from_, ctxnum, instprof): +    send_pdu(sock, PDU_PROFILE, from_, +             struct.pack(">iiii", ctxnum, instprof.state, +                         len(instprof.profiles), 0) +             + str("").join([struct.pack(">Iii" + "i" * len(x.instances), +                                     x.indom, x.state, len(x.instances), +                                     *x.instances) +                         for x in instprof.profiles])) +def send_fetch(sock, from_, ctxnum, pmidlist, when=(0, 0)): +    sec, usec = when +    send_pdu(sock, PDU_FETCH, from_, +             struct.pack(">iiii" + "i" * len(pmidlist), +                         ctxnum, sec, usec, len(pmidlist), +                         *pmidlist)) +def send_text_req(sock, from_, ident, text): +    send_pdu(sock, PDU_TEXT_REQ, from_, +             struct.pack(">ii", ident, len(text)) +             + text) +def send_instance_req(sock, from_, indom, name, when=(0, 0), inst=-1): +    sec, usec = when +    send_pdu(sock, PDU_INSTANCE_REQ, from_, +             struct.pack(">Iiiii", indom, sec, usec, inst, len(name)) +             + name) +def client_handshake(sock, from_): +    header, error = parse_error(read_pdu(sock)) +    send_creds(sock, from_, [Cred(CVERSION, PDU_VERSION, 0, 0)]) | 
