1 /*
2  *  Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "test/network/network_emulation.h"
12 
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 
17 #include "api/units/data_size.h"
18 #include "rtc_base/bind.h"
19 #include "rtc_base/logging.h"
20 
21 namespace webrtc {
22 
OnPacketReceived(EmulatedIpPacket packet)23 void LinkEmulation::OnPacketReceived(EmulatedIpPacket packet) {
24   task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
25     RTC_DCHECK_RUN_ON(task_queue_);
26 
27     uint64_t packet_id = next_packet_id_++;
28     bool sent = network_behavior_->EnqueuePacket(PacketInFlightInfo(
29         packet.ip_packet_size(), packet.arrival_time.us(), packet_id));
30     if (sent) {
31       packets_.emplace_back(StoredPacket{packet_id, std::move(packet), false});
32     }
33     if (process_task_.Running())
34       return;
35     absl::optional<int64_t> next_time_us =
36         network_behavior_->NextDeliveryTimeUs();
37     if (!next_time_us)
38       return;
39     Timestamp current_time = clock_->CurrentTime();
40     process_task_ = RepeatingTaskHandle::DelayedStart(
41         task_queue_->Get(),
42         std::max(TimeDelta::Zero(),
43                  Timestamp::Micros(*next_time_us) - current_time),
44         [this]() {
45           RTC_DCHECK_RUN_ON(task_queue_);
46           Timestamp current_time = clock_->CurrentTime();
47           Process(current_time);
48           absl::optional<int64_t> next_time_us =
49               network_behavior_->NextDeliveryTimeUs();
50           if (!next_time_us) {
51             process_task_.Stop();
52             return TimeDelta::Zero();  // This is ignored.
53           }
54           RTC_DCHECK_GE(*next_time_us, current_time.us());
55           return Timestamp::Micros(*next_time_us) - current_time;
56         });
57   });
58 }
59 
Process(Timestamp at_time)60 void LinkEmulation::Process(Timestamp at_time) {
61   std::vector<PacketDeliveryInfo> delivery_infos =
62       network_behavior_->DequeueDeliverablePackets(at_time.us());
63   for (PacketDeliveryInfo& delivery_info : delivery_infos) {
64     StoredPacket* packet = nullptr;
65     for (auto& stored_packet : packets_) {
66       if (stored_packet.id == delivery_info.packet_id) {
67         packet = &stored_packet;
68         break;
69       }
70     }
71     RTC_CHECK(packet);
72     RTC_DCHECK(!packet->removed);
73     packet->removed = true;
74 
75     if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) {
76       packet->packet.arrival_time =
77           Timestamp::Micros(delivery_info.receive_time_us);
78       receiver_->OnPacketReceived(std::move(packet->packet));
79     }
80     while (!packets_.empty() && packets_.front().removed) {
81       packets_.pop_front();
82     }
83   }
84 }
85 
NetworkRouterNode(rtc::TaskQueue * task_queue)86 NetworkRouterNode::NetworkRouterNode(rtc::TaskQueue* task_queue)
87     : task_queue_(task_queue) {}
88 
OnPacketReceived(EmulatedIpPacket packet)89 void NetworkRouterNode::OnPacketReceived(EmulatedIpPacket packet) {
90   RTC_DCHECK_RUN_ON(task_queue_);
91   if (watcher_) {
92     watcher_(packet);
93   }
94   if (filter_) {
95     if (!filter_(packet))
96       return;
97   }
98   auto receiver_it = routing_.find(packet.to.ipaddr());
99   if (receiver_it == routing_.end()) {
100     return;
101   }
102   RTC_CHECK(receiver_it != routing_.end());
103 
104   receiver_it->second->OnPacketReceived(std::move(packet));
105 }
106 
SetReceiver(const rtc::IPAddress & dest_ip,EmulatedNetworkReceiverInterface * receiver)107 void NetworkRouterNode::SetReceiver(
108     const rtc::IPAddress& dest_ip,
109     EmulatedNetworkReceiverInterface* receiver) {
110   task_queue_->PostTask([=] {
111     RTC_DCHECK_RUN_ON(task_queue_);
112     EmulatedNetworkReceiverInterface* cur_receiver = routing_[dest_ip];
113     RTC_CHECK(cur_receiver == nullptr || cur_receiver == receiver)
114         << "Routing for dest_ip=" << dest_ip.ToString() << " already exists";
115     routing_[dest_ip] = receiver;
116   });
117 }
118 
RemoveReceiver(const rtc::IPAddress & dest_ip)119 void NetworkRouterNode::RemoveReceiver(const rtc::IPAddress& dest_ip) {
120   RTC_DCHECK_RUN_ON(task_queue_);
121   routing_.erase(dest_ip);
122 }
123 
SetWatcher(std::function<void (const EmulatedIpPacket &)> watcher)124 void NetworkRouterNode::SetWatcher(
125     std::function<void(const EmulatedIpPacket&)> watcher) {
126   task_queue_->PostTask([=] {
127     RTC_DCHECK_RUN_ON(task_queue_);
128     watcher_ = watcher;
129   });
130 }
131 
SetFilter(std::function<bool (const EmulatedIpPacket &)> filter)132 void NetworkRouterNode::SetFilter(
133     std::function<bool(const EmulatedIpPacket&)> filter) {
134   task_queue_->PostTask([=] {
135     RTC_DCHECK_RUN_ON(task_queue_);
136     filter_ = filter;
137   });
138 }
139 
EmulatedNetworkNode(Clock * clock,rtc::TaskQueue * task_queue,std::unique_ptr<NetworkBehaviorInterface> network_behavior)140 EmulatedNetworkNode::EmulatedNetworkNode(
141     Clock* clock,
142     rtc::TaskQueue* task_queue,
143     std::unique_ptr<NetworkBehaviorInterface> network_behavior)
144     : router_(task_queue),
145       link_(clock, task_queue, std::move(network_behavior), &router_) {}
146 
OnPacketReceived(EmulatedIpPacket packet)147 void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) {
148   link_.OnPacketReceived(std::move(packet));
149 }
150 
CreateRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes,EmulatedNetworkReceiverInterface * receiver)151 void EmulatedNetworkNode::CreateRoute(
152     const rtc::IPAddress& receiver_ip,
153     std::vector<EmulatedNetworkNode*> nodes,
154     EmulatedNetworkReceiverInterface* receiver) {
155   RTC_CHECK(!nodes.empty());
156   for (size_t i = 0; i + 1 < nodes.size(); ++i)
157     nodes[i]->router()->SetReceiver(receiver_ip, nodes[i + 1]);
158   nodes.back()->router()->SetReceiver(receiver_ip, receiver);
159 }
160 
ClearRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes)161 void EmulatedNetworkNode::ClearRoute(const rtc::IPAddress& receiver_ip,
162                                      std::vector<EmulatedNetworkNode*> nodes) {
163   for (EmulatedNetworkNode* node : nodes)
164     node->router()->RemoveReceiver(receiver_ip);
165 }
166 
167 EmulatedNetworkNode::~EmulatedNetworkNode() = default;
168 
EmulatedEndpointImpl(uint64_t id,const rtc::IPAddress & ip,bool is_enabled,rtc::AdapterType type,rtc::TaskQueue * task_queue,Clock * clock)169 EmulatedEndpointImpl::EmulatedEndpointImpl(uint64_t id,
170                                            const rtc::IPAddress& ip,
171                                            bool is_enabled,
172                                            rtc::AdapterType type,
173                                            rtc::TaskQueue* task_queue,
174                                            Clock* clock)
175     : id_(id),
176       peer_local_addr_(ip),
177       is_enabled_(is_enabled),
178       type_(type),
179       clock_(clock),
180       task_queue_(task_queue),
181       router_(task_queue_),
182       next_port_(kFirstEphemeralPort) {
183   constexpr int kIPv4NetworkPrefixLength = 24;
184   constexpr int kIPv6NetworkPrefixLength = 64;
185 
186   int prefix_length = 0;
187   if (ip.family() == AF_INET) {
188     prefix_length = kIPv4NetworkPrefixLength;
189   } else if (ip.family() == AF_INET6) {
190     prefix_length = kIPv6NetworkPrefixLength;
191   }
192   rtc::IPAddress prefix = TruncateIP(ip, prefix_length);
193   network_ = std::make_unique<rtc::Network>(
194       ip.ToString(), "Endpoint id=" + std::to_string(id_), prefix,
195       prefix_length, type_);
196   network_->AddIP(ip);
197 
198   enabled_state_checker_.Detach();
199 }
200 EmulatedEndpointImpl::~EmulatedEndpointImpl() = default;
201 
GetId() const202 uint64_t EmulatedEndpointImpl::GetId() const {
203   return id_;
204 }
205 
SendPacket(const rtc::SocketAddress & from,const rtc::SocketAddress & to,rtc::CopyOnWriteBuffer packet_data,uint16_t application_overhead)206 void EmulatedEndpointImpl::SendPacket(const rtc::SocketAddress& from,
207                                       const rtc::SocketAddress& to,
208                                       rtc::CopyOnWriteBuffer packet_data,
209                                       uint16_t application_overhead) {
210   RTC_CHECK(from.ipaddr() == peer_local_addr_);
211   EmulatedIpPacket packet(from, to, std::move(packet_data),
212                           clock_->CurrentTime(), application_overhead);
213   task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
214     RTC_DCHECK_RUN_ON(task_queue_);
215     Timestamp current_time = clock_->CurrentTime();
216     if (stats_.first_packet_sent_time.IsInfinite()) {
217       stats_.first_packet_sent_time = current_time;
218       stats_.first_sent_packet_size = DataSize::Bytes(packet.ip_packet_size());
219     }
220     stats_.last_packet_sent_time = current_time;
221     stats_.packets_sent++;
222     stats_.bytes_sent += DataSize::Bytes(packet.ip_packet_size());
223 
224     router_.OnPacketReceived(std::move(packet));
225   });
226 }
227 
BindReceiver(uint16_t desired_port,EmulatedNetworkReceiverInterface * receiver)228 absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiver(
229     uint16_t desired_port,
230     EmulatedNetworkReceiverInterface* receiver) {
231   rtc::CritScope crit(&receiver_lock_);
232   uint16_t port = desired_port;
233   if (port == 0) {
234     // Because client can specify its own port, next_port_ can be already in
235     // use, so we need to find next available port.
236     int ports_pool_size =
237         std::numeric_limits<uint16_t>::max() - kFirstEphemeralPort + 1;
238     for (int i = 0; i < ports_pool_size; ++i) {
239       uint16_t next_port = NextPort();
240       if (port_to_receiver_.find(next_port) == port_to_receiver_.end()) {
241         port = next_port;
242         break;
243       }
244     }
245   }
246   RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint "
247                        << id_;
248   bool result = port_to_receiver_.insert({port, receiver}).second;
249   if (!result) {
250     RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port
251                   << " in endpoint " << id_;
252     return absl::nullopt;
253   }
254   RTC_LOG(INFO) << "New receiver is binded to endpoint " << id_ << " on port "
255                 << port;
256   return port;
257 }
258 
NextPort()259 uint16_t EmulatedEndpointImpl::NextPort() {
260   uint16_t out = next_port_;
261   if (next_port_ == std::numeric_limits<uint16_t>::max()) {
262     next_port_ = kFirstEphemeralPort;
263   } else {
264     next_port_++;
265   }
266   return out;
267 }
268 
UnbindReceiver(uint16_t port)269 void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) {
270   rtc::CritScope crit(&receiver_lock_);
271   port_to_receiver_.erase(port);
272 }
273 
GetPeerLocalAddress() const274 rtc::IPAddress EmulatedEndpointImpl::GetPeerLocalAddress() const {
275   return peer_local_addr_;
276 }
277 
OnPacketReceived(EmulatedIpPacket packet)278 void EmulatedEndpointImpl::OnPacketReceived(EmulatedIpPacket packet) {
279   RTC_DCHECK_RUN_ON(task_queue_);
280   RTC_CHECK(packet.to.ipaddr() == peer_local_addr_)
281       << "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: "
282       << packet.to.ipaddr().ToString()
283       << "; Receiver peer_local_addr_=" << peer_local_addr_.ToString();
284   rtc::CritScope crit(&receiver_lock_);
285   UpdateReceiveStats(packet);
286   auto it = port_to_receiver_.find(packet.to.port());
287   if (it == port_to_receiver_.end()) {
288     // It can happen, that remote peer closed connection, but there still some
289     // packets, that are going to it. It can happen during peer connection close
290     // process: one peer closed connection, second still sending data.
291     RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_
292                   << " on port " << packet.to.port();
293     stats_.packets_dropped++;
294     stats_.bytes_dropped += DataSize::Bytes(packet.ip_packet_size());
295     return;
296   }
297   // Endpoint assumes frequent calls to bind and unbind methods, so it holds
298   // lock during packet processing to ensure that receiver won't be deleted
299   // before call to OnPacketReceived.
300   it->second->OnPacketReceived(std::move(packet));
301 }
302 
Enable()303 void EmulatedEndpointImpl::Enable() {
304   RTC_DCHECK_RUN_ON(&enabled_state_checker_);
305   RTC_CHECK(!is_enabled_);
306   is_enabled_ = true;
307 }
308 
Disable()309 void EmulatedEndpointImpl::Disable() {
310   RTC_DCHECK_RUN_ON(&enabled_state_checker_);
311   RTC_CHECK(is_enabled_);
312   is_enabled_ = false;
313 }
314 
Enabled() const315 bool EmulatedEndpointImpl::Enabled() const {
316   RTC_DCHECK_RUN_ON(&enabled_state_checker_);
317   return is_enabled_;
318 }
319 
stats()320 EmulatedNetworkStats EmulatedEndpointImpl::stats() {
321   RTC_DCHECK_RUN_ON(task_queue_);
322   return stats_;
323 }
324 
UpdateReceiveStats(const EmulatedIpPacket & packet)325 void EmulatedEndpointImpl::UpdateReceiveStats(const EmulatedIpPacket& packet) {
326   RTC_DCHECK_RUN_ON(task_queue_);
327   Timestamp current_time = clock_->CurrentTime();
328   if (stats_.first_packet_received_time.IsInfinite()) {
329     stats_.first_packet_received_time = current_time;
330     stats_.first_received_packet_size =
331         DataSize::Bytes(packet.ip_packet_size());
332   }
333   stats_.last_packet_received_time = current_time;
334   stats_.packets_received++;
335   stats_.bytes_received += DataSize::Bytes(packet.ip_packet_size());
336 }
337 
EndpointsContainer(const std::vector<EmulatedEndpointImpl * > & endpoints)338 EndpointsContainer::EndpointsContainer(
339     const std::vector<EmulatedEndpointImpl*>& endpoints)
340     : endpoints_(endpoints) {}
341 
LookupByLocalAddress(const rtc::IPAddress & local_ip) const342 EmulatedEndpointImpl* EndpointsContainer::LookupByLocalAddress(
343     const rtc::IPAddress& local_ip) const {
344   for (auto* endpoint : endpoints_) {
345     rtc::IPAddress peer_local_address = endpoint->GetPeerLocalAddress();
346     if (peer_local_address == local_ip) {
347       return endpoint;
348     }
349   }
350   RTC_CHECK(false) << "No network found for address" << local_ip.ToString();
351 }
352 
HasEndpoint(EmulatedEndpointImpl * endpoint) const353 bool EndpointsContainer::HasEndpoint(EmulatedEndpointImpl* endpoint) const {
354   for (auto* e : endpoints_) {
355     if (e->GetId() == endpoint->GetId()) {
356       return true;
357     }
358   }
359   return false;
360 }
361 
362 std::vector<std::unique_ptr<rtc::Network>>
GetEnabledNetworks() const363 EndpointsContainer::GetEnabledNetworks() const {
364   std::vector<std::unique_ptr<rtc::Network>> networks;
365   for (auto* endpoint : endpoints_) {
366     if (endpoint->Enabled()) {
367       networks.emplace_back(
368           std::make_unique<rtc::Network>(endpoint->network()));
369     }
370   }
371   return networks;
372 }
373 
GetStats() const374 EmulatedNetworkStats EndpointsContainer::GetStats() const {
375   EmulatedNetworkStats stats;
376   for (auto* endpoint : endpoints_) {
377     EmulatedNetworkStats endpoint_stats = endpoint->stats();
378     stats.packets_sent += endpoint_stats.packets_sent;
379     stats.bytes_sent += endpoint_stats.bytes_sent;
380     stats.packets_received += endpoint_stats.packets_received;
381     stats.bytes_received += endpoint_stats.bytes_received;
382     stats.packets_dropped += endpoint_stats.packets_dropped;
383     stats.bytes_dropped += endpoint_stats.bytes_dropped;
384     if (stats.first_packet_received_time >
385         endpoint_stats.first_packet_received_time) {
386       stats.first_packet_received_time =
387           endpoint_stats.first_packet_received_time;
388       stats.first_received_packet_size =
389           endpoint_stats.first_received_packet_size;
390     }
391     if (stats.first_packet_sent_time > endpoint_stats.first_packet_sent_time) {
392       stats.first_packet_sent_time = endpoint_stats.first_packet_sent_time;
393       stats.first_sent_packet_size = endpoint_stats.first_sent_packet_size;
394     }
395     if (stats.last_packet_received_time.IsInfinite() ||
396         stats.last_packet_received_time <
397             endpoint_stats.last_packet_received_time) {
398       stats.last_packet_received_time =
399           endpoint_stats.last_packet_received_time;
400     }
401     if (stats.last_packet_sent_time.IsInfinite() ||
402         stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) {
403       stats.last_packet_sent_time = endpoint_stats.last_packet_sent_time;
404     }
405   }
406   return stats;
407 }
408 
409 }  // namespace webrtc
410