Source code for snimpy.manager

#
# snimpy -- Interactive SNMP tool
#
# Copyright (C) Vincent Bernat <bernat@luffy.cx>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

"""This module is the high-level interface to *Snimpy*. It exposes
:class:`Manager` class to instantiate a new manager (which is an SNMP
client). This is the preferred interface for *Snimpy*.

Here is a simple example of use of this module::

    >>> load("IF-MIB")
    >>> m = Manager("localhost")
    >>> m.ifDescr[1]
    <String: lo>
"""

import inspect
from time import time
from collections import MutableMapping, Container, Iterable, Sized
from snimpy import snmp, mib, basictypes


class DelegatedSession(object):

    """General class for SNMP session for delegation"""

    def __init__(self, session):
        self._session = session

    def __getattr__(self, attr):
        return getattr(self._session, attr)

    def __setattribute__(self, attr, value):
        return setattr(self._session, attr, value)


class DelayedSetSession(DelegatedSession):

    """SNMP session that is able to delay SET requests.

    This is an adapter. The constructor takes the original (not
    delayed) session.
    """

    def __init__(self, session):
        DelegatedSession.__init__(self, session)
        self.setters = []

    def set(self, *args):
        self.setters.extend(args)

    def commit(self):
        if self.setters:
            self._session.set(*self.setters)


class NoneSession(DelegatedSession):

    """SNMP session that will return None on unsucessful GET requests.

    In a normal session, a GET request returning `No such instance`
    error will trigger an exception. This session will catch such an
    error and return None instead.
    """

    def get(self, *args):
        try:
            return self._session.get(*args)
        except (snmp.SNMPNoSuchName,
                snmp.SNMPNoSuchObject,
                snmp.SNMPNoSuchInstance):
            if len(args) > 1:
                # We can't handle this case yet because we don't know
                # which value is unavailable.
                raise
            return ((args[0], None),)


class CachedSession(DelegatedSession):

    """SNMP session using a cache.

    This is an adapter. The constructor takes the original session.
    """

    def __init__(self, session, timeout=5):
        DelegatedSession.__init__(self, session)
        self.cache = {}  # contains (operation, oid) -> [time, result] entries
        self.timeout = timeout
        self.count = 0

    def getorwalk(self, op, *args):
        self.count += 1
        if (op, args) in self.cache:
            t, v = self.cache[op, args]
            if time() - t < self.timeout:
                return v
        value = getattr(self._session, op)(*args)
        self.cache[op, args] = [time(), value]
        if op == "walkmore":
            # also cache all the get requests we got for free
            for oid, get_value in value:
                self.count += 1
                self.cache["get", (oid, )] = [time(), ((oid, get_value), )]
        self.flush()
        return value

    def get(self, *args):
        return self.getorwalk("get", *args)

    def walk(self, *args):
        assert(len(args) == 1)  # we should ony walk one oid at a time
        return self.getorwalk("walkmore", *args)

    def flush(self):
        keys = list(self.cache.keys())
        for k in keys:
            if time() - self.cache[k][0] > self.timeout:
                del self.cache[k]
        self.count = 0


def MibRestrictedManager(original, mibs):

    """Copy an existing manager but restrict its view to the given set of
    MIBs.
    """
    clone = Manager(**original._constructor_args)
    clone._loaded = mibs
    return clone


