summaryrefslogtreecommitdiff
path: root/python/netsnmp/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/netsnmp/client.py')
-rw-r--r--python/netsnmp/client.py266
1 files changed, 266 insertions, 0 deletions
diff --git a/python/netsnmp/client.py b/python/netsnmp/client.py
new file mode 100644
index 0000000..6fc4b2e
--- /dev/null
+++ b/python/netsnmp/client.py
@@ -0,0 +1,266 @@
+import client_intf
+import string
+import re
+import types
+from sys import stderr
+
+# control verbosity of error output
+verbose = 1
+
+secLevelMap = { 'noAuthNoPriv':1, 'authNoPriv':2, 'authPriv':3 }
+
+def _parse_session_args(kargs):
+ sessArgs = {
+ 'Version':3,
+ 'DestHost':'localhost',
+ 'Community':'public',
+ 'Timeout':1000000,
+ 'Retries':3,
+ 'RemotePort':161,
+ 'LocalPort':0,
+ 'SecLevel':'noAuthNoPriv',
+ 'SecName':'initial',
+ 'PrivProto':'DEFAULT',
+ 'PrivPass':'',
+ 'AuthProto':'DEFAULT',
+ 'AuthPass':'',
+ 'ContextEngineId':'',
+ 'SecEngineId':'',
+ 'Context':'',
+ 'Engineboots':0,
+ 'Enginetime':0,
+ 'UseNumeric':0,
+ 'OurIdentity':'',
+ 'TheirIdentity':'',
+ 'TheirHostname':'',
+ 'TrustCert':''
+ }
+ keys = kargs.keys()
+ for key in keys:
+ if sessArgs.has_key(key):
+ sessArgs[key] = kargs[key]
+ else:
+ print >>stderr, "ERROR: unknown key", key
+ return sessArgs
+
+def STR(obj):
+ if obj != None:
+ obj = str(obj)
+ return obj
+
+
+class Varbind(object):
+ def __init__(self, tag=None, iid=None, val=None, type=None):
+ self.tag = STR(tag)
+ self.iid = STR(iid)
+ self.val = STR(val)
+ self.type = STR(type)
+ # parse iid out of tag if needed
+ if iid == None and tag != None:
+ regex = re.compile(r'^((?:\.\d+)+|(?:\w+(?:[-:]*\w+)+))\.?(.*)$')
+ match = regex.match(tag)
+ if match:
+ (self.tag, self.iid) = match.group(1,2)
+
+ def __setattr__(self, name, val):
+ self.__dict__[name] = STR(val)
+
+ def print_str(self):
+ return self.tag, self.iid, self.val, self.type
+
+
+class VarList(object):
+ def __init__(self, *vs):
+ self.varbinds = []
+
+ for var in vs:
+ if isinstance(var, netsnmp.client.Varbind):
+ self.varbinds.append(var)
+ else:
+ self.varbinds.append(Varbind(var))
+
+ def __len__(self):
+ return len(self.varbinds)
+
+ def __getitem__(self, index):
+ return self.varbinds[index]
+
+ def __setitem__(self, index, val):
+ if isinstance(val, netsnmp.client.Varbind):
+ self.varbinds[index] = val
+ else:
+ raise TypeError
+
+ def __iter__(self):
+ return iter(self.varbinds)
+
+ def __delitem__(self, index):
+ del self.varbinds[index]
+
+ def __repr__(self):
+ return repr(self.varbinds)
+
+ def __getslice__(self, i, j):
+ return self.varbinds[i:j]
+
+ def append(self, *vars):
+ for var in vars:
+ if isinstance(var, netsnmp.client.Varbind):
+ self.varbinds.append(var)
+ else:
+ raise TypeError
+
+
+
+class Session(object):
+ def __init__(self, **args):
+ self.sess_ptr = None
+ self.UseLongNames = 0
+ self.UseNumeric = 0
+ self.UseSprintValue = 0
+ self.UseEnums = 0
+ self.BestGuess = 0
+ self.RetryNoSuch = 0
+ self.ErrorStr = ''
+ self.ErrorNum = 0
+ self.ErrorInd = 0
+
+ sess_args = _parse_session_args(args)
+
+ for k,v in sess_args.items():
+ self.__dict__[k] = v
+
+
+ # check for transports that may be tunneled
+ transportCheck = re.compile('^(tls|dtls|ssh)');
+ match = transportCheck.match(sess_args['DestHost'])
+
+ if match:
+ self.sess_ptr = client_intf.session_tunneled(
+ sess_args['Version'],
+ sess_args['DestHost'],
+ sess_args['LocalPort'],
+ sess_args['Retries'],
+ sess_args['Timeout'],
+ sess_args['SecName'],
+ secLevelMap[sess_args['SecLevel']],
+ sess_args['ContextEngineId'],
+ sess_args['Context'],
+ sess_args['OurIdentity'],
+ sess_args['TheirIdentity'],
+ sess_args['TheirHostname'],
+ sess_args['TrustCert'],
+ );
+ elif sess_args['Version'] == 3:
+ self.sess_ptr = client_intf.session_v3(
+ sess_args['Version'],
+ sess_args['DestHost'],
+ sess_args['LocalPort'],
+ sess_args['Retries'],
+ sess_args['Timeout'],
+ sess_args['SecName'],
+ secLevelMap[sess_args['SecLevel']],
+ sess_args['SecEngineId'],
+ sess_args['ContextEngineId'],
+ sess_args['Context'],
+ sess_args['AuthProto'],
+ sess_args['AuthPass'],
+ sess_args['PrivProto'],
+ sess_args['PrivPass'],
+ sess_args['Engineboots'],
+ sess_args['Enginetime'])
+ else:
+ self.sess_ptr = client_intf.session(
+ sess_args['Version'],
+ sess_args['Community'],
+ sess_args['DestHost'],
+ sess_args['LocalPort'],
+ sess_args['Retries'],
+ sess_args['Timeout'])
+
+ def get(self, varlist):
+ res = client_intf.get(self, varlist)
+ return res
+
+ def set(self, varlist):
+ res = client_intf.set(self, varlist)
+ return res
+
+ def getnext(self, varlist):
+ res = client_intf.getnext(self, varlist)
+ return res
+
+ def getbulk(self, nonrepeaters, maxrepetitions, varlist):
+ if self.Version == 1:
+ return None
+ res = client_intf.getbulk(self, nonrepeaters, maxrepetitions, varlist)
+ return res
+
+ def walk(self, varlist):
+ res = client_intf.walk(self, varlist)
+ return res
+
+ def __del__(self):
+ res = client_intf.delete_session(self)
+ return res
+
+import netsnmp
+
+def snmpget(*args, **kargs):
+ sess = Session(**kargs)
+ var_list = VarList()
+ for arg in args:
+ if isinstance(arg, netsnmp.client.Varbind):
+ var_list.append(arg)
+ else:
+ var_list.append(Varbind(arg))
+ res = sess.get(var_list)
+ return res
+
+def snmpset(*args, **kargs):
+ sess = Session(**kargs)
+ var_list = VarList()
+ for arg in args:
+ if isinstance(arg, netsnmp.client.Varbind):
+ var_list.append(arg)
+ else:
+ var_list.append(Varbind(arg))
+ res = sess.set(var_list)
+ return res
+
+def snmpgetnext(*args, **kargs):
+ sess = Session(**kargs)
+ var_list = VarList()
+ for arg in args:
+ if isinstance(arg, netsnmp.client.Varbind):
+ var_list.append(arg)
+ else:
+ var_list.append(Varbind(arg))
+ res = sess.getnext(var_list)
+ return res
+
+def snmpgetbulk(nonrepeaters, maxrepetitions,*args, **kargs):
+ sess = Session(**kargs)
+ var_list = VarList()
+ for arg in args:
+ if isinstance(arg, netsnmp.client.Varbind):
+ var_list.append(arg)
+ else:
+ var_list.append(Varbind(arg))
+ res = sess.getbulk(nonrepeaters, maxrepetitions, var_list)
+ return res
+
+def snmpwalk(*args, **kargs):
+ sess = Session(**kargs)
+ if isinstance(args[0], netsnmp.client.VarList):
+ var_list = args[0]
+ else:
+ var_list = VarList()
+ for arg in args:
+ if isinstance(arg, netsnmp.client.Varbind):
+ var_list.append(arg)
+ else:
+ var_list.append(Varbind(arg))
+ res = sess.walk(var_list)
+ return res
+