1 use crate::{AgentType, AlertLocation, CarID, Event, ParkingSpot, TripID, TripMode, TripPhaseType};
2 use abstutil::Counter;
3 use geom::{Distance, Duration, Time};
4 use map_model::{
5     BusRouteID, BusStopID, CompressedMovementID, IntersectionID, LaneID, Map, MovementID,
6     ParkingLotID, Path, PathRequest, RoadID, Traversable,
7 };
8 use serde::{Deserialize, Serialize};
9 use std::collections::{BTreeMap, VecDeque};
10 
// Data accumulated from the Events passed to Analytics::event, along with the queries that
// summarize it.
#[derive(Clone, Serialize, Deserialize)]
pub struct Analytics {
    // Agents entering a lane, counted against the lane's parent road.
    pub road_thruput: TimeSeriesCount<RoadID>,
    // Agents entering a turn, counted against the turn's parent intersection. People entering or
    // leaving the map at an intersection are counted here as well.
    pub intersection_thruput: TimeSeriesCount<IntersectionID>,
    // TODO For traffic signals, intersection_thruput could theoretically use this. But that
    // requires occasionally expensive or complicated summing or merging over all directions of an
    // intersection. So for now, eat the file size cost.
    pub traffic_signal_thruput: TimeSeriesCount<CompressedMovementID>,

    // Unlike everything else in Analytics, this is just for a moment in time.
    // record_demand adds to a movement's count; an agent entering one of its turns subtracts.
    pub demand: BTreeMap<MovementID, usize>,

    // TODO Reconsider this one
    pub bus_arrivals: Vec<(Time, CarID, BusRouteID, BusStopID)>,
    // For each passenger boarding, how long did they wait at the stop?
    pub passengers_boarding: BTreeMap<BusStopID, Vec<(Time, BusRouteID, Duration)>>,
    pub passengers_alighting: BTreeMap<BusStopID, Vec<(Time, BusRouteID)>>,

    // The time of the first TripPhaseStarting (or TripAborted) event seen for each trip.
    pub started_trips: BTreeMap<TripID, Time>,
    // TODO Hack: No TripMode means aborted
    // Finish time, ID, mode (or None as aborted), trip duration
    pub finished_trips: Vec<(Time, TripID, Option<TripMode>, Duration)>,
    // TODO This subsumes finished_trips
    // One entry per phase start, abort, or finish, in the order the events arrived.
    pub trip_log: Vec<(Time, TripID, Option<PathRequest>, TripPhaseType)>,

    // TODO Transit riders aren't represented here yet, just the vehicle they're riding.
    // Only for traffic signals. The u8 is the movement index from a CompressedMovementID.
    pub intersection_delays: BTreeMap<IntersectionID, Vec<(u8, Time, Duration, AgentType)>>,

    // Per parking lane or lot, when does a spot become filled (true) or free (false)
    pub parking_lane_changes: BTreeMap<LaneID, Vec<(Time, bool)>>,
    pub parking_lot_changes: BTreeMap<ParkingLotID, Vec<(Time, bool)>>,

    // (time, location, message) for every Alert event.
    pub(crate) alerts: Vec<(Time, AlertLocation, String)>,