[docs]class Manager(object): """SNMP manager. An instance of this class will represent an SNMP manager (client). When a MIB is loaded with :func:`load`, scalars and row names from it will be made available as an instance attribute. For a scalar, reading the corresponding attribute will get its value while setting it will allow to modify it: >>> load("SNMPv2-MIB") >>> m = Manager("localhost", "private") >>> m.sysContact <String: root> >>> m.sysContact = "Brian Jones" >>> m.sysContact <String: Brian Jones> For a row name, the provided interface is like a Python dictionary. Requesting an item using its index will retrieve the value from the agent (the server):: >>> load("IF-MIB") >>> m = Manager("localhost", "private") >>> m.ifDescr[1] <String: lo> >>> m.ifName[1] = "Loopback interface" Also, it is possible to iterate on a row name to get all available values for index:: >>> load("IF-MIB") >>> m = Manager("localhost", "private") >>> for idx in m.ifDescr: ... print(m.ifDescr[idx]) You can get a slice of index values from a table by iterating on a row name subscripted by a partial index:: >>> load("IF-MIB") >>> m = Manager("localhost", "private") >>> for idx in m.ipNetToMediaPhysAddress[1]: ... print(idx) (<Integer: 1>, <IpAddress: 127.0.0.1>) You can use multivalue indexes in two ways: using Pythonic multi-dimensional dict syntax, or by providing a tuple containing index values:: >>> load("IF-MIB") >>> m = Manager("localhost", "private") >>> m.ipNetToMediaPhysAddress[1]['127.0.0.1'] <String: aa:bb:cc:dd:ee:ff> >>> m.ipNetToMediaPhysAddress[1, '127.0.0.1'] <String: aa:bb:cc:dd:ee:ff> A context manager is also provided. Any modification issued inside the context will be delayed until the end of the context and then grouped into a single SNMP PDU to be executed atomically:: >>> load("IF-MIB") >>> m = Manager("localhost", "private") >>> with m: ... m.ifName[1] = "Loopback interface" ... m.ifName[2] = "First interface" Any error will be turned into an exception:: >>> load("IF-MIB") >>> m = Manager("localhost", "private") >>> m.ifDescr[999] Traceback (most recent call last): ... SNMPNoSuchName: There is no such variable name in this MIB. """ # do we want this object to be populated with all nodes? _complete = False def __init__(self, host="localhost", community="public", version=2, cache=False, none=False, timeout=None, retries=None, loose=False, bulk=40, # SNMPv3 secname=None, authprotocol=None, authpassword=None, privprotocol=None, privpassword=None, contextname=None): """Create a new SNMP manager. Some of the parameters are explained in :meth:`snmp.Session.__init__`. :param host: The hostname or IP address of the agent to connect to. Optionally, the port can be specified separated with a double colon. :type host: str :param community: The community to transmit to the agent for authorization purpose. This parameter is ignored if the specified version is 3. :type community: str :param version: The SNMP version to use to talk with the agent. Possible values are `1`, `2` (community-based) or `3`. :type version: int :param cache: Should caching be enabled? This can be either a boolean or an integer to specify the cache timeout in seconds. If `True`, the default timeout is 5 seconds. :type cache: bool or int :param none: Should `None` be returned when the agent does not know the requested OID? If `True`, `None` will be returned when requesting an inexisting scalar or column. :type none: bool :param timeout: Use the specified value in seconds as timeout. :type timeout: int :param retries: How many times the request should be retried? :type retries: int :param loose: Enable loose typing. When type coercion fails (for example when a MIB declare an element to be an ASCII string while it is not), just return the raw result instead of an exception. This mode should be enabled with caution. Patching the MIB is a better idea. :type loose: bool :param bulk: Max-repetition to use to speed up MIB walking with `GETBULK`. Set to `0` to disable. :type bulk: int """ if host is None: host = Manager._host self._host = host self._session = snmp.Session(host, community, version, secname, authprotocol, authpassword, privprotocol, privpassword, contextname=contextname, bulk=bulk) if timeout is not None: self._session.timeout = int(timeout * 1000000) if retries is not None: self._session.retries = retries if cache: if cache is True: self._session = CachedSession(self._session) else: self._session = CachedSession(self._session, cache) if none: self._session = NoneSession(self._session) self._loose = loose self._loaded = loaded # To be able to clone, we save the arguments provided to the # constructor in a generic way frame = inspect.currentframe() args, _, _, values = inspect.getargvalues(frame) self._constructor_args = dict((a, values[a]) for a in args if a != 'self') def _locate(self, attribute): for m in self._loaded: try: a = mib.get(m, attribute) return (m, a) except mib.SMIException: pass raise AttributeError("{0} is not an attribute".format(attribute)) def __getattribute__(self, attribute): if attribute.startswith("_"): return object.__getattribute__(self, attribute) m, a = self._locate(attribute) if isinstance(a, mib.Scalar): oid, result = self._session.get(a.oid + (0,))[0] if result is not None: try: return a.type(a, result) except ValueError: if self._loose: return result raise return None elif isinstance(a, mib.Column): return ProxyColumn(self._session, a, self._loose) elif isinstance(a, mib.Table): return ProxyTable(self._session, a, self._loose) raise NotImplementedError def __setattr__(self, attribute, value): if attribute.startswith("_"): return object.__setattr__(self, attribute, value) m, a = self._locate(attribute) if not isinstance(value, basictypes.Type): value = a.type(a, value, raw=False) if isinstance(a, mib.Scalar): self._session.set(a.oid + (0,), value) return raise AttributeError("{0} is not writable".format(attribute)) def __getitem__(self, modulename): modulename = modulename.encode('ascii') for m in loaded: if modulename == m: return MibRestrictedManager(self, [m]) raise KeyError("{0} is not a loaded module".format(modulename)) def __repr__(self): return "<Manager for {0}>".format(self._host) def __enter__(self): """In a context, we group all "set" into a single request""" self._osession = self._session self._session = DelayedSetSession(self._session) return self def __exit__(self, type, value, traceback): """When we exit, we should execute all "set" requests""" try: if type is None: self._session.commit() finally: self._session = self._osession del self._osession
class Proxy: """A proxy for some base type, notably a column or a table.""" def __repr__(self): return "<{0} for {1}>".format(self.__class__.__name__, repr(self.proxy)[1:-1]) class ProxyIter(Proxy, Sized, Iterable, Container): """Proxy for an iterable sequence. This a proxy offering the ABC of an iterable sequence (something like a set but without set operations). This will be used by both `ProxyColumn` and `ProxyTable`. """ def _op(self, op, index, *args): if not isinstance(index, tuple): index = (index,) indextype = self.proxy.table.index if len(indextype) != len(index): raise ValueError( "{0} column uses the following " "indexes: {1!r}".format(self.proxy, indextype)) oidindex = [] for i, ind in enumerate(index): # Cast to the correct type since we need "toOid()" ind = indextype[i].type(indextype[i], ind, raw=False) oidindex.extend(ind.toOid()) result = getattr( self.session, op)(self.proxy.oid + tuple(oidindex), *args) if op != "set": oid, result = result[0] if result is not None: try: return self.proxy.type(self.proxy, result) except ValueError: if self._loose: return result raise return None def __contains__(self, object): try: self._op("get", object) except: return False return True def __iter__(self): for k, _ in self.iteritems(): yield k def __len__(self): len(list(self.iteritems())) def iteritems(self, table_filter=None): count = 0 oid = self.proxy.oid indexes = self.proxy.table.index if table_filter is not None: if len(table_filter) >= len(indexes): raise ValueError("Table filter has too many elements") oid_suffix = [] # Convert filter elements to correct types for i, part in enumerate(table_filter): part = indexes[i].type(indexes[i], part, raw=False) oid_suffix.extend(part.toOid()) oid += tuple(oid_suffix) walk_oid = oid for noid, result in self.session.walk(oid): if noid <= oid: noid = None break oid = noid if not((len(oid) >= len(walk_oid) and oid[:len(walk_oid)] == walk_oid[:len(walk_oid)])): noid = None break # oid should be turned into index index = oid[len(self.proxy.oid):] target = [] for x in indexes: l, o = x.type.fromOid(x, tuple(index)) target.append(x.type(x, o)) index = index[l:] count = count + 1 if result is not None: try: result = self.proxy.type(self.proxy, result) except ValueError: if not self._loose: raise if len(target) == 1: # Should work most of the time yield target[0], result else: yield tuple(target), result if count == 0: # We did not find any element. Is it because the column is # empty or because the column does not exist. We do a SNMP # GET to know. If we get a "No such instance" exception, # this means the column is empty. If we get "No such # object", this means the column does not exist. We cannot # make such a distinction with SNMPv1. try: self.session.get(self.proxy.oid) except snmp.SNMPNoSuchInstance: # OK, the set of result is really empty raise StopIteration except snmp.SNMPNoAccess: # Some implementations seem to return NoAccess (PySNMP is one) raise StopIteration except snmp.SNMPNoSuchName: # SNMPv1, we don't know pass except snmp.SNMPNoSuchObject: # The result is empty because the column is unknown raise raise StopIteration class ProxyTable(ProxyIter): """Proxy for table access. We just use the first index as a column. However, the mapping operations are not available. """ def __init__(self, session, table, loose): self.proxy = table.index[0] self.session = session self._loose = loose class ProxyColumn(ProxyIter, MutableMapping): """Proxy for column access""" def __init__(self, session, column, loose, oid_suffix=()): self.proxy = column self.session = session self._loose = loose self._oid_suffix = oid_suffix def __getitem__(self, index): # If supplied index is partial we return new ProxyColumn # with appended OID suffix idx_len = len(self.proxy.table.index) suffix_len = len(self._oid_suffix) if isinstance(index, tuple): if len(index) + suffix_len < idx_len: return self._partial(index) elif idx_len > suffix_len + 1: return self._partial((index,)) # Otherwise a read op is made if not isinstance(index, tuple): index = (index,) return self._op("get", self._oid_suffix + index) def __setitem__(self, index, value): if not isinstance(value, basictypes.Type): value = self.proxy.type(self.proxy, value, raw=False) if not isinstance(index, tuple): index = (index,) self._op("set", self._oid_suffix + index, value) def __delitem__(self, index): raise NotImplementedError("cannot suppress a column") def __contains__(self, index): if not isinstance(index, tuple): index = (index,) return ProxyIter.__contains__(self, self._oid_suffix + index) def _partial(self, index): """Create new ProxyColumn based on current one, but with appended OID suffix""" new_suffix = self._oid_suffix + index return ProxyColumn(self.session, self.proxy, self._loose, new_suffix) def iteritems(self, table_filter=None): resulting_filter = self._oid_suffix if table_filter is not None: if not isinstance(table_filter, tuple): table_filter = (table_filter,) resulting_filter += table_filter return ProxyIter.iteritems(self, resulting_filter) loaded = []
[docs]def load(mibname): """Load a MIB in memory. :param mibname: MIB name or filename :type mibname: str """ m = mib.load(mibname) if m not in loaded: loaded.append(m) if Manager._complete: for o in mib.getScalars(m) + \ mib.getColumns(m) + \ mib.getTables(m): setattr(Manager, str(o), 1)