1#
2# This file is part of pysnmp software.
3#
4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
5# License: http://snmplabs.com/pysnmp/license.html
6#
7from pysnmp.proto import rfc1901, rfc1902, rfc1905
8from pysnmp.proto.api import v1
9from pyasn1.type import univ, constraint
10
11# Shortcuts to SNMP types
12Null = univ.Null
13null = Null('')
14ObjectIdentifier = univ.ObjectIdentifier
15
16Integer = rfc1902.Integer
17Integer32 = rfc1902.Integer32
18OctetString = rfc1902.OctetString
19IpAddress = rfc1902.IpAddress
20Counter32 = rfc1902.Counter32
21Gauge32 = rfc1902.Gauge32
22Unsigned32 = rfc1902.Unsigned32
23TimeTicks = rfc1902.TimeTicks
24Opaque = rfc1902.Opaque
25Counter64 = rfc1902.Counter64
26Bits = rfc1902.Bits
27
28NoSuchObject = rfc1905.NoSuchObject
29NoSuchInstance = rfc1905.NoSuchInstance
30EndOfMibView = rfc1905.EndOfMibView
31
32VarBind = rfc1905.VarBind
33VarBindList = rfc1905.VarBindList
34GetRequestPDU = rfc1905.GetRequestPDU
35GetNextRequestPDU = rfc1905.GetNextRequestPDU
36ResponsePDU = GetResponsePDU = rfc1905.ResponsePDU
37SetRequestPDU = rfc1905.SetRequestPDU
38GetBulkRequestPDU = rfc1905.GetBulkRequestPDU
39InformRequestPDU = rfc1905.InformRequestPDU
40SNMPv2TrapPDU = TrapPDU = rfc1905.SNMPv2TrapPDU
41ReportPDU = rfc1905.ReportPDU
42
43Message = rfc1901.Message
44
45getNextRequestID = v1.getNextRequestID
46
47apiVarBind = v1.apiVarBind
48
49
50class PDUAPI(v1.PDUAPI):
51    _errorStatus = rfc1905.errorStatus.clone(0)
52    _errorIndex = univ.Integer(0).subtype(subtypeSpec=constraint.ValueRangeConstraint(0, rfc1905.max_bindings))
53
54    def getResponse(self, reqPDU):
55        rspPDU = ResponsePDU()
56        self.setDefaults(rspPDU)
57        self.setRequestID(rspPDU, self.getRequestID(reqPDU))
58        return rspPDU
59
60    def getVarBindTable(self, reqPDU, rspPDU):
61        return [apiPDU.getVarBinds(rspPDU)]
62
63    def setEndOfMibError(self, pdu, errorIndex):
64        varBindList = self.getVarBindList(pdu)
65        varBindList[errorIndex - 1].setComponentByPosition(
66            1, rfc1905.endOfMibView, verifyConstraints=False, matchTags=False, matchConstraints=False
67        )
68
69    def setNoSuchInstanceError(self, pdu, errorIndex):
70        varBindList = self.getVarBindList(pdu)
71        varBindList[errorIndex - 1].setComponentByPosition(
72            1, rfc1905.noSuchInstance, verifyConstraints=False, matchTags=False, matchConstraints=False
73        )
74
75
76apiPDU = PDUAPI()
77
78
79class BulkPDUAPI(PDUAPI):
80    _nonRepeaters = rfc1905.nonRepeaters.clone(0)
81    _maxRepetitions = rfc1905.maxRepetitions.clone(10)
82
83    def setDefaults(self, pdu):
84        PDUAPI.setDefaults(self, pdu)
85        pdu.setComponentByPosition(
86            0, getNextRequestID(), verifyConstraints=False, matchTags=False, matchConstraints=False
87        )
88        pdu.setComponentByPosition(
89            1, self._nonRepeaters, verifyConstraints=False, matchTags=False, matchConstraints=False
90        )
91        pdu.setComponentByPosition(
92            2, self._maxRepetitions, verifyConstraints=False, matchTags=False, matchConstraints=False
93        )
94        pdu.setComponentByPosition(3)
95
96    @staticmethod
97    def getNonRepeaters(pdu):
98        return pdu.getComponentByPosition(1)
99
100    @staticmethod
101    def setNonRepeaters(pdu, value):
102        pdu.setComponentByPosition(1, value)
103
104    @staticmethod
105    def getMaxRepetitions(pdu):
106        return pdu.getComponentByPosition(2)
107
108    @staticmethod
109    def setMaxRepetitions(pdu, value):
110        pdu.setComponentByPosition(2, value)
111
112    def getVarBindTable(self, reqPDU, rspPDU):
113        nonRepeaters = self.getNonRepeaters(reqPDU)
114
115        reqVarBinds = self.getVarBinds(reqPDU)
116
117        N = min(int(nonRepeaters), len(reqVarBinds))
118        R = max(len(reqVarBinds) - N, 0)
119
120        rspVarBinds = self.getVarBinds(rspPDU)
121
122        varBindTable = []
123
124        if R:
125            for i in range(0, len(rspVarBinds) - N, R):
126                varBindRow = rspVarBinds[:N] + rspVarBinds[N + i:N + R + i]
127                # ignore stray OIDs / non-rectangular table
128                if len(varBindRow) == N + R:
129                    varBindTable.append(varBindRow)
130        elif N:
131            varBindTable.append(rspVarBinds[:N])
132
133        return varBindTable
134
135
136apiBulkPDU = BulkPDUAPI()
137
138
139class TrapPDUAPI(v1.PDUAPI):
140    sysUpTime = (1, 3, 6, 1, 2, 1, 1, 3, 0)
141    snmpTrapAddress = (1, 3, 6, 1, 6, 3, 18, 1, 3, 0)
142    snmpTrapCommunity = (1, 3, 6, 1, 6, 3, 18, 1, 4, 0)
143    snmpTrapOID = (1, 3, 6, 1, 6, 3, 1, 1, 4, 1, 0)
144    snmpTrapEnterprise = (1, 3, 6, 1, 6, 3, 1, 1, 4, 3, 0)
145    _zeroTime = TimeTicks(0)
146    _genTrap = ObjectIdentifier((1, 3, 6, 1, 6, 3, 1, 1, 5, 1))
147
148    def setDefaults(self, pdu):
149        v1.PDUAPI.setDefaults(self, pdu)
150        varBinds = [(self.sysUpTime, self._zeroTime),
151                    # generic trap
152                    (self.snmpTrapOID, self._genTrap)]
153        self.setVarBinds(pdu, varBinds)
154
155
156apiTrapPDU = TrapPDUAPI()
157
158
159class MessageAPI(v1.MessageAPI):
160    _version = rfc1901.version.clone(1)
161
162    def setDefaults(self, msg):
163        msg.setComponentByPosition(0, self._version, verifyConstraints=False, matchTags=False, matchConstraints=False)
164        msg.setComponentByPosition(1, self._community, verifyConstraints=False, matchTags=False, matchConstraints=False)
165        return msg
166
167    def getResponse(self, reqMsg):
168        rspMsg = Message()
169        self.setDefaults(rspMsg)
170        self.setVersion(rspMsg, self.getVersion(reqMsg))
171        self.setCommunity(rspMsg, self.getCommunity(reqMsg))
172        self.setPDU(rspMsg, apiPDU.getResponse(self.getPDU(reqMsg)))
173        return rspMsg
174
175
176apiMessage = MessageAPI()
177