1 /*
2 * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "test/network/network_emulation.h"
12
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16
17 #include "api/units/data_size.h"
18 #include "rtc_base/bind.h"
19 #include "rtc_base/logging.h"
20
21 namespace webrtc {
22
OnPacketReceived(EmulatedIpPacket packet)23 void LinkEmulation::OnPacketReceived(EmulatedIpPacket packet) {
24 task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
25 RTC_DCHECK_RUN_ON(task_queue_);
26
27 uint64_t packet_id = next_packet_id_++;
28 bool sent = network_behavior_->EnqueuePacket(PacketInFlightInfo(
29 packet.ip_packet_size(), packet.arrival_time.us(), packet_id));
30 if (sent) {
31 packets_.emplace_back(StoredPacket{packet_id, std::move(packet), false});
32 }
33 if (process_task_.Running())
34 return;
35 absl::optional<int64_t> next_time_us =
36 network_behavior_->NextDeliveryTimeUs();
37 if (!next_time_us)
38 return;
39 Timestamp current_time = clock_->CurrentTime();
40 process_task_ = RepeatingTaskHandle::DelayedStart(
41 task_queue_->Get(),
42 std::max(TimeDelta::Zero(),
43 Timestamp::Micros(*next_time_us) - current_time),
44 [this]() {
45 RTC_DCHECK_RUN_ON(task_queue_);
46 Timestamp current_time = clock_->CurrentTime();
47 Process(current_time);
48 absl::optional<int64_t> next_time_us =
49 network_behavior_->NextDeliveryTimeUs();
50 if (!next_time_us) {
51 process_task_.Stop();
52 return TimeDelta::Zero(); // This is ignored.
53 }
54 RTC_DCHECK_GE(*next_time_us, current_time.us());
55 return Timestamp::Micros(*next_time_us) - current_time;
56 });
57 });
58 }
59
Process(Timestamp at_time)60 void LinkEmulation::Process(Timestamp at_time) {
61 std::vector<PacketDeliveryInfo> delivery_infos =
62 network_behavior_->DequeueDeliverablePackets(at_time.us());
63 for (PacketDeliveryInfo& delivery_info : delivery_infos) {
64 StoredPacket* packet = nullptr;
65 for (auto& stored_packet : packets_) {
66 if (stored_packet.id == delivery_info.packet_id) {
67 packet = &stored_packet;
68 break;
69 }
70 }
71 RTC_CHECK(packet);
72 RTC_DCHECK(!packet->removed);
73 packet->removed = true;
74
75 if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) {
76 packet->packet.arrival_time =
77 Timestamp::Micros(delivery_info.receive_time_us);
78 receiver_->OnPacketReceived(std::move(packet->packet));
79 }
80 while (!packets_.empty() && packets_.front().removed) {
81 packets_.pop_front();
82 }
83 }
84 }
85
NetworkRouterNode(rtc::TaskQueue * task_queue)86 NetworkRouterNode::NetworkRouterNode(rtc::TaskQueue* task_queue)
87 : task_queue_(task_queue) {}
88
OnPacketReceived(EmulatedIpPacket packet)89 void NetworkRouterNode::OnPacketReceived(EmulatedIpPacket packet) {
90 RTC_DCHECK_RUN_ON(task_queue_);
91 if (watcher_) {
92 watcher_(packet);
93 }
94 if (filter_) {
95 if (!filter_(packet))
96 return;
97 }
98 auto receiver_it = routing_.find(packet.to.ipaddr());
99 if (receiver_it == routing_.end()) {
100 return;
101 }
102 RTC_CHECK(receiver_it != routing_.end());
103
104 receiver_it->second->OnPacketReceived(std::move(packet));
105 }
106
SetReceiver(const rtc::IPAddress & dest_ip,EmulatedNetworkReceiverInterface * receiver)107 void NetworkRouterNode::SetReceiver(
108 const rtc::IPAddress& dest_ip,
109 EmulatedNetworkReceiverInterface* receiver) {
110 task_queue_->PostTask([=] {
111 RTC_DCHECK_RUN_ON(task_queue_);
112 EmulatedNetworkReceiverInterface* cur_receiver = routing_[dest_ip];
113 RTC_CHECK(cur_receiver == nullptr || cur_receiver == receiver)
114 << "Routing for dest_ip=" << dest_ip.ToString() << " already exists";
115 routing_[dest_ip] = receiver;
116 });
117 }
118
RemoveReceiver(const rtc::IPAddress & dest_ip)119 void NetworkRouterNode::RemoveReceiver(const rtc::IPAddress& dest_ip) {
120 RTC_DCHECK_RUN_ON(task_queue_);
121 routing_.erase(dest_ip);
122 }
123
SetWatcher(std::function<void (const EmulatedIpPacket &)> watcher)124 void NetworkRouterNode::SetWatcher(
125 std::function<void(const EmulatedIpPacket&)> watcher) {
126 task_queue_->PostTask([=] {
127 RTC_DCHECK_RUN_ON(task_queue_);
128 watcher_ = watcher;
129 });
130 }
131
SetFilter(std::function<bool (const EmulatedIpPacket &)> filter)132 void NetworkRouterNode::SetFilter(
133 std::function<bool(const EmulatedIpPacket&)> filter) {
134 task_queue_->PostTask([=] {
135 RTC_DCHECK_RUN_ON(task_queue_);
136 filter_ = filter;
137 });
138 }
139
EmulatedNetworkNode(Clock * clock,rtc::TaskQueue * task_queue,std::unique_ptr<NetworkBehaviorInterface> network_behavior)140 EmulatedNetworkNode::EmulatedNetworkNode(
141 Clock* clock,
142 rtc::TaskQueue* task_queue,
143 std::unique_ptr<NetworkBehaviorInterface> network_behavior)
144 : router_(task_queue),
145 link_(clock, task_queue, std::move(network_behavior), &router_) {}
146
OnPacketReceived(EmulatedIpPacket packet)147 void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) {
148 link_.OnPacketReceived(std::move(packet));
149 }
150
CreateRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes,EmulatedNetworkReceiverInterface * receiver)151 void EmulatedNetworkNode::CreateRoute(
152 const rtc::IPAddress& receiver_ip,
153 std::vector<EmulatedNetworkNode*> nodes,
154 EmulatedNetworkReceiverInterface* receiver) {
155 RTC_CHECK(!nodes.empty());
156 for (size_t i = 0; i + 1 < nodes.size(); ++i)
157 nodes[i]->router()->SetReceiver(receiver_ip, nodes[i + 1]);
158 nodes.back()->router()->SetReceiver(receiver_ip, receiver);
159 }
160
ClearRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes)161 void EmulatedNetworkNode::ClearRoute(const rtc::IPAddress& receiver_ip,
162 std::vector<EmulatedNetworkNode*> nodes) {
163 for (EmulatedNetworkNode* node : nodes)
164 node->router()->RemoveReceiver(receiver_ip);
165 }
166
167 EmulatedNetworkNode::~EmulatedNetworkNode() = default;
168
EmulatedEndpointImpl(uint64_t id,const rtc::IPAddress & ip,bool is_enabled,rtc::AdapterType type,rtc::TaskQueue * task_queue,Clock * clock)169 EmulatedEndpointImpl::EmulatedEndpointImpl(uint64_t id,
170 const rtc::IPAddress& ip,
171 bool is_enabled,
172 rtc::AdapterType type,
173 rtc::TaskQueue* task_queue,
174 Clock* clock)
175 : id_(id),
176 peer_local_addr_(ip),
177 is_enabled_(is_enabled),
178 type_(type),
179 clock_(clock),
180 task_queue_(task_queue),
181 router_(task_queue_),
182 next_port_(kFirstEphemeralPort) {
183 constexpr int kIPv4NetworkPrefixLength = 24;
184 constexpr int kIPv6NetworkPrefixLength = 64;
185
186 int prefix_length = 0;
187 if (ip.family() == AF_INET) {
188 prefix_length = kIPv4NetworkPrefixLength;
189 } else if (ip.family() == AF_INET6) {
190 prefix_length = kIPv6NetworkPrefixLength;
191 }
192 rtc::IPAddress prefix = TruncateIP(ip, prefix_length);
193 network_ = std::make_unique<rtc::Network>(
194 ip.ToString(), "Endpoint id=" + std::to_string(id_), prefix,
195 prefix_length, type_);
196 network_->AddIP(ip);
197
198 enabled_state_checker_.Detach();
199 }
200 EmulatedEndpointImpl::~EmulatedEndpointImpl() = default;
201
GetId() const202 uint64_t EmulatedEndpointImpl::GetId() const {
203 return id_;
204 }
205
SendPacket(const rtc::SocketAddress & from,const rtc::SocketAddress & to,rtc::CopyOnWriteBuffer packet_data,uint16_t application_overhead)206 void EmulatedEndpointImpl::SendPacket(const rtc::SocketAddress& from,
207 const rtc::SocketAddress& to,
208 rtc::CopyOnWriteBuffer packet_data,
209 uint16_t application_overhead) {
210 RTC_CHECK(from.ipaddr() == peer_local_addr_);
211 EmulatedIpPacket packet(from, to, std::move(packet_data),
212 clock_->CurrentTime(), application_overhead);
213 task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
214 RTC_DCHECK_RUN_ON(task_queue_);
215 Timestamp current_time = clock_->CurrentTime();
216 if (stats_.first_packet_sent_time.IsInfinite()) {
217 stats_.first_packet_sent_time = current_time;
218 stats_.first_sent_packet_size = DataSize::Bytes(packet.ip_packet_size());
219 }
220 stats_.last_packet_sent_time = current_time;
221 stats_.packets_sent++;
222 stats_.bytes_sent += DataSize::Bytes(packet.ip_packet_size());
223
224 router_.OnPacketReceived(std::move(packet));
225 });
226 }
227
BindReceiver(uint16_t desired_port,EmulatedNetworkReceiverInterface * receiver)228 absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiver(
229 uint16_t desired_port,
230 EmulatedNetworkReceiverInterface* receiver) {
231 rtc::CritScope crit(&receiver_lock_);
232 uint16_t port = desired_port;
233 if (port == 0) {
234 // Because client can specify its own port, next_port_ can be already in
235 // use, so we need to find next available port.
236 int ports_pool_size =
237 std::numeric_limits<uint16_t>::max() - kFirstEphemeralPort + 1;
238 for (int i = 0; i < ports_pool_size; ++i) {
239 uint16_t next_port = NextPort();
240 if (port_to_receiver_.find(next_port) == port_to_receiver_.end()) {
241 port = next_port;
242 break;
243 }
244 }
245 }
246 RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint "
247 << id_;
248 bool result = port_to_receiver_.insert({port, receiver}).second;
249 if (!result) {
250 RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port
251 << " in endpoint " << id_;
252 return absl::nullopt;
253 }
254 RTC_LOG(INFO) << "New receiver is binded to endpoint " << id_ << " on port "
255 << port;
256 return port;
257 }
258
NextPort()259 uint16_t EmulatedEndpointImpl::NextPort() {
260 uint16_t out = next_port_;
261 if (next_port_ == std::numeric_limits<uint16_t>::max()) {
262 next_port_ = kFirstEphemeralPort;
263 } else {
264 next_port_++;
265 }
266 return out;
267 }
268
UnbindReceiver(uint16_t port)269 void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) {
270 rtc::CritScope crit(&receiver_lock_);
271 port_to_receiver_.erase(port);
272 }
273
GetPeerLocalAddress() const274 rtc::IPAddress EmulatedEndpointImpl::GetPeerLocalAddress() const {
275 return peer_local_addr_;
276 }
277
OnPacketReceived(EmulatedIpPacket packet)278 void EmulatedEndpointImpl::OnPacketReceived(EmulatedIpPacket packet) {
279 RTC_DCHECK_RUN_ON(task_queue_);
280 RTC_CHECK(packet.to.ipaddr() == peer_local_addr_)
281 << "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: "
282 << packet.to.ipaddr().ToString()
283 << "; Receiver peer_local_addr_=" << peer_local_addr_.ToString();
284 rtc::CritScope crit(&receiver_lock_);
285 UpdateReceiveStats(packet);
286 auto it = port_to_receiver_.find(packet.to.port());
287 if (it == port_to_receiver_.end()) {
288 // It can happen, that remote peer closed connection, but there still some
289 // packets, that are going to it. It can happen during peer connection close
290 // process: one peer closed connection, second still sending data.
291 RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_
292 << " on port " << packet.to.port();
293 stats_.packets_dropped++;
294 stats_.bytes_dropped += DataSize::Bytes(packet.ip_packet_size());
295 return;
296 }
297 // Endpoint assumes frequent calls to bind and unbind methods, so it holds
298 // lock during packet processing to ensure that receiver won't be deleted
299 // before call to OnPacketReceived.
300 it->second->OnPacketReceived(std::move(packet));
301 }
302
Enable()303 void EmulatedEndpointImpl::Enable() {
304 RTC_DCHECK_RUN_ON(&enabled_state_checker_);
305 RTC_CHECK(!is_enabled_);
306 is_enabled_ = true;
307 }
308
Disable()309 void EmulatedEndpointImpl::Disable() {
310 RTC_DCHECK_RUN_ON(&enabled_state_checker_);
311 RTC_CHECK(is_enabled_);
312 is_enabled_ = false;
313 }
314
Enabled() const315 bool EmulatedEndpointImpl::Enabled() const {
316 RTC_DCHECK_RUN_ON(&enabled_state_checker_);
317 return is_enabled_;
318 }
319
stats()320 EmulatedNetworkStats EmulatedEndpointImpl::stats() {
321 RTC_DCHECK_RUN_ON(task_queue_);
322 return stats_;
323 }
324
UpdateReceiveStats(const EmulatedIpPacket & packet)325 void EmulatedEndpointImpl::UpdateReceiveStats(const EmulatedIpPacket& packet) {
326 RTC_DCHECK_RUN_ON(task_queue_);
327 Timestamp current_time = clock_->CurrentTime();
328 if (stats_.first_packet_received_time.IsInfinite()) {
329 stats_.first_packet_received_time = current_time;
330 stats_.first_received_packet_size =
331 DataSize::Bytes(packet.ip_packet_size());
332 }
333 stats_.last_packet_received_time = current_time;
334 stats_.packets_received++;
335 stats_.bytes_received += DataSize::Bytes(packet.ip_packet_size());
336 }
337
EndpointsContainer(const std::vector<EmulatedEndpointImpl * > & endpoints)338 EndpointsContainer::EndpointsContainer(
339 const std::vector<EmulatedEndpointImpl*>& endpoints)
340 : endpoints_(endpoints) {}
341
LookupByLocalAddress(const rtc::IPAddress & local_ip) const342 EmulatedEndpointImpl* EndpointsContainer::LookupByLocalAddress(
343 const rtc::IPAddress& local_ip) const {
344 for (auto* endpoint : endpoints_) {
345 rtc::IPAddress peer_local_address = endpoint->GetPeerLocalAddress();
346 if (peer_local_address == local_ip) {
347 return endpoint;
348 }
349 }
350 RTC_CHECK(false) << "No network found for address" << local_ip.ToString();
351 }
352
HasEndpoint(EmulatedEndpointImpl * endpoint) const353 bool EndpointsContainer::HasEndpoint(EmulatedEndpointImpl* endpoint) const {
354 for (auto* e : endpoints_) {
355 if (e->GetId() == endpoint->GetId()) {
356 return true;
357 }
358 }
359 return false;
360 }
361
362 std::vector<std::unique_ptr<rtc::Network>>
GetEnabledNetworks() const363 EndpointsContainer::GetEnabledNetworks() const {
364 std::vector<std::unique_ptr<rtc::Network>> networks;
365 for (auto* endpoint : endpoints_) {
366 if (endpoint->Enabled()) {
367 networks.emplace_back(
368 std::make_unique<rtc::Network>(endpoint->network()));
369 }
370 }
371 return networks;
372 }
373
GetStats() const374 EmulatedNetworkStats EndpointsContainer::GetStats() const {
375 EmulatedNetworkStats stats;
376 for (auto* endpoint : endpoints_) {
377 EmulatedNetworkStats endpoint_stats = endpoint->stats();
378 stats.packets_sent += endpoint_stats.packets_sent;
379 stats.bytes_sent += endpoint_stats.bytes_sent;
380 stats.packets_received += endpoint_stats.packets_received;
381 stats.bytes_received += endpoint_stats.bytes_received;
382 stats.packets_dropped += endpoint_stats.packets_dropped;
383 stats.bytes_dropped += endpoint_stats.bytes_dropped;
384 if (stats.first_packet_received_time >
385 endpoint_stats.first_packet_received_time) {
386 stats.first_packet_received_time =
387 endpoint_stats.first_packet_received_time;
388 stats.first_received_packet_size =
389 endpoint_stats.first_received_packet_size;
390 }
391 if (stats.first_packet_sent_time > endpoint_stats.first_packet_sent_time) {
392 stats.first_packet_sent_time = endpoint_stats.first_packet_sent_time;
393 stats.first_sent_packet_size = endpoint_stats.first_sent_packet_size;
394 }
395 if (stats.last_packet_received_time.IsInfinite() ||
396 stats.last_packet_received_time <
397 endpoint_stats.last_packet_received_time) {
398 stats.last_packet_received_time =
399 endpoint_stats.last_packet_received_time;
400 }
401 if (stats.last_packet_sent_time.IsInfinite() ||
402 stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) {
403 stats.last_packet_sent_time = endpoint_stats.last_packet_sent_time;
404 }
405 }
406 return stats;
407 }
408
409 } // namespace webrtc
410