    // After we restore from a savestate, don't record anything. This is only going to make sense
    // if savestates are only used for quickly previewing against prebaked results, where we have
    // the full Analytics anyway.
    // When false, event() returns immediately.
    record_anything: bool,
}
51 
52 impl Analytics {
new() -> Analytics53     pub fn new() -> Analytics {
54         Analytics {
55             road_thruput: TimeSeriesCount::new(),
56             intersection_thruput: TimeSeriesCount::new(),
57             traffic_signal_thruput: TimeSeriesCount::new(),
58             demand: BTreeMap::new(),
59             bus_arrivals: Vec::new(),
60             passengers_boarding: BTreeMap::new(),
61             passengers_alighting: BTreeMap::new(),
62             started_trips: BTreeMap::new(),
63             finished_trips: Vec::new(),
64             trip_log: Vec::new(),
65             intersection_delays: BTreeMap::new(),
66             parking_lane_changes: BTreeMap::new(),
67             parking_lot_changes: BTreeMap::new(),
68             alerts: Vec::new(),
69             record_anything: true,
70         }
71     }
72 
event(&mut self, ev: Event, time: Time, map: &Map)73     pub fn event(&mut self, ev: Event, time: Time, map: &Map) {
74         if !self.record_anything {
75             return;
76         }
77 
78         // Throughput
79         if let Event::AgentEntersTraversable(a, to, passengers) = ev {
80             match to {
81                 Traversable::Lane(l) => {
82                     self.road_thruput
83                         .record(time, map.get_l(l).parent, a.to_type(), 1);
84                     if let Some(n) = passengers {
85                         self.road_thruput.record(
86                             time,
87                             map.get_l(l).parent,
88                             AgentType::TransitRider,
89                             n,
90                         );
91                     }
92                 }
93                 Traversable::Turn(t) => {
94                     self.intersection_thruput
95                         .record(time, t.parent, a.to_type(), 1);
96                     if let Some(n) = passengers {
97                         self.intersection_thruput.record(
98                             time,
99                             t.parent,
100                             AgentType::TransitRider,
101                             n,
102                         );
103                     }
104 
105                     if let Some(id) = map.get_movement(t) {
106                         *self.demand.entry(id).or_insert(0) -= 1;
107 
108                         let m = map.get_traffic_signal(t.parent).compressed_id(t);
109                         self.traffic_signal_thruput.record(time, m, a.to_type(), 1);
110                         if let Some(n) = passengers {
111                             self.traffic_signal_thruput
112                                 .record(time, m, AgentType::TransitRider, n);
113                         }
114                     }
115                 }
116             };
117         }
118         match ev {
119             Event::PersonLeavesMap(_, maybe_a, i, _) => {
120                 // Ignore aborted trips
121                 if let Some(a) = maybe_a {
122                     self.intersection_thruput.record(time, i, a.to_type(), 1);
123                 }
124             }
125             Event::PersonEntersMap(_, a, i, _) => {
126                 self.intersection_thruput.record(time, i, a.to_type(), 1);
127             }
128             _ => {}
129         }
130 
131         // Bus arrivals
132         if let Event::BusArrivedAtStop(bus, route, stop) = ev {
133             self.bus_arrivals.push((time, bus, route, stop));
134         }
135 
136         // Passengers boarding/alighting
137         if let Event::PassengerBoardsTransit(_, _, route, stop, waiting) = ev {
138             self.passengers_boarding
139                 .entry(stop)
140                 .or_insert_with(Vec::new)
141                 .push((time, route, waiting));
142         }
143         if let Event::PassengerAlightsTransit(_, _, route, stop) = ev {
144             self.passengers_alighting
145                 .entry(stop)
146                 .or_insert_with(Vec::new)
147                 .push((time, route));
148         }
149 
150         // Started trips
151         if let Event::TripPhaseStarting(id, _, _, _) = ev {
152             self.started_trips.entry(id).or_insert(time);
153         }
154 
155         // Finished trips
156         if let Event::TripFinished {
157             trip,
158             mode,
159             total_time,
160             ..
161         } = ev
162         {
163             self.finished_trips
164                 .push((time, trip, Some(mode), total_time));
165         } else if let Event::TripAborted(id) = ev {
166             self.started_trips.entry(id).or_insert(time);
167             self.finished_trips.push((time, id, None, Duration::ZERO));
168         }
169 
170         // Intersection delays
171         if let Event::IntersectionDelayMeasured(id, delay, agent) = ev {
172             self.intersection_delays
173                 .entry(id.i)
174                 .or_insert_with(Vec::new)
175                 .push((id.idx, time, delay, agent.to_type()));
176         }
177 
178         // Parking spot changes
179         if let Event::CarReachedParkingSpot(_, spot) = ev {
180             if let ParkingSpot::Onstreet(l, _) = spot {
181                 self.parking_lane_changes
182                     .entry(l)
183                     .or_insert_with(Vec::new)
184                     .push((time, true));
185             } else if let ParkingSpot::Lot(pl, _) = spot {
186                 self.parking_lot_changes
187                     .entry(pl)
188                     .or_insert_with(Vec::new)
189                     .push((time, true));
190             }
191         }
192         if let Event::CarLeftParkingSpot(_, spot) = ev {
193             if let ParkingSpot::Onstreet(l, _) = spot {
194                 self.parking_lane_changes
195                     .entry(l)
196                     .or_insert_with(Vec::new)
197                     .push((time, false));
198             } else if let ParkingSpot::Lot(pl, _) = spot {
199                 self.parking_lot_changes
200                     .entry(pl)
201                     .or_insert_with(Vec::new)
202                     .push((time, false));
203             }
204         }
205 
206         // TODO Kinda hacky, but these all consume the event, so kinda bundle em.
207         match ev {
208             Event::TripPhaseStarting(id, _, maybe_req, phase_type) => {
209                 self.trip_log.push((time, id, maybe_req, phase_type));
210             }
211             Event::TripAborted(id) => {
212                 self.trip_log.push((time, id, None, TripPhaseType::Aborted));
213             }
214             Event::TripFinished { trip, .. } => {
215                 self.trip_log
216                     .push((time, trip, None, TripPhaseType::Finished));
217             }
218             Event::PathAmended(path) => {
219                 self.record_demand(&path, map);
220             }
221             Event::Alert(loc, msg) => {
222                 self.alerts.push((time, loc, msg));
223             }
224             _ => {}
225         }
226     }
227 
record_demand(&mut self, path: &Path, map: &Map)228     pub fn record_demand(&mut self, path: &Path, map: &Map) {
229         for step in path.get_steps() {
230             if let Traversable::Turn(t) = step.as_traversable() {
231                 if let Some(id) = map.get_movement(t) {
232                     *self.demand.entry(id).or_insert(0) += 1;
233                 }
234             }
235         }
236     }
237 
238     // TODO If these ever need to be speeded up, just cache the histogram and index in the events
239     // list.
240 
241     // Ignores the current time. Returns None for aborted trips.
finished_trip_time(&self, trip: TripID) -> Option<Duration>242     pub fn finished_trip_time(&self, trip: TripID) -> Option<Duration> {
243         // TODO This is so inefficient!
244         for (_, id, maybe_mode, dt) in &self.finished_trips {
245             if *id == trip {
246                 if maybe_mode.is_some() {
247                     return Some(*dt);
248                 } else {
249                     return None;
250                 }
251             }
252         }
253         None
254     }
255 
256     // Returns pairs of trip times for finished trips in both worlds. (before, after, mode)
both_finished_trips( &self, now: Time, before: &Analytics, ) -> Vec<(Duration, Duration, TripMode)>257     pub fn both_finished_trips(
258         &self,
259         now: Time,
260         before: &Analytics,
261     ) -> Vec<(Duration, Duration, TripMode)> {
262         let mut a = BTreeMap::new();
263         for (t, id, maybe_mode, dt) in &self.finished_trips {
264             if *t > now {
265                 break;
266             }
267             if maybe_mode.is_some() {
268                 a.insert(*id, *dt);
269             }
270         }
271 
272         let mut results = Vec::new();
273         for (t, id, maybe_mode, dt) in &before.finished_trips {
274             if *t > now {
275                 break;
276             }
277             if let Some(mode) = maybe_mode {
278                 if let Some(dt1) = a.remove(id) {
279                     results.push((*dt, dt1, *mode));
280                 }
281             }
282         }
283         results
284     }
285 
286     // Find intersections where the cumulative sum of delay has changed. Negative means faster.
compare_delay(&self, now: Time, before: &Analytics) -> Vec<(IntersectionID, Duration)>287     pub fn compare_delay(&self, now: Time, before: &Analytics) -> Vec<(IntersectionID, Duration)> {
288         let mut results = Vec::new();
289         for (i, list1) in &self.intersection_delays {
290             if let Some(list2) = before.intersection_delays.get(i) {
291                 let mut sum1 = Duration::ZERO;
292                 for (_, t, dt, _) in list1 {
293                     if *t > now {
294                         break;
295                     }
296                     sum1 += *dt;
297                 }
298 
299                 let mut sum2 = Duration::ZERO;
300                 for (_, t, dt, _) in list2 {
301                     if *t > now {
302                         break;
303                     }
304                     sum2 += *dt;
305                 }
306 
307                 if sum1 != sum2 {
308                     results.push((*i, sum1 - sum2));
309                 }
310             }
311         }
312         results
313     }
314 
get_trip_phases(&self, trip: TripID, map: &Map) -> Vec<TripPhase>315     pub fn get_trip_phases(&self, trip: TripID, map: &Map) -> Vec<TripPhase> {
316         let mut phases: Vec<TripPhase> = Vec::new();
317         for (t, id, maybe_req, phase_type) in &self.trip_log {
318             if *id != trip {
319                 continue;
320             }
321             if let Some(ref mut last) = phases.last_mut() {
322                 last.end_time = Some(*t);
323             }
324             if *phase_type == TripPhaseType::Finished || *phase_type == TripPhaseType::Aborted {
325                 break;
326             }
327             phases.push(TripPhase {
328                 start_time: *t,
329                 end_time: None,
330                 // Unwrap should be safe, because this is the request that was actually done...
331                 // TODO Not if this is prebaked data and we've made edits. Woops.
332                 path: maybe_req.as_ref().and_then(|req| {
333                     map.pathfind(req.clone())
334                         .map(|path| (req.start.dist_along(), path))
335                 }),
336                 has_path_req: maybe_req.is_some(),
337                 phase_type: *phase_type,
338             })
339         }
340         phases
341     }
342 
get_all_trip_phases(&self) -> BTreeMap<TripID, Vec<TripPhase>>343     pub fn get_all_trip_phases(&self) -> BTreeMap<TripID, Vec<TripPhase>> {
344         let mut trips = BTreeMap::new();
345         for (t, id, maybe_req, phase_type) in &self.trip_log {
346             let phases: &mut Vec<TripPhase> = trips.entry(*id).or_insert_with(Vec::new);
347             if let Some(ref mut last) = phases.last_mut() {
348                 last.end_time = Some(*t);
349             }
350             if *phase_type == TripPhaseType::Finished {
351                 continue;
352             }
353             // Remove aborted trips
354             if *phase_type == TripPhaseType::Aborted {
355                 trips.remove(id);
356                 continue;
357             }
358             phases.push(TripPhase {
359                 start_time: *t,
360                 end_time: None,
361                 // Don't compute any paths
362                 path: None,
363                 has_path_req: maybe_req.is_some(),
364                 phase_type: *phase_type,
365             })
366         }
367         trips
368     }
369 
active_agents(&self, now: Time) -> Vec<(Time, usize)>370     pub fn active_agents(&self, now: Time) -> Vec<(Time, usize)> {
371         let mut starts_stops: Vec<(Time, bool)> = Vec::new();
372         for t in self.started_trips.values() {
373             if *t <= now {
374                 starts_stops.push((*t, false));
375             }
376         }
377         for (t, _, _, _) in &self.finished_trips {
378             if *t > now {
379                 break;
380             }
381             starts_stops.push((*t, true));
382         }
383         // Make sure the start events get sorted before the stops.
384         starts_stops.sort();
385 
386         let mut pts = Vec::new();
387         let mut cnt = 0;
388         let mut last_t = Time::START_OF_DAY;
389         for (t, ended) in starts_stops {
390             if t != last_t {
391                 // Step functions. Don't interpolate.
392                 pts.push((last_t, cnt));
393             }
394             last_t = t;
395             if ended {
396                 // release mode disables this check, so...
397                 if cnt == 0 {
398                     panic!("active_agents at {} has more ended trips than started", t);
399                 }
400                 cnt -= 1;
401             } else {
402                 cnt += 1;
403             }
404         }
405         pts.push((last_t, cnt));
406         if last_t != now {
407             pts.push((now, cnt));
408         }
409         pts
410     }
411 
412     // Returns the free spots over time
parking_lane_availability( &self, now: Time, l: LaneID, capacity: usize, ) -> Vec<(Time, usize)>413     pub fn parking_lane_availability(
414         &self,
415         now: Time,
416         l: LaneID,
417         capacity: usize,
418     ) -> Vec<(Time, usize)> {
419         if let Some(changes) = self.parking_lane_changes.get(&l) {
420             Analytics::parking_spot_availability(now, changes, capacity)
421         } else {
422             vec![(Time::START_OF_DAY, capacity), (now, capacity)]
423         }
424     }
parking_lot_availability( &self, now: Time, pl: ParkingLotID, capacity: usize, ) -> Vec<(Time, usize)>425     pub fn parking_lot_availability(
426         &self,
427         now: Time,
428         pl: ParkingLotID,
429         capacity: usize,
430     ) -> Vec<(Time, usize)> {
431         if let Some(changes) = self.parking_lot_changes.get(&pl) {
432             Analytics::parking_spot_availability(now, changes, capacity)
433         } else {
434             vec![(Time::START_OF_DAY, capacity), (now, capacity)]
435         }
436     }
437 
parking_spot_availability( now: Time, changes: &Vec<(Time, bool)>, capacity: usize, ) -> Vec<(Time, usize)>438     fn parking_spot_availability(
439         now: Time,
440         changes: &Vec<(Time, bool)>,
441         capacity: usize,
442     ) -> Vec<(Time, usize)> {
443         let mut pts = Vec::new();
444         let mut cnt = capacity;
445         let mut last_t = Time::START_OF_DAY;
446 
447         for (t, filled) in changes {
448             if *t > now {
449                 break;
450             }
451             if *t != last_t {
452                 // Step functions. Don't interpolate.
453                 pts.push((last_t, cnt));
454             }
455             last_t = *t;
456             if *filled {
457                 if cnt == 0 {
458                     panic!("parking_spot_availability at {} went below 0", t);
459                 }
460                 cnt -= 1;
461             } else {
462                 cnt += 1;
463             }
464         }
465         pts.push((last_t, cnt));
466         if last_t != now {
467             pts.push((now, cnt));
468         }
469         pts
470     }
471 }
472 
473 impl Default for Analytics {
default() -> Analytics474     fn default() -> Analytics {
475         let mut a = Analytics::new();
476         a.record_anything = false;
477         a
478     }
479 }
480 
// One phase of a trip, as reconstructed from Analytics::trip_log.
#[derive(Debug)]
pub struct TripPhase {
    // Time of the log entry that started this phase.
    pub start_time: Time,
    // Time of the trip's next log entry, or None if there isn't one yet.
    pub end_time: Option<Time>,
    // Plumb along start distance
    // None when the phase had no path request, when pathfinding failed, or when paths weren't
    // computed at all (get_all_trip_phases).
    pub path: Option<(Distance, Path)>,
    // Whether the log entry carried a path request, regardless of whether `path` is filled in.
    pub has_path_req: bool,
    pub phase_type: TripPhaseType,
}
490 
// Counts of agents per object of type X (a road, intersection, or movement), broken down by
// agent type and hour.
#[derive(Clone, Serialize, Deserialize)]
pub struct TimeSeriesCount<X: Ord + Clone> {
    // (Road or intersection, type, hour block) -> count for that hour
    pub counts: BTreeMap<(X, AgentType, usize), usize>,

