Source code for snimpy.snmp

#
# snimpy -- Interactive SNMP tool
#
# Copyright (C) Vincent Bernat <bernat@luffy.cx>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

"""
This module is a low-level interface to build SNMP requests, send
them and receive answers. It is built on top of pysnmp_ but the
exposed interface is far simpler. It is also far less complete and
there is an important dependency on the :mod:`basictypes` module for
type coercion.

.. _pysnmp: http://pysnmp.sourceforge.net/
"""

import re
import socket
import inspect
import threading
import ipaddress
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.proto import rfc1902, rfc1905
from pysnmp.smi import error


class SNMPException(Exception):
    """Root of the SNMP exception hierarchy.

    Every SNMP-related exception raised by this module derives from
    this class. Subclasses are named after the SNMP error they stand
    for, prefixed with ``SNMP``.
    """
class SNMPTooBig(SNMPException):
    """SNMP exception named after the ``tooBig`` error status."""
class SNMPNoSuchName(SNMPException):
    """SNMP exception named after the ``noSuchName`` error status."""
class SNMPBadValue(SNMPException):
    """SNMP exception named after the ``badValue`` error status."""
class SNMPReadOnly(SNMPException):
    """SNMP exception named after the ``readOnly`` error status."""
# Dynamically build remaining (v2) exceptions: every strict subclass of
# pysnmp's MibOperationError whose name ends with "Error" gets a
# module-level counterpart named "SNMP<name without the Error suffix>"
# deriving from SNMPException.
for name, obj in inspect.getmembers(error, inspect.isclass):
    if not name.endswith("Error"):
        continue
    if not issubclass(obj, error.MibOperationError):
        continue
    if obj != error.MibOperationError:
        name = "SNMP{}".format(name[:-5])
        globals()[name] = type(name, (SNMPException,), {})
