import client_intf import string import re import types from sys import stderr # control verbosity of error output verbose = 1 secLevelMap = { 'noAuthNoPriv':1, 'authNoPriv':2, 'authPriv':3 } def _parse_session_args(kargs): sessArgs = { 'Version':3, 'DestHost':'localhost', 'Community':'public', 'Timeout':1000000, 'Retries':3, 'RemotePort':161, 'LocalPort':0, 'SecLevel':'noAuthNoPriv', 'SecName':'initial', 'PrivProto':'DEFAULT', 'PrivPass':'', 'AuthProto':'DEFAULT', 'AuthPass':'', 'ContextEngineId':'', 'SecEngineId':'', 'Context':'', 'Engineboots':0, 'Enginetime':0, 'UseNumeric':0, 'OurIdentity':'', 'TheirIdentity':'', 'TheirHostname':'', 'TrustCert':'' } keys = kargs.keys() for key in keys: if sessArgs.has_key(key): sessArgs[key] = kargs[key] else: print >>stderr, "ERROR: unknown key", key return sessArgs def STR(obj): if obj != None: obj = str(obj) return obj class Varbind(object): def __init__(self, tag=None, iid=None, val=None, type=None): self.tag = STR(tag) self.iid = STR(iid) self.val = STR(val) self.type = STR(type) # parse iid out of tag if needed if iid == None and tag != None: regex = re.compile(r'^((?:\.\d+)+|(?:\w+(?:[-:]*\w+)+))\.?(.*)$') match = regex.match(tag) if match: (self.tag, self.iid) = match.group(1,2) def __setattr__(self, name, val): self.__dict__[name] = STR(val) def print_str(self): return self.tag, self.iid, self.val, self.type class VarList(object): def __init__(self, *vs): self.varbinds = [] for var in vs: if isinstance(var, netsnmp.client.Varbind): self.varbinds.append(var) else: self.varbinds.append(Varbind(var)) def __len__(self): return len(self.varbinds) def __getitem__(self, index): return self.varbinds[index] def __setitem__(self, index, val): if isinstance(val, netsnmp.client.Varbind): self.varbinds[index] = val else: raise TypeError def __iter__(self): return iter(self.varbinds) def __delitem__(self, index): del self.varbinds[index] def __repr__(self): return repr(self.varbinds) def __getslice__(self, i, j): return self.varbinds[i:j] def append(self, *vars): for var in vars: if isinstance(var, netsnmp.client.Varbind): self.varbinds.append(var) else: raise TypeError class Session(object): def __init__(self, **args): self.sess_ptr = None self.UseLongNames = 0 self.UseNumeric = 0 self.UseSprintValue = 0 self.UseEnums = 0 self.BestGuess = 0 self.RetryNoSuch = 0 self.ErrorStr = '' self.ErrorNum = 0 self.ErrorInd = 0 sess_args = _parse_session_args(args) for k,v in sess_args.items(): self.__dict__[k] = v # check for transports that may be tunneled transportCheck = re.compile('^(tls|dtls|ssh)'); match = transportCheck.match(sess_args['DestHost']) if match: self.sess_ptr = client_intf.session_tunneled( sess_args['Version'], sess_args['DestHost'], sess_args['LocalPort'], sess_args['Retries'], sess_args['Timeout'], sess_args['SecName'], secLevelMap[sess_args['SecLevel']], sess_args['ContextEngineId'], sess_args['Context'], sess_args['OurIdentity'], sess_args['TheirIdentity'], sess_args['TheirHostname'], sess_args['TrustCert'], ); elif sess_args['Version'] == 3: self.sess_ptr = client_intf.session_v3( sess_args['Version'], sess_args['DestHost'], sess_args['LocalPort'], sess_args['Retries'], sess_args['Timeout'], sess_args['SecName'], secLevelMap[sess_args['SecLevel']], sess_args['SecEngineId'], sess_args['ContextEngineId'], sess_args['Context'], sess_args['AuthProto'], sess_args['AuthPass'], sess_args['PrivProto'], sess_args['PrivPass'], sess_args['Engineboots'], sess_args['Enginetime']) else: self.sess_ptr = client_intf.session( sess_args['Version'], sess_args['Community'], sess_args['DestHost'], sess_args['LocalPort'], sess_args['Retries'], sess_args['Timeout']) def get(self, varlist): res = client_intf.get(self, varlist) return res def set(self, varlist): res = client_intf.set(self, varlist) return res def getnext(self, varlist): res = client_intf.getnext(self, varlist) return res def getbulk(self, nonrepeaters, maxrepetitions, varlist): if self.Version == 1: return None res = client_intf.getbulk(self, nonrepeaters, maxrepetitions, varlist) return res def walk(self, varlist): res = client_intf.walk(self, varlist) return res def __del__(self): res = client_intf.delete_session(self) return res import netsnmp def snmpget(*args, **kargs): sess = Session(**kargs) var_list = VarList() for arg in args: if isinstance(arg, netsnmp.client.Varbind): var_list.append(arg) else: var_list.append(Varbind(arg)) res = sess.get(var_list) return res def snmpset(*args, **kargs): sess = Session(**kargs) var_list = VarList() for arg in args: if isinstance(arg, netsnmp.client.Varbind): var_list.append(arg) else: var_list.append(Varbind(arg)) res = sess.set(var_list) return res def snmpgetnext(*args, **kargs): sess = Session(**kargs) var_list = VarList() for arg in args: if isinstance(arg, netsnmp.client.Varbind): var_list.append(arg) else: var_list.append(Varbind(arg)) res = sess.getnext(var_list) return res def snmpgetbulk(nonrepeaters, maxrepetitions,*args, **kargs): sess = Session(**kargs) var_list = VarList() for arg in args: if isinstance(arg, netsnmp.client.Varbind): var_list.append(arg) else: var_list.append(Varbind(arg)) res = sess.getbulk(nonrepeaters, maxrepetitions, var_list) return res def snmpwalk(*args, **kargs): sess = Session(**kargs) if isinstance(args[0], netsnmp.client.VarList): var_list = args[0] else: var_list = VarList() for arg in args: if isinstance(arg, netsnmp.client.Varbind): var_list.append(arg) else: var_list.append(Varbind(arg)) res = sess.walk(var_list) return res