    // Very expensive to store, so it's optional. But useful to flag on to experiment with
    // representations better than the hour count above.
    // Stays empty unless the flag inside record() is flipped by hand.
    pub raw: Vec<(Time, AgentType, X)>,
}
500 
501 impl<X: Ord + Clone> TimeSeriesCount<X> {
new() -> TimeSeriesCount<X>502     fn new() -> TimeSeriesCount<X> {
503         TimeSeriesCount {
504             counts: BTreeMap::new(),
505             raw: Vec::new(),
506         }
507     }
508 
record(&mut self, time: Time, id: X, agent_type: AgentType, count: usize)509     fn record(&mut self, time: Time, id: X, agent_type: AgentType, count: usize) {
510         // TODO Manually change flag
511         if false {
512             // TODO Woo, handling transit passengers is even more expensive in this already
513             // expensive representation...
514             for _ in 0..count {
515                 self.raw.push((time, agent_type, id.clone()));
516             }
517         }
518 
519         let hour = time.get_parts().0;
520         *self.counts.entry((id, agent_type, hour)).or_insert(0) += count;
521     }
522 
total_for(&self, id: X) -> usize523     pub fn total_for(&self, id: X) -> usize {
524         let mut cnt = 0;
525         for agent_type in AgentType::all() {
526             // TODO Hmm
527             for hour in 0..24 {
528                 cnt += self
529                     .counts
530                     .get(&(id.clone(), agent_type, hour))
531                     .cloned()
532                     .unwrap_or(0);
533             }
534         }
535         cnt
536     }
537 
all_total_counts(&self) -> Counter<X>538     pub fn all_total_counts(&self) -> Counter<X> {
539         let mut cnt = Counter::new();
540         for ((id, _, _), value) in &self.counts {
541             cnt.add(id.clone(), *value);
542         }
543         cnt
544     }
545 
count_per_hour(&self, id: X, time: Time) -> Vec<(AgentType, Vec<(Time, usize)>)>546     pub fn count_per_hour(&self, id: X, time: Time) -> Vec<(AgentType, Vec<(Time, usize)>)> {
547         let hour = time.get_hours();
548         let mut results = Vec::new();
549         for agent_type in AgentType::all() {
550             let mut pts = Vec::new();
551             for hour in 0..=hour {
552                 let cnt = self
553                     .counts
554                     .get(&(id.clone(), agent_type, hour))
555                     .cloned()
556                     .unwrap_or(0);
557                 pts.push((Time::START_OF_DAY + Duration::hours(hour), cnt));
558                 pts.push((Time::START_OF_DAY + Duration::hours(hour + 1), cnt));
559             }
560             pts.pop();
561             results.push((agent_type, pts));
562         }
563         results
564     }
565 
raw_throughput(&self, now: Time, id: X) -> Vec<(AgentType, Vec<(Time, usize)>)>566     pub fn raw_throughput(&self, now: Time, id: X) -> Vec<(AgentType, Vec<(Time, usize)>)> {
567         let window_size = Duration::hours(1);
568         let mut pts_per_type: BTreeMap<AgentType, Vec<(Time, usize)>> = BTreeMap::new();
569         let mut windows_per_type: BTreeMap<AgentType, Window> = BTreeMap::new();
570         for agent_type in AgentType::all() {
571             pts_per_type.insert(agent_type, vec![(Time::START_OF_DAY, 0)]);
572             windows_per_type.insert(agent_type, Window::new(window_size));
573         }
574 
575         for (t, agent_type, x) in &self.raw {
576             if *x != id {
577                 continue;
578             }
579             if *t > now {
580                 break;
581             }
582 
583             let count = windows_per_type.get_mut(agent_type).unwrap().add(*t);
584             pts_per_type.get_mut(agent_type).unwrap().push((*t, count));
585         }
586 
587         for (agent_type, pts) in pts_per_type.iter_mut() {
588             let mut window = windows_per_type.remove(agent_type).unwrap();
589 
590             // Add a drop-off after window_size (+ a little epsilon!)
591             let t = (pts.last().unwrap().0 + window_size + Duration::seconds(0.1)).min(now);
592             if pts.last().unwrap().0 != t {
593                 pts.push((t, window.count(t)));
594             }
595 
596             if pts.last().unwrap().0 != now {
597                 pts.push((now, window.count(now)));
598             }
599         }
600 
601         pts_per_type.into_iter().collect()
602     }
603 }
604 
605 pub struct Window {
606     times: VecDeque<Time>,
607     window_size: Duration,
608 }
609 
610 impl Window {
new(window_size: Duration) -> Window611     pub fn new(window_size: Duration) -> Window {
612         Window {
613             times: VecDeque::new(),
614             window_size,
615         }
616     }
617 
618     // Returns the count at time
add(&mut self, time: Time) -> usize619     pub fn add(&mut self, time: Time) -> usize {
620         self.times.push_back(time);
621         self.count(time)
622     }
623 
624     // Grab the count at this time, but don't add a new time
count(&mut self, end: Time) -> usize625     pub fn count(&mut self, end: Time) -> usize {
626         while !self.times.is_empty() && end - *self.times.front().unwrap() > self.window_size {
627             self.times.pop_front();
628         }
629         self.times.len()
630     }
631 }
632