1# 2# This file is part of pysnmp software. 3# 4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> 5# License: http://snmplabs.com/pysnmp/license.html 6# 7from pysnmp.proto import rfc1901, rfc1902, rfc1905 8from pysnmp.proto.api import v1 9from pyasn1.type import univ, constraint 10 11# Shortcuts to SNMP types 12Null = univ.Null 13null = Null('') 14ObjectIdentifier = univ.ObjectIdentifier 15 16Integer = rfc1902.Integer 17Integer32 = rfc1902.Integer32 18OctetString = rfc1902.OctetString 19IpAddress = rfc1902.IpAddress 20Counter32 = rfc1902.Counter32 21Gauge32 = rfc1902.Gauge32 22Unsigned32 = rfc1902.Unsigned32 23TimeTicks = rfc1902.TimeTicks 24Opaque = rfc1902.Opaque 25Counter64 = rfc1902.Counter64 26Bits = rfc1902.Bits 27 28NoSuchObject = rfc1905.NoSuchObject 29NoSuchInstance = rfc1905.NoSuchInstance 30EndOfMibView = rfc1905.EndOfMibView 31 32VarBind = rfc1905.VarBind 33VarBindList = rfc1905.VarBindList 34GetRequestPDU = rfc1905.GetRequestPDU 35GetNextRequestPDU = rfc1905.GetNextRequestPDU 36ResponsePDU = GetResponsePDU = rfc1905.ResponsePDU 37SetRequestPDU = rfc1905.SetRequestPDU 38GetBulkRequestPDU = rfc1905.GetBulkRequestPDU 39InformRequestPDU = rfc1905.InformRequestPDU 40SNMPv2TrapPDU = TrapPDU = rfc1905.SNMPv2TrapPDU 41ReportPDU = rfc1905.ReportPDU 42 43Message = rfc1901.Message 44 45getNextRequestID = v1.getNextRequestID 46 47apiVarBind = v1.apiVarBind 48 49 50class PDUAPI(v1.PDUAPI): 51 _errorStatus = rfc1905.errorStatus.clone(0) 52 _errorIndex = univ.Integer(0).subtype(subtypeSpec=constraint.ValueRangeConstraint(0, rfc1905.max_bindings)) 53 54 def getResponse(self, reqPDU): 55 rspPDU = ResponsePDU() 56 self.setDefaults(rspPDU) 57 self.setRequestID(rspPDU, self.getRequestID(reqPDU)) 58 return rspPDU 59 60 def getVarBindTable(self, reqPDU, rspPDU): 61 return [apiPDU.getVarBinds(rspPDU)] 62 63 def setEndOfMibError(self, pdu, errorIndex): 64 varBindList = self.getVarBindList(pdu) 65 varBindList[errorIndex - 1].setComponentByPosition( 66 1, rfc1905.endOfMibView, verifyConstraints=False, matchTags=False, matchConstraints=False 67 ) 68 69 def setNoSuchInstanceError(self, pdu, errorIndex): 70 varBindList = self.getVarBindList(pdu) 71 varBindList[errorIndex - 1].setComponentByPosition( 72 1, rfc1905.noSuchInstance, verifyConstraints=False, matchTags=False, matchConstraints=False 73 ) 74 75 76apiPDU = PDUAPI() 77 78 79class BulkPDUAPI(PDUAPI): 80 _nonRepeaters = rfc1905.nonRepeaters.clone(0) 81 _maxRepetitions = rfc1905.maxRepetitions.clone(10) 82 83 def setDefaults(self, pdu): 84 PDUAPI.setDefaults(self, pdu) 85 pdu.setComponentByPosition( 86 0, getNextRequestID(), verifyConstraints=False, matchTags=False, matchConstraints=False 87 ) 88 pdu.setComponentByPosition( 89 1, self._nonRepeaters, verifyConstraints=False, matchTags=False, matchConstraints=False 90 ) 91 pdu.setComponentByPosition( 92 2, self._maxRepetitions, verifyConstraints=False, matchTags=False, matchConstraints=False 93 ) 94 pdu.setComponentByPosition(3) 95 96 @staticmethod 97 def getNonRepeaters(pdu): 98 return pdu.getComponentByPosition(1) 99 100 @staticmethod 101 def setNonRepeaters(pdu, value): 102 pdu.setComponentByPosition(1, value) 103 104 @staticmethod 105 def getMaxRepetitions(pdu): 106 return pdu.getComponentByPosition(2) 107 108 @staticmethod 109 def setMaxRepetitions(pdu, value): 110 pdu.setComponentByPosition(2, value) 111 112 def getVarBindTable(self, reqPDU, rspPDU): 113 nonRepeaters = self.getNonRepeaters(reqPDU) 114 115 reqVarBinds = self.getVarBinds(reqPDU) 116 117 N = min(int(nonRepeaters), len(reqVarBinds)) 118 R = max(len(reqVarBinds) - N, 0) 119 120 rspVarBinds = self.getVarBinds(rspPDU) 121 122 varBindTable = [] 123 124 if R: 125 for i in range(0, len(rspVarBinds) - N, R): 126 varBindRow = rspVarBinds[:N] + rspVarBinds[N + i:N + R + i] 127 # ignore stray OIDs / non-rectangular table 128 if len(varBindRow) == N + R: 129 varBindTable.append(varBindRow) 130 elif N: 131 varBindTable.append(rspVarBinds[:N]) 132 133 return varBindTable 134 135 136apiBulkPDU = BulkPDUAPI() 137 138 139class TrapPDUAPI(v1.PDUAPI): 140 sysUpTime = (1, 3, 6, 1, 2, 1, 1, 3, 0) 141 snmpTrapAddress = (1, 3, 6, 1, 6, 3, 18, 1, 3, 0) 142 snmpTrapCommunity = (1, 3, 6, 1, 6, 3, 18, 1, 4, 0) 143 snmpTrapOID = (1, 3, 6, 1, 6, 3, 1, 1, 4, 1, 0) 144 snmpTrapEnterprise = (1, 3, 6, 1, 6, 3, 1, 1, 4, 3, 0) 145 _zeroTime = TimeTicks(0) 146 _genTrap = ObjectIdentifier((1, 3, 6, 1, 6, 3, 1, 1, 5, 1)) 147 148 def setDefaults(self, pdu): 149 v1.PDUAPI.setDefaults(self, pdu) 150 varBinds = [(self.sysUpTime, self._zeroTime), 151 # generic trap 152 (self.snmpTrapOID, self._genTrap)] 153 self.setVarBinds(pdu, varBinds) 154 155 156apiTrapPDU = TrapPDUAPI() 157 158 159class MessageAPI(v1.MessageAPI): 160 _version = rfc1901.version.clone(1) 161 162 def setDefaults(self, msg): 163 msg.setComponentByPosition(0, self._version, verifyConstraints=False, matchTags=False, matchConstraints=False) 164 msg.setComponentByPosition(1, self._community, verifyConstraints=False, matchTags=False, matchConstraints=False) 165 return msg 166 167 def getResponse(self, reqMsg): 168 rspMsg = Message() 169 self.setDefaults(rspMsg) 170 self.setVersion(rspMsg, self.getVersion(reqMsg)) 171 self.setCommunity(rspMsg, self.getCommunity(reqMsg)) 172 self.setPDU(rspMsg, apiPDU.getResponse(self.getPDU(reqMsg))) 173 return rspMsg 174 175 176apiMessage = MessageAPI() 177