Source code for snimpy.mib

#
# snimpy -- Interactive SNMP tool
#
# Copyright (C) Vincent Bernat <bernat@luffy.cx>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

"""This module is a low-level interface to manipulate and
extract information from MIB files. It is a CFFI_ wrapper around
libsmi_. You may find convenient to use it in other projects but the
wrapper is merely here to serve *Snimpy* use and is therefore
incomplete.

.. _libsmi: http://www.ibr.cs.tu-bs.de/projects/libsmi/
.. _CFFI: http://cffi.readthedocs.io/
"""

try:
    from snimpy._smi import lib as _smi
    from snimpy._smi import ffi
except ImportError:
    from snimpy.smi_build import ffi, get_lib
    _smi = get_lib()


[docs]class SMIException(Exception): """SMI related exception. Any exception thrown in this module is inherited from this one."""
[docs]class Node: """MIB node. An instance of this class represents a MIB node. It can be specialized by other classes, like :class:`Scalar`, :class:`Table`, :class:`Column`, :class:`Node`. """ def __init__(self, node): """Create a new MIB node. :param node: libsmi node supporting this node. """ self.node = node self._override_type = None @property def type(self): """Get the basic type associated with this node. :return: The class from :mod:`basictypes` module which can represent the node. When retrieving a valid value for this node, the returned class can be instanciated to get an appropriate representation. """ from snimpy import basictypes if self._override_type: t = self._override_type else: t = _smi.smiGetNodeType(self.node) if t == ffi.NULL: raise SMIException("unable to retrieve type of node") target = { _smi.SMI_BASETYPE_INTEGER32: basictypes.Integer, _smi.SMI_BASETYPE_INTEGER64: basictypes.Integer, _smi.SMI_BASETYPE_UNSIGNED32: {b"TimeTicks": basictypes.Timeticks, None: basictypes.Unsigned32}, _smi.SMI_BASETYPE_UNSIGNED64: basictypes.Unsigned64, _smi.SMI_BASETYPE_OCTETSTRING: {b"IpAddress": basictypes.IpAddress, None: basictypes.OctetString}, _smi.SMI_BASETYPE_OBJECTIDENTIFIER: basictypes.Oid, _smi.SMI_BASETYPE_ENUM: {b"TruthValue": basictypes.Boolean, None: basictypes.Enum}, _smi.SMI_BASETYPE_BITS: basictypes.Bits }.get(t.basetype, None) if isinstance(target, dict): tt = _smi.smiGetParentType(t) target = target.get((t.name != ffi.NULL and ffi.string(t.name)) or (tt.name != ffi.NULL and ffi.string( tt.name)) or None, target.get(None, None)) if target is None: raise SMIException("unable to retrieve type of node") return target @property def typeName(self): """Retrieves the name of the the node's current declared type (not basic type). :return: A string representing the current declared type, suitable for assignment to type.setter. """ if self._override_type: t = self._override_type else: t = _smi.smiGetNodeType(self.node) # This occurs when the type is "implied". if t.name == ffi.NULL: t = _smi.smiGetParentType(t) if t is None or t == ffi.NULL: raise SMIException("unable to retrieve the declared type " "of the node '{}'".format(self.node.name)) return ffi.string(t.name) @typeName.setter def typeName(self, type_name): """Override the node's type to type_name, found using _getType. The new type must resolve to the same basictype. :param type_name: string name of the type. """ current_override = self._override_type declared_type = _smi.smiGetNodeType(self.node) declared_basetype = self.type new_type = _getType(type_name) if not new_type: raise SMIException("no type named {} in any loaded module".format( type_name)) # Easiest way to find the new basetype is to set the override # and ask. self._override_type = new_type new_basetype = self.type if declared_basetype != new_basetype: self._override_type = current_override raise SMIException("override type {1} not compatible with " "basetype of {0}".format( ffi.string(declared_type.name), ffi.string(new_type.name))) @typeName.deleter def typeName(self): """Clears the type override.""" self._override_type = None @property def fmt(self): """Get node format. The node format is a string to use to display a user-friendly version of the node. This is can be used for both octet strings or integers (to make them appear as decimal numbers). :return: The node format as a string or None if there is no format available. """ if self._override_type: t = self._override_type else: t = _smi.smiGetNodeType(self.node) tt = _smi.smiGetParentType(t) f = (t != ffi.NULL and t.format != ffi.NULL and ffi.string(t.format) or tt != ffi.NULL and tt.format != ffi.NULL and ffi.string(tt.format)) or None if f is None: return None return f.decode("ascii") @property def oid(self): """Get OID for the current node. The OID can then be used to request the node from an SNMP agent. :return: OID as a tuple """ return tuple([self.node.oid[i] for i in range(self.node.oidlen)]) @property def ranges(self): """Get node ranges. An node can be restricted by a set of ranges. For example, an integer can only be provided between two values. For strings, the restriction is on the length of the string. The returned value can be `None` if no restriction on range exists for the current node, a single value if the range is fixed or a list of tuples or fixed values otherwise. :return: The valid range for this node. """ t = _smi.smiGetNodeType(self.node) if t == ffi.NULL: return None ranges = [] range = _smi.smiGetFirstRange(t) while range != ffi.NULL: m1 = self._convert(range.minValue) m2 = self._convert(range.maxValue) if m1 == m2: ranges.append(m1) else: ranges.append((m1, m2)) range = _smi.smiGetNextRange(range) if len(ranges) == 0: return None if len(ranges) == 1: return ranges[0] return ranges @property def enum(self): """Get possible enum values. When the node can only take a discrete number of values, those values are defined in the MIB and can be retrieved through this property. :return: The dictionary of possible values keyed by the integer value. """ t = _smi.smiGetNodeType(self.node) if t == ffi.NULL or t.basetype not in (_smi.SMI_BASETYPE_ENUM, _smi.SMI_BASETYPE_BITS): return None result = {} element = _smi.smiGetFirstNamedNumber(t) while element != ffi.NULL: result[self._convert(element.value)] = ffi.string( element.name).decode("ascii") element = _smi.smiGetNextNamedNumber(element) return result @property def accessible(self): return (self.node.access not in (_smi.SMI_ACCESS_NOT_IMPLEMENTED, _smi.SMI_ACCESS_NOT_ACCESSIBLE)) def __str__(self): return ffi.string(self.node.name).decode("ascii") def __repr__(self): r = _smi.smiRenderNode(self.node, _smi.SMI_RENDER_ALL) if r == ffi.NULL: return "<uninitialized {} object at {}>".format( self.__class__.__name__, hex(id(self))) r = ffi.gc(r, _smi.free) module = _smi.smiGetNodeModule(self.node) if module == ffi.NULL: raise SMIException("unable to get module for {}".format( self.node.name)) return "<{} {} from '{}'>".format(self.__class__.__name__, ffi.string(r), ffi.string(module.name)) def _convert(self, value): attr = {_smi.SMI_BASETYPE_INTEGER32: "integer32", _smi.SMI_BASETYPE_UNSIGNED32: "unsigned32", _smi.SMI_BASETYPE_INTEGER64: "integer64", _smi.SMI_BASETYPE_UNSIGNED64: "unsigned64"}.get(value.basetype, None) if attr is None: raise SMIException("unexpected type found in range") return getattr(value.value, attr)
[docs]class Scalar(Node): """MIB scalar node. This class represents a scalar value in the MIB. A scalar value is a value not contained in a table. """
[docs]class Table(Node): """MIB table node. This class represents a table. A table is an ordered collection of objects consisting of zero or more rows. Each object in the table is identified using an index. An index can be a single value or a list of values. """ @property def columns(self): """Get table columns. The columns are the different kind of objects that can be retrieved in a table. :return: list of table columns (:class:`Column` instances) """ child = _smi.smiGetFirstChildNode(self.node) if child == ffi.NULL: return [] if child.nodekind != _smi.SMI_NODEKIND_ROW: raise SMIException("child {} of {} is not a row".format( ffi.string(child.name), ffi.string(self.node.name))) columns = [] child = _smi.smiGetFirstChildNode(child) while child != ffi.NULL: if child.nodekind != _smi.SMI_NODEKIND_COLUMN: raise SMIException("child {} of {} is not a column".format( ffi.string(child.name), ffi.string(self.node.name))) columns.append(Column(child)) child = _smi.smiGetNextChildNode(child) return columns @property def _row(self): """Get the table row. :return: row object (as an opaque object) """ child = _smi.smiGetFirstChildNode(self.node) if child != ffi.NULL and child.indexkind == _smi.SMI_INDEX_AUGMENT: child = _smi.smiGetRelatedNode(child) if child == ffi.NULL: raise SMIException("AUGMENT index for {} but " "unable to retrieve it".format( ffi.string(self.node.name))) if child == ffi.NULL: raise SMIException("{} does not have a row".format( ffi.string(self.node.name))) if child.nodekind != _smi.SMI_NODEKIND_ROW: raise SMIException("child {} of {} is not a row".format( ffi.string(child.name), ffi.string(self.node.name))) if child.indexkind != _smi.SMI_INDEX_INDEX: raise SMIException("child {} of {} has an unhandled " "kind of index".format( ffi.string(child.name), ffi.string(self.node.name))) return child @property def implied(self): """Is the last index implied? An implied index is an index whose size is not fixed but who is not prefixed by its size because this is the last index of a table. :return: `True` if and only if the last index is implied. """ child = self._row if child.implied: return True return False @property def index(self): """Get indexes for a table. The indexes are used to locate a precise row in a table. They are a subset of the table columns. :return: The list of indexes (as :class:`Column` instances) of the table. """ child = self._row lindex = [] element = _smi.smiGetFirstElement(child) while element != ffi.NULL: nelement = _smi.smiGetElementNode(element) if nelement == ffi.NULL: raise SMIException("cannot get index " "associated with {}".format( ffi.string(self.node.name))) if nelement.nodekind != _smi.SMI_NODEKIND_COLUMN: raise SMIException("index {} for {} is " "not a column".format( ffi.string(nelement.name), ffi.string(self.node.name))) lindex.append(Column(nelement)) element = _smi.smiGetNextElement(element) return lindex
[docs]class Column(Node): """MIB column node. This class represent a column of a table.""" @property def table(self): """Get table associated with this column. :return: The :class:`Table` instance associated to this column. """ parent = _smi.smiGetParentNode(self.node) if parent == ffi.NULL: raise SMIException("unable to get parent of {}".format( ffi.string(self.node.name))) if parent.nodekind != _smi.SMI_NODEKIND_ROW: raise SMIException("parent {} of {} is not a row".format( ffi.string(parent.name), ffi.string(self.node.name))) parent = _smi.smiGetParentNode(parent) if parent == ffi.NULL: raise SMIException("unable to get parent of {}".format( ffi.string(self.node.name))) if parent.nodekind != _smi.SMI_NODEKIND_TABLE: raise SMIException("parent {} of {} is not a table".format( ffi.string(parent.name), ffi.string(self.node.name))) t = Table(parent) return t
[docs]class Notification(Node): """MIB notification node. This class represent a notification.""" @property def objects(self): """Get objects for a notification. :return: The list of objects (as :class:`Column`, :class:`Node` or :class:`Scalar` instances) of the notification. """ child = self.node lindex = [] element = _smi.smiGetFirstElement(child) while element != ffi.NULL: nelement = _smi.smiGetElementNode(element) if nelement == ffi.NULL: raise SMIException("cannot get object " "associated with {}".format( ffi.string(self.node.name))) if nelement.nodekind == _smi.SMI_NODEKIND_COLUMN: lindex.append(Column(nelement)) elif nelement.nodekind == _smi.SMI_NODEKIND_NODE: lindex.append(Node(nelement)) elif nelement.nodekind == _smi.SMI_NODEKIND_SCALAR: lindex.append(Scalar(nelement)) else: raise SMIException("object {} for {} is " "not a node".format( ffi.string(nelement.name), ffi.string(self.node.name))) element = _smi.smiGetNextElement(element) return lindex
_lastError = None @ffi.callback("void(char *, int, int, char *, char*)") def _logError(path, line, severity, msg, tag): global _lastError if path != ffi.NULL and msg != ffi.NULL: _lastError = "{}:{}: {}".format(ffi.string(path), line, ffi.string(msg)) else: _lastError = None
[docs]def reset(): """Reset libsmi to its initial state.""" _smi.smiExit() try: if _smi.smiInit(b"snimpy") < 0: raise SMIException("unable to init libsmi") except TypeError: pass # We are being mocked _smi.smiSetErrorLevel(1) _smi.smiSetErrorHandler(_logError) try: _smi.smiSetFlags(_smi.SMI_FLAG_ERRORS | _smi.SMI_FLAG_RECURSIVE) except TypeError: pass # We are being mocked
[docs]def path(path=None): """Set or get a search path to libsmi. When no path is provided, return the current path, unmodified. Otherwise, set the path to the specified value. :param path: The string to be used to change the search path or `None` """ if path is None: # Get the path path = _smi.smiGetPath() if path == ffi.NULL: raise SMIException("unable to get current libsmi path") path = ffi.gc(path, _smi.free) result = ffi.string(path) return result.decode("utf8") # Set the path if not isinstance(path, bytes): path = path.encode("utf8") if _smi.smiSetPath(path) < 0: raise SMIException("unable to set the path {}".format(path))
def _get_module(name): """Get the SMI module from its name. :param name: The name of the module :return: The SMI module or `None` if not found (not loaded) """ if not isinstance(name, bytes): name = name.encode("ascii") m = _smi.smiGetModule(name) if m == ffi.NULL: return None if m.conformance and m.conformance <= 1: return None return m def _kind2object(kind): return { _smi.SMI_NODEKIND_NODE: Node, _smi.SMI_NODEKIND_SCALAR: Scalar, _smi.SMI_NODEKIND_TABLE: Table, _smi.SMI_NODEKIND_NOTIFICATION: Notification, _smi.SMI_NODEKIND_COLUMN: Column }.get(kind, Node)
[docs]def get(mib, name): """Get a node by its name. :param mib: The MIB name to query :param name: The object name to get from the MIB :return: the requested MIB node (:class:`Node`) """ if not isinstance(mib, bytes): mib = mib.encode("ascii") module = _get_module(mib) if module is None: raise SMIException("no module named {}".format(mib)) node = _smi.smiGetNode(module, name.encode("ascii")) if node == ffi.NULL: raise SMIException("in {}, no node named {}".format( mib, name)) pnode = _kind2object(node.nodekind) return pnode(node)
[docs]def getByOid(oid): """Get a node by its OID. :param oid: The OID as a tuple :return: The requested MIB node (:class:`Node`) """ node = _smi.smiGetNodeByOID(len(oid), oid) if node == ffi.NULL: raise SMIException("no node for {}".format( ".".join([str(o) for o in oid]))) pnode = _kind2object(node.nodekind) return pnode(node)
def _getType(type_name): """Searches for a smi type through all loaded modules. :param type_name: The name of the type to search for. :return: The requested type (:class:`smi.SmiType`), if found, or None. """ if not isinstance(type_name, bytes): type_name = type_name.encode("ascii") for module in _loadedModules(): new_type = _smi.smiGetType(module, type_name) if new_type != ffi.NULL: return new_type return None def _get_kind(mib, kind): """Get nodes of a given kind from a MIB. :param mib: The MIB name to search objects for :param kind: The SMI kind of object :return: The list of matched MIB nodes for the MIB """ if not isinstance(mib, bytes): mib = mib.encode("ascii") module = _get_module(mib) if module is None: raise SMIException("no module named {}".format(mib)) lnode = [] node = _smi.smiGetFirstNode(module, kind) while node != ffi.NULL: lnode.append(_kind2object(kind)(node)) node = _smi.smiGetNextNode(node, kind) return lnode
[docs]def getNodes(mib): """Return all nodes from a given MIB. :param mib: The MIB name :return: The list of all MIB nodes for the MIB :rtype: list of :class:`Node` instances """ return _get_kind(mib, _smi.SMI_NODEKIND_NODE)
[docs]def getScalars(mib): """Return all scalars from a given MIB. :param mib: The MIB name :return: The list of all scalars for the MIB :rtype: list of :class:`Scalar` instances """ return _get_kind(mib, _smi.SMI_NODEKIND_SCALAR)
[docs]def getTables(mib): """Return all tables from a given MIB. :param mib: The MIB name :return: The list of all tables for the MIB :rtype: list of :class:`Table` instances """ return _get_kind(mib, _smi.SMI_NODEKIND_TABLE)
[docs]def getColumns(mib): """Return all columns from a givem MIB. :param mib: The MIB name :return: The list of all columns for the MIB :rtype: list of :class:`Column` instances """ return _get_kind(mib, _smi.SMI_NODEKIND_COLUMN)
[docs]def getNotifications(mib): """Return all notifications from a givem MIB. :param mib: The MIB name :return: The list of all notifications for the MIB :rtype: list of :class:`Notification` instances """ return _get_kind(mib, _smi.SMI_NODEKIND_NOTIFICATION)
[docs]def load(mib): """Load a MIB into the library. :param mib: The MIB to load, either a filename or a MIB name. :return: The MIB name that has been loaded. :except SMIException: The requested MIB cannot be loaded. """ if not isinstance(mib, bytes): mib = mib.encode("ascii") modulename = _smi.smiLoadModule(mib) if modulename == ffi.NULL: raise SMIException("unable to find {} (check the path)".format(mib)) modulename = ffi.string(modulename) if not _get_module(modulename.decode("ascii")): details = "check with smilint -s -l1" if _lastError is not None: details = "{}: {}".format(_lastError, details) raise SMIException( "{} contains major SMI error ({})".format(mib, details)) return modulename
def _loadedModules(): """Generates the list of loaded modules. :yield: The :class:`smi.SmiModule` of all currently loaded modules. """ module = _smi.smiGetFirstModule() while module != ffi.NULL: yield module module = _smi.smiGetNextModule(module)
[docs]def loadedMibNames(): """Generates the list of loaded MIB names. :yield: The names of all currently loaded MIBs. """ for module in _loadedModules(): yield ffi.string(module.name).decode('utf-8')
reset()