Source code for snimpy.basictypes

#
# snimpy -- Interactive SNMP tool
#
# Copyright (C) Vincent Bernat <bernat@luffy.cx>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

"""
This module is aimed at providing Pythonic representation of
various SNMP types. Each SMIv2 type is mapped to a corresponding class
which tries to mimic a basic type from Python. For example, display
strings are like Python string while SMIv2 integers are just like
Python integers. This module is some kind of a hack and its use
outside of *Snimpy* seems convoluted.
"""

import struct
import re
import ipaddress
from datetime import timedelta
from pysnmp.proto import rfc1902

from snimpy import mib


def ordering_with_cmp(cls):
    ops = {'__lt__': lambda self, other: self.__cmp__(other) < 0,
           '__gt__': lambda self, other: self.__cmp__(other) > 0,
           '__le__': lambda self, other: self.__cmp__(other) <= 0,
           '__ge__': lambda self, other: self.__cmp__(other) >= 0,
           '__eq__': lambda self, other: self.__cmp__(other) == 0,
           '__ne__': lambda self, other: self.__cmp__(other) != 0}
    for opname, opfunc in ops.items():
        opfunc.__name__ = opname
        opfunc.__doc__ = getattr(int, opname).__doc__
        setattr(cls, opname, opfunc)
    return cls


