1# -*- coding: utf-8 -*- 2# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo 3# Copyright (C) 2011-2019 German Aerospace Center (DLR) and others. 4# This program and the accompanying materials 5# are made available under the terms of the Eclipse Public License v2.0 6# which accompanies this distribution, and is available at 7# http://www.eclipse.org/legal/epl-v20.html 8# SPDX-License-Identifier: EPL-2.0 9 10# @file _edge.py 11# @author Michael Behrisch 12# @author Jakob Erdmann 13# @date 2011-03-17 14# @version $Id$ 15 16from __future__ import absolute_import 17import struct 18from . import constants as tc 19from .domain import Domain 20from .storage import Storage 21from .exceptions import TraCIException 22 23 24_RETURN_VALUE_FUNC = {tc.VAR_EDGE_TRAVELTIME: Storage.readDouble, 25 tc.VAR_WAITING_TIME: Storage.readDouble, 26 tc.VAR_EDGE_EFFORT: Storage.readDouble, 27 tc.VAR_CO2EMISSION: Storage.readDouble, 28 tc.VAR_COEMISSION: Storage.readDouble, 29 tc.VAR_HCEMISSION: Storage.readDouble, 30 tc.VAR_PMXEMISSION: Storage.readDouble, 31 tc.VAR_NOXEMISSION: Storage.readDouble, 32 tc.VAR_FUELCONSUMPTION: Storage.readDouble, 33 tc.VAR_NOISEEMISSION: Storage.readDouble, 34 tc.VAR_ELECTRICITYCONSUMPTION: Storage.readDouble, 35 tc.LAST_STEP_MEAN_SPEED: Storage.readDouble, 36 tc.LAST_STEP_OCCUPANCY: Storage.readDouble, 37 tc.LAST_STEP_LENGTH: Storage.readDouble, 38 tc.VAR_LANE_INDEX: Storage.readInt, 39 tc.VAR_NAME: Storage.readString, 40 tc.VAR_CURRENT_TRAVELTIME: Storage.readDouble, 41 tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt, 42 tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt, 43 tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList, 44 tc.LAST_STEP_PERSON_ID_LIST: Storage.readStringList, 45 } 46 47 48class EdgeDomain(Domain): 49 50 def __init__(self): 51 Domain.__init__(self, "edge", tc.CMD_GET_EDGE_VARIABLE, tc.CMD_SET_EDGE_VARIABLE, 52 tc.CMD_SUBSCRIBE_EDGE_VARIABLE, tc.RESPONSE_SUBSCRIBE_EDGE_VARIABLE, 53 tc.CMD_SUBSCRIBE_EDGE_CONTEXT, tc.RESPONSE_SUBSCRIBE_EDGE_CONTEXT, 54 _RETURN_VALUE_FUNC) 55 56 def getAdaptedTraveltime(self, edgeID, time): 57 """getAdaptedTraveltime(string, double) -> double 58 59 Returns the travel time value (in s) used for (re-)routing 60 which is valid on the edge at the given time. 61 """ 62 self._connection._beginMessage(tc.CMD_GET_EDGE_VARIABLE, tc.VAR_EDGE_TRAVELTIME, 63 edgeID, 1 + 8) 64 self._connection._string += struct.pack( 65 "!Bd", tc.TYPE_DOUBLE, time) 66 return self._connection._checkResult(tc.CMD_GET_EDGE_VARIABLE, 67 tc.VAR_EDGE_TRAVELTIME, edgeID).readDouble() 68 69 def getWaitingTime(self, edgeID): 70 """getWaitingTime() -> double 71 Returns the sum of the waiting time of all vehicles currently on 72 that edge (see traci.vehicle.getWaitingTime). 73 """ 74 return self._getUniversal(tc.VAR_WAITING_TIME, edgeID) 75 76 def getEffort(self, edgeID, time): 77 """getEffort(string, double) -> double 78 79 Returns the effort value used for (re-)routing 80 which is valid on the edge at the given time. 81 """ 82 self._connection._beginMessage(tc.CMD_GET_EDGE_VARIABLE, tc.VAR_EDGE_EFFORT, 83 edgeID, 1 + 8) 84 self._connection._string += struct.pack( 85 "!Bd", tc.TYPE_DOUBLE, time) 86 return self._connection._checkResult(tc.CMD_GET_EDGE_VARIABLE, 87 tc.VAR_EDGE_EFFORT, edgeID).readDouble() 88 89 def getCO2Emission(self, edgeID): 90 """getCO2Emission(string) -> double 91 92 Returns the CO2 emission in mg for the last time step on the given edge. 93 """ 94 return self._getUniversal(tc.VAR_CO2EMISSION, edgeID) 95 96 def getCOEmission(self, edgeID): 97 """getCOEmission(string) -> double 98 99 Returns the CO emission in mg for the last time step on the given edge. 100 """ 101 return self._getUniversal(tc.VAR_COEMISSION, edgeID) 102 103 def getHCEmission(self, edgeID): 104 """getHCEmission(string) -> double 105 106 Returns the HC emission in mg for the last time step on the given edge. 107 """ 108 return self._getUniversal(tc.VAR_HCEMISSION, edgeID) 109 110 def getPMxEmission(self, edgeID): 111 """getPMxEmission(string) -> double 112 113 Returns the particular matter emission in mg for the last time step on the given edge. 114 """ 115 return self._getUniversal(tc.VAR_PMXEMISSION, edgeID) 116 117 def getNOxEmission(self, edgeID): 118 """getNOxEmission(string) -> double 119 120 Returns the NOx emission in mg for the last time step on the given edge. 121 """ 122 return self._getUniversal(tc.VAR_NOXEMISSION, edgeID) 123 124 def getFuelConsumption(self, edgeID): 125 """getFuelConsumption(string) -> double 126 127 Returns the fuel consumption in ml for the last time step on the given edge. 128 """ 129 return self._getUniversal(tc.VAR_FUELCONSUMPTION, edgeID) 130 131 def getNoiseEmission(self, edgeID): 132 """getNoiseEmission(string) -> double 133 134 Returns the noise emission in db for the last time step on the given edge. 135 """ 136 return self._getUniversal(tc.VAR_NOISEEMISSION, edgeID) 137 138 def getElectricityConsumption(self, edgeID): 139 """getElectricityConsumption(string) -> double 140 141 Returns the electricity consumption in ml for the last time step. 142 """ 143 return self._getUniversal(tc.VAR_ELECTRICITYCONSUMPTION, edgeID) 144 145 def getLastStepMeanSpeed(self, edgeID): 146 """getLastStepMeanSpeed(string) -> double 147 148 Returns the average speed in m/s for the last time step on the given edge. 149 """ 150 return self._getUniversal(tc.LAST_STEP_MEAN_SPEED, edgeID) 151 152 def getLastStepOccupancy(self, edgeID): 153 """getLastStepOccupancy(string) -> double 154 155 Returns the net occupancy (excluding inter-vehicle gaps) in % for the last time step on the given edge. 156 """ 157 return self._getUniversal(tc.LAST_STEP_OCCUPANCY, edgeID) 158 159 def getLastStepLength(self, edgeID): 160 """getLastStepLength(string) -> double 161 162 Returns the mean vehicle length in m for the last time step on the given edge. 163 """ 164 return self._getUniversal(tc.LAST_STEP_LENGTH, edgeID) 165 166 def getLaneNumber(self, edgeID): 167 """getLaneNumber(string) -> int 168 169 Returns the number of lanes of this edge 170 """ 171 return self._getUniversal(tc.VAR_LANE_INDEX, edgeID) 172 173 def getStreetName(self, edgeID): 174 """getStreetName(string) -> string 175 176 Returns the street name of this edge 177 """ 178 return self._getUniversal(tc.VAR_NAME, edgeID) 179 180 def getTraveltime(self, edgeID): 181 """getTraveltime(string) -> double 182 183 Returns the estimated travel time in s for the last time step on the given edge. 184 """ 185 return self._getUniversal(tc.VAR_CURRENT_TRAVELTIME, edgeID) 186 187 def getLastStepVehicleNumber(self, edgeID): 188 """getLastStepVehicleNumber(string) -> integer 189 190 Returns the total number of vehicles for the last time step on the given edge. 191 """ 192 return self._getUniversal(tc.LAST_STEP_VEHICLE_NUMBER, edgeID) 193 194 def getLastStepHaltingNumber(self, edgeID): 195 """getLastStepHaltingNumber(string) -> integer 196 197 Returns the total number of halting vehicles for the last time step on the given edge. 198 A speed of less than 0.1 m/s is considered a halt. 199 """ 200 return self._getUniversal(tc.LAST_STEP_VEHICLE_HALTING_NUMBER, edgeID) 201 202 def getLastStepVehicleIDs(self, edgeID): 203 """getLastStepVehicleIDs(string) -> list(string) 204 205 Returns the ids of the vehicles for the last time step on the given edge. 206 """ 207 return self._getUniversal(tc.LAST_STEP_VEHICLE_ID_LIST, edgeID) 208 209 def getLastStepPersonIDs(self, edgeID): 210 """getLastStepPersonIDs(string) -> list(string) 211 212 Returns the ids of the persons on the given edge during the last time step. 213 """ 214 return self._getUniversal(tc.LAST_STEP_PERSON_ID_LIST, edgeID) 215 216 def adaptTraveltime(self, edgeID, time, begin=None, end=None): 217 """adaptTraveltime(string, double, double, double) -> None 218 219 Adapt the travel time value (in s) used for (re-)routing for the given edge. 220 221 When setting begin time and end time (in seconds), the changes only 222 apply to that time range. Otherwise they apply all the time 223 """ 224 if begin is None and end is None: 225 self._connection._beginMessage( 226 tc.CMD_SET_EDGE_VARIABLE, tc.VAR_EDGE_TRAVELTIME, edgeID, 1 + 4 + 1 + 8) 227 self._connection._string += struct.pack("!BiBd", 228 tc.TYPE_COMPOUND, 1, tc.TYPE_DOUBLE, time) 229 self._connection._sendExact() 230 elif begin is not None and end is not None: 231 self._connection._beginMessage( 232 tc.CMD_SET_EDGE_VARIABLE, tc.VAR_EDGE_TRAVELTIME, edgeID, 1 + 4 + 1 + 8 + 1 + 8 + 1 + 8) 233 self._connection._string += struct.pack("!BiBdBdBd", 234 tc.TYPE_COMPOUND, 3, 235 tc.TYPE_DOUBLE, begin, 236 tc.TYPE_DOUBLE, end, 237 tc.TYPE_DOUBLE, time) 238 self._connection._sendExact() 239 else: 240 raise TraCIException("Both, begin time and end time must be specified") 241 242 def setEffort(self, edgeID, effort, begin=None, end=None): 243 """setEffort(string, double, double, double) -> None 244 245 Adapt the effort value used for (re-)routing for the given edge. 246 247 When setting begin time and end time (in seconds), the changes only 248 apply to that time range. Otherwise they apply all the time. 249 """ 250 if begin is None and end is None: 251 self._connection._beginMessage( 252 tc.CMD_SET_EDGE_VARIABLE, tc.VAR_EDGE_EFFORT, edgeID, 1 + 4 + 1 + 8) 253 self._connection._string += struct.pack("!BiBd", 254 tc.TYPE_COMPOUND, 1, tc.TYPE_DOUBLE, effort) 255 self._connection._sendExact() 256 elif begin is not None and end is not None: 257 self._connection._beginMessage( 258 tc.CMD_SET_EDGE_VARIABLE, tc.VAR_EDGE_EFFORT, edgeID, 1 + 4 + 1 + 8 + 1 + 8 + 1 + 8) 259 self._connection._string += struct.pack("!BiBdBdBd", 260 tc.TYPE_COMPOUND, 3, 261 tc.TYPE_DOUBLE, begin, 262 tc.TYPE_DOUBLE, end, 263 tc.TYPE_DOUBLE, effort) 264 self._connection._sendExact() 265 else: 266 raise TraCIException("Both, begin time and end time must be specified") 267 268 def setMaxSpeed(self, edgeID, speed): 269 """setMaxSpeed(string, double) -> None 270 271 Set a new maximum speed (in m/s) for all lanes of the edge. 272 """ 273 self._connection._sendDoubleCmd( 274 tc.CMD_SET_EDGE_VARIABLE, tc.VAR_MAXSPEED, edgeID, speed) 275 276 277EdgeDomain() 278