# Do not leak the loop variables into the module namespace.
del name
del obj
class Session:
    """SNMP session.

    An instance of this object will represent an SNMP session. From
    such an instance, one can get information from the associated
    agent.
    """

    # Per-thread storage: SNMPv1/v2 sessions created from the same thread
    # share a single command generator, SNMPv3 sessions get their own.
    _tls = threading.local()

    def __init__(self, host,
                 community="public", version=2,
                 secname=None,
                 authprotocol=None, authpassword=None,
                 privprotocol=None, privpassword=None,
                 contextname=None,
                 bulk=40, none=False):
        """Create a new SNMP session.

        :param host: The hostname or IP address of the agent to
            connect to. Optionally, the port can be specified
            separated with a colon (``host:port``). An IPv6 address
            must be enclosed in square brackets when a port is given.
        :type host: str
        :param community: The community to transmit to the agent for
            authorization purpose. With version 3, it is only used as
            the default security name when `secname` is `None`.
        :type community: str
        :param version: The SNMP version to use to talk with the
            agent. Possible values are `1`, `2` (community-based) or
            `3`.
        :type version: int
        :param secname: Security name to use for SNMPv3 only.
        :type secname: str
        :param authprotocol: Authorization protocol to use for
            SNMPv3. This can be `None` or either the string `SHA`
            (alias `SHA1`) or `MD5`.
        :type authprotocol: None or str
        :param authpassword: Authorization password if authorization
            protocol is not `None`.
        :type authpassword: str
        :param privprotocol: Privacy protocol to use for SNMPv3. This
            can be `None` or either the string `DES`, `3DES`, `AES`,
            `AES128`, `AES192` or `AES256`.
        :type privprotocol: None or str
        :param privpassword: Privacy password if privacy protocol is
            not `None`.
        :type privpassword: str
        :param contextname: Context name for SNMPv3 messages.
        :type contextname: str
        :param bulk: Max repetition value for `GETBULK` requests. Set
            to `0` to disable.
        :type bulk: int
        :param none: When enabled, will return None for not found
            values (instead of raising an exception)
        :type none: bool
        :raises ValueError: if the version, the authentication
            protocol or the privacy protocol is not supported, if
            `none` is requested with SNMPv1 or if `bulk` is negative.
        """
        self._host = host
        self._version = version
        self._none = none
        if version == 3:
            self._cmdgen = cmdgen.CommandGenerator()
            self._contextname = contextname
        else:
            if not hasattr(self._tls, "cmdgen"):
                self._tls.cmdgen = cmdgen.CommandGenerator()
            self._cmdgen = self._tls.cmdgen
            self._contextname = None
        if version == 1 and none:
            raise ValueError("None-GET requests not compatible with SNMPv1")

        # Put authentication stuff in self._auth
        if version in [1, 2]:
            self._auth = cmdgen.CommunityData(
                community[0:30], community, version - 1)
        elif version == 3:
            if secname is None:
                secname = community
            try:
                authprotocol = {
                    None: cmdgen.usmNoAuthProtocol,
                    "MD5": cmdgen.usmHMACMD5AuthProtocol,
                    "SHA": cmdgen.usmHMACSHAAuthProtocol,
                    "SHA1": cmdgen.usmHMACSHAAuthProtocol
                }[authprotocol]
            except KeyError:
                raise ValueError("{} is not an acceptable authentication "
                                 "protocol".format(authprotocol)) from None
            try:
                privprotocol = {
                    None: cmdgen.usmNoPrivProtocol,
                    "DES": cmdgen.usmDESPrivProtocol,
                    "3DES": cmdgen.usm3DESEDEPrivProtocol,
                    "AES": cmdgen.usmAesCfb128Protocol,
                    "AES128": cmdgen.usmAesCfb128Protocol,
                    "AES192": cmdgen.usmAesCfb192Protocol,
                    "AES256": cmdgen.usmAesCfb256Protocol,
                }[privprotocol]
            except KeyError:
                raise ValueError("{} is not an acceptable privacy "
                                 "protocol".format(privprotocol)) from None
            self._auth = cmdgen.UsmUserData(secname,
                                            authpassword,
                                            privpassword,
                                            authprotocol,
                                            privprotocol)
        else:
            raise ValueError("unsupported SNMP version {}".format(version))

        # Put transport stuff into self._transport. The last alternative
        # matches anything, so the regular expression always matches.
        mo = re.match(r'^(?:'
                      r'\[(?P<ipv6>[\d:A-Fa-f]+)\]|'
                      r'(?P<ipv4>[\d\.]+)|'
                      r'(?P<any>.*?))'
                      r'(?::(?P<port>\d+))?$', host)
        if mo.group("port"):
            port = int(mo.group("port"))
        else:
            port = 161
        if mo.group("ipv6"):
            self._transport = cmdgen.Udp6TransportTarget((mo.group("ipv6"),
                                                          port))
        elif mo.group("ipv4"):
            self._transport = cmdgen.UdpTransportTarget((mo.group("ipv4"),
                                                         port))
        else:
            results = socket.getaddrinfo(mo.group("any"),
                                         port,
                                         0,
                                         socket.SOCK_DGRAM,
                                         socket.IPPROTO_UDP)
            # We should try to connect to each result to determine if
            # the given family is available. However, we cannot do
            # that over UDP. Let's implement a safe choice. If we have
            # an IPv4 address, use that. If not, use IPv6. If we want
            # to add an option to force IPv6, it is a good place.
            if [x for x in results if x[0] == socket.AF_INET]:
                self._transport = cmdgen.UdpTransportTarget(
                    (mo.group("any"), port))
            else:
                self._transport = cmdgen.Udp6TransportTarget(
                    (mo.group("any"), port))

        # Bulk stuff (goes through the property setter for validation)
        self.bulk = bulk

    def _check_exception(self, value):
        """Check if the given ASN1 value is an exception.

        Raise the matching SNMP exception when `value` is a
        ``NoSuchObject``, ``NoSuchInstance`` or ``EndOfMibView``
        marker. Otherwise, do nothing.
        """
        # The exception classes below are created dynamically at module
        # level, hence the "undefined name" suppressions.
        if isinstance(value, rfc1905.NoSuchObject):
            raise SNMPNoSuchObject("No such object was found")  # noqa: F821
        if isinstance(value, rfc1905.NoSuchInstance):
            raise SNMPNoSuchInstance("No such instance exists")  # noqa: F821
        if isinstance(value, rfc1905.EndOfMibView):
            raise SNMPEndOfMibView("End of MIB was reached")  # noqa: F821

    def _convert(self, value):
        """Convert a PySNMP value to some native Python type.

        :param value: the PySNMP value to convert
        :return: the converted value, or `None` for a missing
            object/instance when the session was created with
            `none` enabled
        :raises SNMPException: when the value is an SNMP exception
            marker
        :raises NotImplementedError: when no convertor is known for
            the value
        """
        try:
            # With PySNMP 4.3+, an OID is a ObjectIdentity. We try to
            # extract it while being compatible with earlier releases.
            value = value.getOid()
        except AttributeError:
            pass
        convertors = {rfc1902.Integer: int,
                      rfc1902.Integer32: int,
                      rfc1902.OctetString: bytes,
                      rfc1902.IpAddress: ipaddress.IPv4Address,
                      rfc1902.Counter32: int,
                      rfc1902.Counter64: int,
                      rfc1902.Gauge32: int,
                      rfc1902.Unsigned32: int,
                      rfc1902.TimeTicks: int,
                      rfc1902.Bits: str,
                      rfc1902.Opaque: str,
                      rfc1902.univ.ObjectIdentifier: tuple}
        if self._none:
            convertors[rfc1905.NoSuchObject] = lambda x: None
            convertors[rfc1905.NoSuchInstance] = lambda x: None
        for cl, fn in convertors.items():
            if isinstance(value, cl):
                return fn(value)
        self._check_exception(value)
        raise NotImplementedError("unable to convert {}".format(repr(value)))

    def _op(self, cmd, *oids):
        """Apply an SNMP operation.

        :param cmd: the command generator method to invoke
        :param oids: arguments forwarded to `cmd` after the
            authentication and transport data
        :return: a tuple of `(oid, value)` pairs with converted values
        :raises SNMPException: on error indication, on error status
            or when a GET/SET gets an empty answer
        """
        kwargs = {}
        if self._contextname:
            kwargs['contextName'] = rfc1902.OctetString(self._contextname)
        errorIndication, errorStatus, errorIndex, varBinds = cmd(
            self._auth, self._transport, *oids, **kwargs)
        if errorIndication:
            self._check_exception(errorIndication)
            raise SNMPException(str(errorIndication))
        if errorStatus:
            # We try to find a builtin exception with the same message
            exc = str(errorStatus.prettyPrint())
            exc = re.sub(r'\W+', '', exc)
            # Slicing keeps this safe should the status render as an
            # empty string once non-word characters are stripped.
            exc = "SNMP{}".format(exc[:1].upper() + exc[1:])
            if exc in globals():
                raise globals()[exc]
            raise SNMPException(errorStatus.prettyPrint())
        if cmd in [self._cmdgen.getCmd, self._cmdgen.setCmd]:
            results = [(tuple(name), val) for name, val in varBinds]
        else:
            # Walk commands return a table: flatten it.
            results = [(tuple(name), val)
                       for row in varBinds for name, val in row]
            if len(results) > 0 and isinstance(results[-1][1],
                                               rfc1905.EndOfMibView):
                results = results[:-1]
        if len(results) == 0:
            # An empty answer is only acceptable for walk operations.
            if cmd not in [self._cmdgen.nextCmd, self._cmdgen.bulkCmd]:
                raise SNMPException("empty answer")
        return tuple([(oid, self._convert(val)) for oid, val in results])

    def get(self, *oids):
        """Retrieve an OID value using GET.

        :param oids: a list of OID to retrieve. An OID is a tuple.
        :return: a list of tuples with the retrieved OID and the raw value.
        """
        return self._op(self._cmdgen.getCmd, *oids)

    def walkmore(self, *oids):
        """Retrieve OIDs values using GETBULK or GETNEXT.

        The method is called "walk" but this is either a GETBULK or a
        GETNEXT. The latter is only used for SNMPv1 or if bulk has
        been disabled using :meth:`bulk` property. Results outside of
        the requested subtrees are not filtered out; see :meth:`walk`
        for that.

        When the agent answers that the response is too big, the bulk
        setting of the session is halved (and eventually disabled)
        and the request is retried.

        :param oids: a list of OID to retrieve. An OID is a tuple.
        :return: a list of tuples with the retrieved OID and the raw value.
        """
        if self._version == 1 or not self.bulk:
            return self._op(self._cmdgen.nextCmd, *oids)
        args = [0, self.bulk] + list(oids)
        try:
            return self._op(self._cmdgen.bulkCmd, *args)
        except SNMPTooBig:
            # Let's try to ask for less values. We will never increase
            # bulk again. We cannot increase it just after the walk
            # because we may end up requesting everything twice (or
            # more).
            #
            # Integer division is required: with true division, halving
            # a bulk of 1 gives 0.5, which the bulk setter rejects
            # instead of disabling bulk.
            nbulk = self.bulk // 2 or False
            if nbulk != self.bulk:
                self.bulk = nbulk
                # Retry with the same semantics (unfiltered tuple).
                return self.walkmore(*oids)
            raise

    def walk(self, *oids):
        """Walk from given OIDs but don't return any "extra" results.

        Only results in the subtree will be returned.

        :param oids: OIDs used as a start point
        :return: a generator of tuples with the retrieved OID and the
            raw value.
        """
        return ((noid, result)
                for oid in oids
                for noid, result in self.walkmore(oid)
                if (len(noid) >= len(oid) and
                    noid[:len(oid)] == oid[:len(oid)]))

    def set(self, *args):
        """Set an OID value using SET.

        This function takes an even number of arguments. They are
        working by pair. The first member is an OID and the second
        one is :class:`basictypes.Type` instance whose `pack()`
        method will be used to transform into the appropriate form.

        :return: a list of tuples with the retrieved OID and the raw value.
        :raises ValueError: if the number of arguments is odd
        """
        if len(args) % 2 != 0:
            raise ValueError("expect an even number of arguments for SET")
        packed = [v.pack() for v in args[1::2]]
        varbinds = zip(args[0::2], packed)
        return self._op(self._cmdgen.setCmd, *varbinds)

    def __repr__(self):
        return "{}(host={},version={})".format(
            self.__class__.__name__,
            self._host,
            self._version)

    @property
    def timeout(self):
        """Get timeout value for the current session.

        :return: Timeout value in microseconds.
        """
        return self._transport.timeout * 1000000

    @timeout.setter
    def timeout(self, value):
        """Set timeout value for the current session.

        :param value: Timeout value in microseconds.
        :raises ValueError: if the value is not strictly positive
        """
        value = int(value)
        if value <= 0:
            raise ValueError("timeout is a positive integer")
        # The transport stores the timeout in seconds.
        self._transport.timeout = value / 1000000.

    @property
    def retries(self):
        """Get number of times a request is retried.

        :return: Number of retries for each request.
        """
        return self._transport.retries

    @retries.setter
    def retries(self, value):
        """Set number of times a request is retried.

        :param value: Number of retries for each request.
        :raises ValueError: if the value is negative
        """
        value = int(value)
        if value < 0:
            raise ValueError("retries is a non-negative integer")
        self._transport.retries = value

    @property
    def bulk(self):
        """Get bulk settings.

        :return: `False` if bulk is disabled or a non-negative integer
            for the number of repetitions.
        """
        return self._bulk

    @bulk.setter
    def bulk(self, value):
        """Set bulk settings.

        :param value: `False` (or `0`) to disable bulk or a positive
            integer for the number of allowed repetitions.
        :raises ValueError: if the value is negative
        """
        # 0 compares equal to False and is documented as "disabled".
        if value is False or value == 0:
            self._bulk = False
            return
        value = int(value)
        if value <= 0:
            raise ValueError("{} is not an appropriate value "
                             "for max repeater parameter".format(
                                 value))
        self._bulk = value