1# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo 2# Copyright (C) 2017-2019 German Aerospace Center (DLR) and others. 3# This program and the accompanying materials 4# are made available under the terms of the Eclipse Public License v2.0 5# which accompanies this distribution, and is available at 6# http://www.eclipse.org/legal/epl-v20.html 7# SPDX-License-Identifier: EPL-2.0 8 9# @file _utils.py 10# @author Leonhard Luecken 11# @date 2018-06-26 12# @version $Id$ 13 14''' 15Utility functions and classes for simpla 16''' 17 18import traci 19 20 21class SimplaException(Exception): 22 ''' 23 Simple exception raised by simpla. 24 ''' 25 26 def __init__(self, *args, **kwargs): 27 super(SimplaException, self).__init__(*args, **kwargs) 28 29 30_activeGapControllers = {} 31DEBUG_GAP_CONTROL = False 32 33 34class GapController(traci.StepListener): 35 ''' 36 class that manages opening a gap in front of its assigned vehicle 37 ''' 38 39 def __init__(self, vehID, desiredGap, desiredSpeedDiff, maximumDecel, duration): 40 ''' 41 GapController() 42 43 @see openGap() 44 ''' 45 self._desiredGap = float(desiredGap) 46 self._desiredSpeedDiff = float(desiredSpeedDiff) 47 self._maximumDecel = float(maximumDecel) 48 self._duration = float(duration) 49 self._vehID = vehID 50 51 # gains for linear controller that should stabilize the desired gap 52# self._gapGain = 0.23 53# self._speedGain = 0.07 54 self._gapGain = 0.04 55 self._speedGain = 0.8 56 57 # leaderHorizon coefficients assures that the leader is detected even if 58 # the leader is farther away than the desired gap. The look ahead distance 59 # given to traci.vehicle.getLeader is min(desiredGap*horizonFactor, minHorizon) 60 self._leaderHorizonFactor = 1.5 61 self._minimalLeaderHorizon = 50 62 63 # step-length of simulation 64 self._TS = traci.simulation.getDeltaT() 65 # last received speed info 66 self._leaderSpeed = None 67 self._egoSpeed = None 68 # whether controlled vehicle has an active setSpeed command 69 self._speedControlActive = False 70 71 if DEBUG_GAP_CONTROL: 72 print("Created GapController for vehicle %s" % self._vehID) 73 print("desiredGap=%s, desiredSpeedDiff=%s, maximumDecel=%s, duration=%s" % ( 74 desiredGap, desiredSpeedDiff, maximumDecel, duration)) 75 76 def step(self, s=0): 77 ''' 78 Perform one control step and count down the activity period for the controller 79 ''' 80 self._applyControl() 81 82 self._duration -= self._TS 83 84 if DEBUG_GAP_CONTROL: 85 print("Step of gap controller with listenerID=%s" % self.getID()) 86 print("Remaining activity period: %s" % self._duration) 87 88 return self._duration > 0 89 90 def _applyControl(self): 91 leaderInfo = traci.vehicle.getLeader(self._vehID, max(self._desiredGap * self._leaderHorizonFactor, 92 self._minimalLeaderHorizon)) 93# leaderInfo = traci.vehicle.getLeader(self._vehID, self._desiredGap) 94 if leaderInfo is None: 95 # no leader 96 self._releaseControl() 97 print("Lost leader...") 98 return 99 100 (leaderID, gap) = leaderInfo 101 assert(leaderID is not "") 102 103 self._egoSpeed = traci.vehicle.getSpeed(self._vehID) 104 self._leaderSpeed = traci.vehicle.getSpeed(leaderID) 105 speedDiff = self._leaderSpeed - self._egoSpeed 106 107 # first determine a desired acceleration from the controller 108 accel = self._accel(speedDiff, gap) 109 110 if DEBUG_GAP_CONTROL: 111 print("GapController's acceleration control for veh '%s':" % self._vehID) 112 print("accel(speedDiff=%s, gapError=%s) = %s" % (speedDiff, gap - self._desiredGap, accel)) 113 114 # assure that the speedDifference is not increased above the desired value 115 if (accel <= 0 and speedDiff - self._TS * accel >= self._desiredSpeedDiff): 116 accel = (speedDiff - self._desiredSpeedDiff) / self._TS 117 elif (accel >= 0 and speedDiff - self._TS * accel <= -self._desiredSpeedDiff): 118 accel = (speedDiff + self._desiredSpeedDiff) / self._TS 119 if DEBUG_GAP_CONTROL: 120 print("Truncating to prevent exceeding desired speedDiff results in\n accel=%s" % accel) 121 122 # assure it does not exceed the [maxdecel, maxaccel] bounds (assuming maximumAccel = maximumDecel) 123 maximumAccel = self._maximumDecel 124 accel = max(min(accel, maximumAccel), -self._maximumDecel) 125 if DEBUG_GAP_CONTROL: 126 print("Truncating to maximal decel/accel results in\n accel=%s" % accel) 127 128 # apply the calculated acceleration 129 self._imposeSpeed(self._egoSpeed + self._TS * accel) 130 131 def _accel(self, speedDiff, gap): 132 ''' 133 Returns the acceleration computed by a linear controller 134 ''' 135 gapError = gap - self._desiredGap 136 return self._gapGain * gapError + self._speedGain * speedDiff 137 138 def _releaseControl(self): 139 ''' 140 Releases the vehicle's speed control such that sumo may take over again 141 ''' 142 if self._speedControlActive: 143 traci.vehicle.setSpeed(self._vehID, -1) 144 self._speedControlActive = False 145 146 def _imposeSpeed(self, speed): 147 ''' 148 Sends a setSpeed command to the vehicle via traci 149 ''' 150 traci.vehicle.setSpeed(self._vehID, max(speed, 0)) 151 self._speedControlActive = True 152 153 def cleanUp(self): 154 global _activeGapControllers 155 del _activeGapControllers[self._vehID] 156 traci.StepListener.cleanUp(self) 157 self._releaseControl() 158 if DEBUG_GAP_CONTROL: 159 print("Cleaned up stepListener %s." % self.getID()) 160 161 162def openGap(vehID, desiredGap, desiredSpeedDiff, maximumDecel, duration): 163 ''' 164 openGap(string, float>0, float>0, float>0, float>0) 165 166 vehID - ID of the vehicle to be controlled 167 desiredGap - gap that shall be established 168 desiredSpeedDiff - rate at which the gap is open if possible 169 maximumDecel - maximal deceleration at which the desiredSpeedDiff is tried to be approximated 170 duration - The period for which the gap control should be active 171 172 This methods adds a controller for the opening of a gap in front of the given vehicle. 173 The controller stays active for a period of the given duration. 174 If a leader is closer than the desiredGap, the controller tries to establish the desiredGap by inducing the 175 given speedDifference, while not braking harder than maximumDecel. 176 An object of the class GapCreator is created to manage the vehicle state and is added to traci as a stepListener. 177 ''' 178 global _activeGapControllers 179 180 if DEBUG_GAP_CONTROL: 181 print("openGap()") 182 183 # Check type error 184 errorMsg = None 185 if desiredGap <= 0: 186 errorMsg = "simpla.openGap(): Parameter desiredGap has to be a positive float (given value = %s)." % desiredGap 187 elif desiredSpeedDiff <= 0: 188 errorMsg = "simpla.openGap(): Parameter desiredSpeedDiff has to be a positive float (given value = %s)." % ( 189 desiredSpeedDiff) 190 elif maximumDecel <= 0: 191 errorMsg = "simpla.openGap(): Parameter maximumDecel has to be a positive float (given value = %s)." % ( 192 maximumDecel) 193 elif duration <= 0: 194 errorMsg = "simpla.openGap(): Parameter duration has to be a positive float (given value = %s)." % duration 195 if errorMsg is not None: 196 raise SimplaException(errorMsg) 197 198 # remove any previous controller attached to the vehicle 199 removeGapController(vehID) 200 gc = GapController(vehID, desiredGap, desiredSpeedDiff, maximumDecel, duration) 201 listenerID = traci.addStepListener(gc) 202 _activeGapControllers[vehID] = listenerID 203 if DEBUG_GAP_CONTROL: 204 print("Active controllers: %s." % (_activeGapControllers)) 205 206 207def removeGapController(vehID): 208 ''' 209 Removes any current gap controller 210 ''' 211 global _activeGapControllers 212 if DEBUG_GAP_CONTROL: 213 print("removeGapController(%s)\nactive: %s." % (vehID, _activeGapControllers)) 214 if vehID in _activeGapControllers: 215 listenerID = _activeGapControllers[vehID] 216 traci.removeStepListener(listenerID) 217