[docs]class Type: """Base class for all types.""" def __new__(cls, entity, value, raw=True): """Create a new typed value. :param entity: A :class:`mib.Node` instance :param value: The value to set :param raw: Whetever the raw value is provided (as opposed to a user-supplied value). This parameter is important when the provided input is ambiguous, for example when it is an array of bytes. :type raw: bool :return: an instance of the new typed value """ if entity.type != cls: raise ValueError("MIB node is {}. We are {}".format(entity.type, cls)) if cls == OctetString and entity.fmt is not None: # Promotion of OctetString to String if we have unicode stuff if isinstance(value, (String, str)) or not raw: cls = String if not isinstance(value, Type): value = cls._internal(entity, value) else: value = cls._internal(entity, value._value) if issubclass(cls, str): self = str.__new__(cls, value) elif issubclass(cls, bytes): self = bytes.__new__(cls, value) elif issubclass(cls, int): self = int.__new__(cls, value) else: self = object.__new__(cls) self._value = value self.entity = entity if cls == OctetString and entity.fmt is not None: # A display-hint propose to use only ascii and UTF-8 # chars. We promote an OCTET-STRING to a DisplayString if # we have a format. This means we won't be able to access # individual bytes in this format, only the full displayed # version. value = String._internal(entity, self) self = str.__new__(String, value) self._value = value self.entity = entity if isinstance(self, String): # Ensure that strings follow their format, if it is applied. # This is safer and simpler than toOid, as it does not do # additional validation. self._toBytes() return self def __init__(self, *args, **kwargs): # Neutralize __init__ from other inherited classes pass @classmethod def _internal(cls, entity, value): """Get internal value for a given value.""" raise NotImplementedError # pragma: no cover
[docs] def pack(self): """Prepare the instance to be sent on the wire.""" raise NotImplementedError # pragma: no cover
[docs] def toOid(self, implied=False): """Convert to an OID. If this function is implemented, then class function :meth:`fromOid` should also be implemented as the "invert" function of this one. This function only works if the entity is used as an index! Otherwise, it should raises NotImplementedError. :return: An OID that can be used as index """ raise NotImplementedError # pragma: no cover
[docs] @classmethod def fromOid(cls, entity, oid, implied=False): """Create instance from an OID. This is the sister function of :meth:`toOid`. :param oid: The OID to use to create an instance :param entity: The MIB entity we want to instantiate :return: A couple `(l, v)` with `l` the number of suboids needed to create the instance and `v` the instance created from the OID """ raise NotImplementedError # pragma: no cover
@classmethod def _fixedLen(cls, entity): """Determine if the given entity is fixed-len This function is a helper that is used for String and Oid. When converting a variable-length type to an OID, we need to prefix it by its len or not depending of what the MIB say. Node that the type can be used in an index with IMPLIED keyword. In that case, even when this function returns False, the OID will not be prefixed by its length. :param entity: entity to check :return: `True` if it is fixed-len, `False` otherwise """ if entity.ranges and not isinstance(entity.ranges, (tuple, list)): # Fixed length return True else: # Variable length return False def __str__(self): return str(self._value) def __repr__(self): return '<{}: {}>'.format(self.__class__.__name__, str(self))
[docs]@ordering_with_cmp class IpAddress(Type, ipaddress.IPv4Address): """Class representing an IP address/""" @classmethod def _internal(cls, entity, value): if isinstance(value, (list, tuple)): value = ".".join([str(a) for a in value]) try: value = ipaddress.IPv4Address(value) except ipaddress.AddressValueError: raise ValueError("{!r} is not a valid IP".format(value)) return value
[docs] def pack(self): return rfc1902.IpAddress(str(self._value))
[docs] def toOid(self, implied=False): return tuple(self._value.packed)
[docs] @classmethod def fromOid(cls, entity, oid, implied=False): if len(oid) < 4: raise ValueError( "{!r} is too short for an IP address".format(oid)) return (4, cls(entity, oid[:4]))
def __cmp__(self, other): if not isinstance(other, IpAddress): try: other = IpAddress(self.entity, other) except Exception: raise NotImplementedError # pragma: no cover if self._value == other._value: return 0 if self._value < other._value: return -1 return 1 def __getitem__(self, nb): return self._value.packed[nb]
[docs]class StringOrOctetString(Type):
[docs] def toOid(self, implied=False): # To convert properly to OID, we need to know if it is a # fixed-len string, an implied string or a variable-len # string. b = self._toBytes() if implied or self._fixedLen(self.entity): return tuple(b) else: return (len(b),) + tuple(b)
def _toBytes(self): raise NotImplementedError
[docs] @classmethod def fromOid(cls, entity, oid, implied=False): oid = tuple(o & 0xff for o in oid) if implied: # Eat everything return (len(oid), cls(entity, bytes(oid))) if cls._fixedLen(entity): length = entity.ranges if len(oid) < length: raise ValueError( "{} is too short for wanted fixed " "string (need at least {:d})".format(oid, length)) return (length, cls(entity, bytes(oid[:length]))) # This is var-len if not oid: raise ValueError("empty OID while waiting for var-len string") length = oid[0] if len(oid) < length + 1: raise ValueError( "{} is too short for variable-len " "string (need at least {:d})".format(oid, length)) return ( (length + 1, cls(entity, bytes(oid[1:(length + 1)]))))
[docs] def pack(self): return rfc1902.OctetString(self._toBytes())
[docs]class OctetString(StringOrOctetString, bytes): """Class for a generic octet string. This class should be compared to :class:`String` which is used to represent a display string. This class is usually used to store raw bytes, like a bitmask of VLANs. """ @classmethod def _internal(cls, entity, value): # Internally, we are using bytes if isinstance(value, bytes): return value if isinstance(value, str): return value.encode("ascii") return bytes(value) def _toBytes(self): return self._value def __ior__(self, value): nvalue = bytearray(self._value) if not isinstance(value, (tuple, list)): value = [value] for v in value: if not isinstance(v, int): raise NotImplementedError( "on string, bit-operation are limited to integers") if len(nvalue) < (v >> 3) + 1: nvalue.extend([0] * ((v >> 3) + 1 - len(self._value))) nvalue[v >> 3] |= 1 << (7 - v % 8) return self.__class__(self.entity, bytes(nvalue)) def __isub__(self, value): nvalue = bytearray(self._value) if not isinstance(value, (tuple, list)): value = [value] for v in value: if not isinstance(v, int): raise NotImplementedError( "on string, bit-operation are limited to integers") if len(nvalue) < (v >> 3) + 1: continue nvalue[v >> 3] &= ~(1 << (7 - v % 8)) return self.__class__(self.entity, bytes(nvalue)) return self def __and__(self, value): nvalue = bytearray(self._value) if not isinstance(value, (tuple, list)): value = [value] for v in value: if not isinstance(v, int): raise NotImplementedError( "on string, bit-operation are limited to integers") if len(nvalue) < (v >> 3) + 1: return False if not (nvalue[v >> 3] & (1 << (7 - v % 8))): return False return True
[docs]class String(StringOrOctetString, str): """Class for a display string. Such a string is an unicode string and it is therefore expected that only printable characters are used. This is usually the case if the corresponding MIB node comes with a format string. With such an instance, the user is expected to be able to provide a formatted. For example, a MAC address could be written `00:11:22:33:44:55`. """ @classmethod def _parseOctetFormat(cls, fmt, j): # repeater if fmt[j] == "*": dorepeat = True j += 1 else: dorepeat = False # length length = "" while fmt[j].isdigit(): length += fmt[j] j += 1 length = int(length) # format format = fmt[j] j += 1 # seperator if j < len(fmt) and \ fmt[j] != "*" and not fmt[j].isdigit(): sep = fmt[j] j += 1 else: sep = "" # terminator if j < len(fmt) and \ fmt[j] != "*" and not fmt[j].isdigit(): term = fmt[j] j += 1 else: term = "" return (j, dorepeat, length, format, sep, term) @classmethod def _fromBytes(cls, value, fmt): i = 0 # Position in value j = 0 # Position in fmt result = "" term = None sep = None while i < len(value): if j < len(fmt): j, dorepeat, length, format, sep, term = cls._parseOctetFormat( fmt, j) # building if dorepeat: repeat = value[i] i += 1 else: repeat = 1 for r in range(repeat): bb = value[i:i + length] i += length if format in ['o', 'x', 'd']: if length > 4: raise ValueError( "don't know how to handle integers " "more than 4 bytes long") bb = b"\x00" * (4 - length) + bb number = struct.unpack(b"!l", bb)[0] if format == "o": # In Python2, oct() is 01242, while it is 0o1242 in # Python3 result += "".join(oct(number).partition("o")[0:3:2]) elif format == "x": result += hex(number)[2:] else: # format == "d": result += str(number) elif format == "a": result += bb.decode("ascii") elif format == "t": result += bb.decode("utf-8") else: raise ValueError("{!r} cannot be represented with " "the given display string ({})".format( bb, fmt)) result += sep if sep and term: result = result[:-1] result += term if term or sep: result = result[:-1] return result def _toBytes(self): # We need to reverse what was done by `_fromBytes`. This is # not an exact science. In most case, this is easy because a # separator is used but sometimes, this is not. We do some # black magic that will fail. i = 0 j = 0 fmt = self.entity.fmt bb = b"" while i < len(self._value): if j < len(fmt): parsed = self._parseOctetFormat(fmt, j) j, dorepeat, length, format, sep, term = parsed if format == "o": fmatch = "(?P<o>[0-7]{{1,{0}}})".format( int(length * 2.66667) + 1) elif format == "x": fmatch = "(?P<x>[0-9A-Fa-f]{{1,{0}}})".format(length * 2) elif format == "d": fmatch = "(?P<d>[0-9]{{1,{0}}})".format( int(length * 2.4083) + 1) elif format == "a": fmatch = "(?P<a>(?:.|\n){{1,{0}}})".format(length) elif format == "t": fmatch = "(?P<t>(?:.|\n){{1,{0}}})".format(length) else: raise ValueError("{!r} cannot be parsed due to an " "incorrect format ({})".format( self._value, fmt)) repeats = [] while True: mo = re.match(fmatch, self._value[i:]) if not mo: raise ValueError("{!r} cannot be parsed because it " "does not match format {} at " "index {}".format(self._value, fmt, i)) if format in ["o", "x", "d"]: if format == "o": r = int(mo.group("o"), 8) elif format == "x": r = int(mo.group("x"), 16) else: r = int(mo.group("d")) result = struct.pack(b"!l", r)[-length:] else: result = mo.group(1).encode("utf-8") i += len(mo.group(1)) if dorepeat: repeats.append(result) if i < len(self._value): # Approximate... if sep and self._value[i] == sep: i += 1 elif term and self._value[i] == term: i += 1 break else: break else: break if dorepeat: bb += bytes([len(repeats)]) bb += b"".join(repeats) else: bb += result if i < len(self._value) and (sep and self._value[i] == sep or term and self._value[i] == term): i += 1 return bb @classmethod def _internal(cls, entity, value): # Internally, we use the displayed string. We have a special # case if the value is an OctetString to do the conversion. if isinstance(value, OctetString): return cls._fromBytes(value._value, entity.fmt) if isinstance(value, bytes): return value.decode("utf-8") return str(value) def __str__(self): return self._value
[docs]class Integer(Type, int): """Class for any integer.""" @classmethod def _internal(cls, entity, value): return int(value)
[docs] def pack(self): if self._value >= (1 << 64): raise OverflowError("too large to be packed") if self._value >= (1 << 32): return rfc1902.Counter64(self._value) if self._value >= 0: return rfc1902.Integer(self._value) if self._value >= -(1 << 31): return rfc1902.Integer(self._value) raise OverflowError("too small to be packed")
[docs] def toOid(self, implied=False): return (self._value,)
[docs] @classmethod def fromOid(cls, entity, oid, implied=False): if len(oid) < 1: raise ValueError("{} is too short for an integer".format(oid)) return (1, cls(entity, oid[0]))
def __str__(self): if self.entity.fmt: if self.entity.fmt[0] == "x": return hex(self._value) if self.entity.fmt[0] == "o": return oct(self._value) if self.entity.fmt[0] == "b": if self._value == 0: return "0" if self._value > 0: v = self._value r = "" while v > 0: r = str(v % 2) + r v = v >> 1 return r elif self.entity.fmt[0] == "d" and \ len(self.entity.fmt) > 2 and \ self.entity.fmt[1] == "-": dec = int(self.entity.fmt[2:]) result = str(self._value) if len(result) < dec + 1: result = "0" * (dec + 1 - len(result)) + result return "{}.{}".format(result[:-2], result[-2:]) return str(self._value)
[docs]class Unsigned32(Integer): """Class to represent an unsigned 32bits integer."""
[docs] def pack(self): if self._value >= (1 << 32): raise OverflowError("too large to be packed") if self._value < 0: raise OverflowError("too small to be packed") return rfc1902.Unsigned32(self._value)
[docs]class Unsigned64(Integer): """Class to represent an unsigned 64bits integer."""
[docs] def pack(self): if self._value >= (1 << 64): raise OverflowError("too large to be packed") if self._value < 0: raise OverflowError("too small to be packed") return rfc1902.Counter64(self._value)
[docs]class Enum(Integer): """Class for an enumeration. An enumaration is an integer but labels are attached to some values for a more user-friendly display.""" @classmethod def _internal(cls, entity, value): if value in entity.enum: return value for (k, v) in entity.enum.items(): if (v == value): return k try: return int(value) except Exception: raise ValueError("{!r} is not a valid " "value for {}".format(value, entity))
[docs] def pack(self): return rfc1902.Integer(self._value)
[docs] @classmethod def fromOid(cls, entity, oid, implied=False): if len(oid) < 1: raise ValueError( "{!r} is too short for an enumeration".format(oid)) return (1, cls(entity, oid[0]))
def __eq__(self, other): if not isinstance(other, self.__class__): try: other = self.__class__(self.entity, other) except Exception: raise NotImplementedError # pragma: no cover return self._value == other._value def __ne__(self, other): return not self.__eq__(other) def __str__(self): if self._value in self.entity.enum: return ( "{}({:d})".format(self.entity.enum[self._value], self._value) ) else: return str(self._value)
[docs]@ordering_with_cmp class Oid(Type): """Class to represent and OID.""" @classmethod def _internal(cls, entity, value): if isinstance(value, (list, tuple)): return tuple(int(v) for v in value) elif isinstance(value, str): return tuple(int(i) for i in value.split(".") if i) elif isinstance(value, mib.Node): return tuple(value.oid) else: raise TypeError( "don't know how to convert {!r} to OID".format(value))
[docs] def pack(self): return rfc1902.univ.ObjectIdentifier(self._value)
[docs] def toOid(self, implied=False): if implied or self._fixedLen(self.entity): return self._value else: return tuple([len(self._value)] + list(self._value))
[docs] @classmethod def fromOid(cls, entity, oid, implied=False): if cls._fixedLen(entity): # A fixed OID? We don't like this. Provide a real example. raise ValueError( "{!r} seems to be a fixed-len OID index. Odd.".format(entity)) if not implied: # This index is not implied. We need the len if len(oid) < 1: raise ValueError( "{!r} is too short for a not " "implied index".format(entity)) length = oid[0] if len(oid) < length + 1: raise ValueError( "{!r} has an incorrect size " "(needs at least {:d})".format(oid, length)) return (length + 1, cls(entity, oid[1:(length + 1)])) else: # This index is implied. Eat everything return (len(oid), cls(entity, oid))
def __str__(self): return ".".join([str(x) for x in self._value]) def __cmp__(self, other): if not isinstance(other, Oid): other = Oid(self.entity, other) if tuple(self._value) == tuple(other._value): return 0 if self._value > other._value: return 1 return -1 def __getitem__(self, index): return self._value[index] def __contains__(self, item): """Test if item is a sub-oid of this OID""" if not isinstance(item, Oid): item = Oid(self.entity, item) return tuple(item._value[:len(self._value)]) == \ tuple(self._value[:len(self._value)])
[docs]class Boolean(Enum): """Class for a boolean.""" @classmethod def _internal(cls, entity, value): if isinstance(value, bool): if value: return Enum._internal(entity, "true") else: return Enum._internal(entity, "false") else: return Enum._internal(entity, value) def __nonzero__(self): if self._value == 1: return True else: return False def __bool__(self): return self.__nonzero__()
[docs]@ordering_with_cmp class Timeticks(Type): """Class for timeticks.""" @classmethod def _internal(cls, entity, value): if isinstance(value, int): # Value in centiseconds return timedelta(0, value / 100.) elif isinstance(value, timedelta): return value else: raise TypeError( "dunno how to handle {!r} ({})".format(value, type(value))) def __int__(self): return self._value.days * 3600 * 24 * 100 + \ self._value.seconds * 100 + \ self._value.microseconds // 10000
[docs] def toOid(self, implied=False): return (int(self),)
[docs] @classmethod def fromOid(cls, entity, oid, implied=False): if len(oid) < 1: raise ValueError("{!r} is too short for a timetick".format(oid)) return (1, cls(entity, oid[0]))
[docs] def pack(self): return rfc1902.TimeTicks(int(self))
def __str__(self): return str(self._value) def __cmp__(self, other): if isinstance(other, Timeticks): other = other._value elif isinstance(other, int): other = timedelta(0, other / 100.) elif not isinstance(other, timedelta): raise NotImplementedError( "only compare to int or " "timedelta, not {}".format(type(other))) if self._value == other: return 0 if self._value < other: return -1 return 1
[docs]class Bits(Type): """Class for bits.""" @classmethod def _internal(cls, entity, value): bits = set() tryalternate = False if isinstance(value, bytes): for i, x in enumerate(value): if x == 0: continue for j in range(8): if x & (1 << (7 - j)): k = (i * 8) + j if k not in entity.enum: tryalternate = True break bits.add(k) if tryalternate: break if not tryalternate: return bits else: bits = set() elif not isinstance(value, (tuple, list, set, frozenset)): value = {value} for v in value: found = False if v in entity.enum: bits.add(v) found = True else: for (k, t) in entity.enum.items(): if (t == v): bits.add(k) found = True break if not found: raise ValueError("{!r} is not a valid bit value".format(v)) return bits
[docs] def pack(self): if self._value: string = [0] * ((max(self._value) // 8) + 1) else: string = [] for b in self._value: string[b // 8] |= 1 << (7 - b % 8) return rfc1902.Bits(bytes(string))
def __eq__(self, other): if isinstance(other, str): other = [other] if not isinstance(other, Bits): other = Bits(self.entity, other) return self._value == other._value def __ne__(self, other): return not self.__eq__(other) def __str__(self): result = [] for b in sorted(self._value): result.append("{}({:d})".format(self.entity.enum[b], b)) return ", ".join(result) def __and__(self, other): if isinstance(other, str): other = [other] if not isinstance(other, Bits): other = Bits(self.entity, other) return len(self._value & other._value) > 0 def __ior__(self, other): if isinstance(other, str): other = [other] if not isinstance(other, Bits): other = Bits(self.entity, other) self._value |= other._value return self def __isub__(self, other): if isinstance(other, str): other = [other] if not isinstance(other, Bits): other = Bits(self.entity, other) self._value -= other._value return self
[docs]def build(mibname, node, value): """Build a new basic type with the given value. :param mibname: The MIB to use to locate the entity. :param node: The node that will be attached to this type. :param value: The initial value to set for the type. :return: A :class:`Type` instance """ m = mib.get(mibname, node) t = m.type(m, value) return t