1 /*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include "talk/p2p/base/transport.h"
29
30 #include "talk/base/common.h"
31 #include "talk/base/logging.h"
32 #include "talk/p2p/base/candidate.h"
33 #include "talk/p2p/base/constants.h"
34 #include "talk/p2p/base/sessionmanager.h"
35 #include "talk/p2p/base/parsing.h"
36 #include "talk/p2p/base/transportchannelimpl.h"
37 #include "talk/xmllite/xmlelement.h"
38 #include "talk/xmpp/constants.h"
39
40 namespace cricket {
41
42 struct ChannelParams {
ChannelParamscricket::ChannelParams43 ChannelParams() : channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams44 explicit ChannelParams(const std::string& name)
45 : name(name), channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams46 ChannelParams(const std::string& name,
47 const std::string& content_type)
48 : name(name), content_type(content_type),
49 channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams50 explicit ChannelParams(cricket::Candidate* candidate) :
51 channel(NULL), candidate(candidate) {
52 name = candidate->name();
53 }
54
~ChannelParamscricket::ChannelParams55 ~ChannelParams() {
56 delete candidate;
57 }
58
59 std::string name;
60 std::string content_type;
61 cricket::TransportChannelImpl* channel;
62 cricket::Candidate* candidate;
63 };
64 // TODO: Merge ChannelParams and ChannelMessage.
65 typedef talk_base::ScopedMessageData<ChannelParams> ChannelMessage;
66
67 enum {
68 MSG_CREATECHANNEL = 1,
69 MSG_DESTROYCHANNEL = 2,
70 MSG_DESTROYALLCHANNELS = 3,
71 MSG_CONNECTCHANNELS = 4,
72 MSG_RESETCHANNELS = 5,
73 MSG_ONSIGNALINGREADY = 6,
74 MSG_ONREMOTECANDIDATE = 7,
75 MSG_READSTATE = 8,
76 MSG_WRITESTATE = 9,
77 MSG_REQUESTSIGNALING = 10,
78 MSG_CANDIDATEREADY = 11,
79 MSG_ROUTECHANGE = 12,
80 MSG_CONNECTING = 13,
81 MSG_CANDIDATEALLOCATIONCOMPLETE = 14,
82 };
83
Transport(talk_base::Thread * signaling_thread,talk_base::Thread * worker_thread,const std::string & type,PortAllocator * allocator)84 Transport::Transport(talk_base::Thread* signaling_thread,
85 talk_base::Thread* worker_thread,
86 const std::string& type,
87 PortAllocator* allocator)
88 : signaling_thread_(signaling_thread),
89 worker_thread_(worker_thread), type_(type), allocator_(allocator),
90 destroyed_(false), readable_(false), writable_(false),
91 connect_requested_(false), allow_local_ips_(false) {
92 }
93
~Transport()94 Transport::~Transport() {
95 ASSERT(signaling_thread_->IsCurrent());
96 ASSERT(destroyed_);
97 }
98
CreateChannel(const std::string & name,const std::string & content_type)99 TransportChannelImpl* Transport::CreateChannel(
100 const std::string& name, const std::string& content_type) {
101 ChannelMessage msg(new ChannelParams(name, content_type));
102 worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
103 return msg.data()->channel;
104 }
105
CreateChannel_w(const std::string & name,const std::string & content_type)106 TransportChannelImpl* Transport::CreateChannel_w(
107 const std::string& name, const std::string& content_type) {
108 ASSERT(worker_thread()->IsCurrent());
109 TransportChannelImpl *impl;
110 talk_base::CritScope cs(&crit_);
111
112 // Create the entry if it does not exist
113 if (channels_.find(name) == channels_.end()) {
114 impl = CreateTransportChannel(name, content_type);
115 channels_[name] = ChannelMapEntry(impl);
116 } else {
117 impl = channels_[name].get();
118 }
119
120 // Increase the ref count
121 channels_[name].AddRef();
122 destroyed_ = false;
123
124 impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
125 impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
126 impl->SignalRequestSignaling.connect(
127 this, &Transport::OnChannelRequestSignaling);
128 impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
129 impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange);
130
131 if (connect_requested_) {
132 impl->Connect();
133 if (channels_.size() == 1) {
134 // If this is the first channel, then indicate that we have started
135 // connecting.
136 signaling_thread()->Post(this, MSG_CONNECTING, NULL);
137 }
138 }
139 return impl;
140 }
141
GetChannel(const std::string & name)142 TransportChannelImpl* Transport::GetChannel(const std::string& name) {
143 talk_base::CritScope cs(&crit_);
144 ChannelMap::iterator iter = channels_.find(name);
145 return (iter != channels_.end()) ? iter->second.get() : NULL;
146 }
147
HasChannels()148 bool Transport::HasChannels() {
149 talk_base::CritScope cs(&crit_);
150 return !channels_.empty();
151 }
152
DestroyChannel(const std::string & name)153 void Transport::DestroyChannel(const std::string& name) {
154 ChannelMessage msg(new ChannelParams(name));
155 worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
156 }
157
DestroyChannel_w(const std::string & name)158 void Transport::DestroyChannel_w(const std::string& name) {
159 ASSERT(worker_thread()->IsCurrent());
160
161 TransportChannelImpl* impl = NULL;
162 {
163 talk_base::CritScope cs(&crit_);
164 ChannelMap::iterator iter = channels_.find(name);
165 if (iter == channels_.end())
166 return;
167
168 iter->second.DecRef();
169 if (!iter->second.ref()) {
170 impl = iter->second.get();
171 channels_.erase(iter);
172 }
173 }
174
175 if (connect_requested_ && channels_.empty()) {
176 // We're no longer attempting to connect.
177 signaling_thread()->Post(this, MSG_CONNECTING, NULL);
178 }
179
180 if (impl) {
181 // Check in case the deleted channel was the only non-writable channel.
182 OnChannelWritableState(impl);
183 DestroyTransportChannel(impl);
184 }
185 }
186
ConnectChannels()187 void Transport::ConnectChannels() {
188 ASSERT(signaling_thread()->IsCurrent());
189 worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL);
190 }
191
ConnectChannels_w()192 void Transport::ConnectChannels_w() {
193 ASSERT(worker_thread()->IsCurrent());
194 if (connect_requested_ || channels_.empty())
195 return;
196 connect_requested_ = true;
197 signaling_thread()->Post(
198 this, MSG_CANDIDATEREADY, NULL);
199 CallChannels_w(&TransportChannelImpl::Connect);
200 if (!channels_.empty()) {
201 signaling_thread()->Post(this, MSG_CONNECTING, NULL);
202 }
203 }
204
OnConnecting_s()205 void Transport::OnConnecting_s() {
206 ASSERT(signaling_thread()->IsCurrent());
207 SignalConnecting(this);
208 }
209
DestroyAllChannels()210 void Transport::DestroyAllChannels() {
211 ASSERT(signaling_thread()->IsCurrent());
212 worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
213 worker_thread()->Clear(this);
214 signaling_thread()->Clear(this);
215 destroyed_ = true;
216 }
217
DestroyAllChannels_w()218 void Transport::DestroyAllChannels_w() {
219 ASSERT(worker_thread()->IsCurrent());
220 std::vector<TransportChannelImpl*> impls;
221 {
222 talk_base::CritScope cs(&crit_);
223 for (ChannelMap::iterator iter = channels_.begin();
224 iter != channels_.end();
225 ++iter) {
226 iter->second.DecRef();
227 if (!iter->second.ref())
228 impls.push_back(iter->second.get());
229 }
230 }
231 channels_.clear();
232
233 for (size_t i = 0; i < impls.size(); ++i)
234 DestroyTransportChannel(impls[i]);
235 }
236
ResetChannels()237 void Transport::ResetChannels() {
238 ASSERT(signaling_thread()->IsCurrent());
239 worker_thread()->Send(this, MSG_RESETCHANNELS, NULL);
240 }
241
ResetChannels_w()242 void Transport::ResetChannels_w() {
243 ASSERT(worker_thread()->IsCurrent());
244
245 // We are no longer attempting to connect
246 connect_requested_ = false;
247
248 // Clear out the old messages, they aren't relevant
249 talk_base::CritScope cs(&crit_);
250 ready_candidates_.clear();
251
252 // Reset all of the channels
253 CallChannels_w(&TransportChannelImpl::Reset);
254 }
255
OnSignalingReady()256 void Transport::OnSignalingReady() {
257 ASSERT(signaling_thread()->IsCurrent());
258 if (destroyed_) return;
259
260 worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
261
262 // Notify the subclass.
263 OnTransportSignalingReady();
264 }
265
CallChannels_w(TransportChannelFunc func)266 void Transport::CallChannels_w(TransportChannelFunc func) {
267 ASSERT(worker_thread()->IsCurrent());
268 talk_base::CritScope cs(&crit_);
269 for (ChannelMap::iterator iter = channels_.begin();
270 iter != channels_.end();
271 ++iter) {
272 ((iter->second.get())->*func)();
273 }
274 }
275
VerifyCandidate(const Candidate & cand,ParseError * error)276 bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) {
277 if (cand.address().IsLocalIP() && !allow_local_ips_)
278 return BadParse("candidate has local IP address", error);
279
280 // No address zero.
281 if (cand.address().IsAny()) {
282 return BadParse("candidate has address of zero", error);
283 }
284
285 // Disallow all ports below 1024, except for 80 and 443 on public addresses.
286 int port = cand.address().port();
287 if (port < 1024) {
288 if ((port != 80) && (port != 443))
289 return BadParse(
290 "candidate has port below 1024, but not 80 or 443", error);
291 if (cand.address().IsPrivateIP()) {
292 return BadParse(
293 "candidate has port of 80 or 443 with private IP address", error);
294 }
295 }
296
297 return true;
298 }
299
OnRemoteCandidates(const std::vector<Candidate> & candidates)300 void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
301 for (std::vector<Candidate>::const_iterator iter = candidates.begin();
302 iter != candidates.end();
303 ++iter) {
304 OnRemoteCandidate(*iter);
305 }
306 }
307
OnRemoteCandidate(const Candidate & candidate)308 void Transport::OnRemoteCandidate(const Candidate& candidate) {
309 ASSERT(signaling_thread()->IsCurrent());
310 if (destroyed_) return;
311 if (!HasChannel(candidate.name())) {
312 LOG(LS_WARNING) << "Ignoring candidate for unknown channel "
313 << candidate.name();
314 return;
315 }
316
317 ChannelMessage* msg = new ChannelMessage(
318 new ChannelParams(new Candidate(candidate)));
319 worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg);
320 }
321
OnRemoteCandidate_w(const Candidate & candidate)322 void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
323 ASSERT(worker_thread()->IsCurrent());
324 ChannelMap::iterator iter = channels_.find(candidate.name());
325 // It's ok for a channel to go away while this message is in transit.
326 if (iter != channels_.end()) {
327 iter->second.get()->OnCandidate(candidate);
328 }
329 }
330
OnChannelReadableState(TransportChannel * channel)331 void Transport::OnChannelReadableState(TransportChannel* channel) {
332 ASSERT(worker_thread()->IsCurrent());
333 signaling_thread()->Post(this, MSG_READSTATE, NULL);
334 }
335
OnChannelReadableState_s()336 void Transport::OnChannelReadableState_s() {
337 ASSERT(signaling_thread()->IsCurrent());
338 bool readable = GetTransportState_s(true);
339 if (readable_ != readable) {
340 readable_ = readable;
341 SignalReadableState(this);
342 }
343 }
344
OnChannelWritableState(TransportChannel * channel)345 void Transport::OnChannelWritableState(TransportChannel* channel) {
346 ASSERT(worker_thread()->IsCurrent());
347 signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
348 }
349
OnChannelWritableState_s()350 void Transport::OnChannelWritableState_s() {
351 ASSERT(signaling_thread()->IsCurrent());
352 bool writable = GetTransportState_s(false);
353 if (writable_ != writable) {
354 writable_ = writable;
355 SignalWritableState(this);
356 }
357 }
358
GetTransportState_s(bool read)359 bool Transport::GetTransportState_s(bool read) {
360 ASSERT(signaling_thread()->IsCurrent());
361 bool result = false;
362 talk_base::CritScope cs(&crit_);
363 for (ChannelMap::iterator iter = channels_.begin();
364 iter != channels_.end();
365 ++iter) {
366 bool b = (read ? iter->second.get()->readable() :
367 iter->second.get()->writable());
368 result = result || b;
369 }
370 return result;
371 }
372
OnChannelRequestSignaling(TransportChannelImpl * channel)373 void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) {
374 ASSERT(worker_thread()->IsCurrent());
375 ChannelMessage* msg = new ChannelMessage(
376 new ChannelParams(channel->name()));
377 signaling_thread()->Post(this, MSG_REQUESTSIGNALING, msg);
378 }
379
OnChannelRequestSignaling_s(const std::string & name)380 void Transport::OnChannelRequestSignaling_s(const std::string& name) {
381 ASSERT(signaling_thread()->IsCurrent());
382 // Resetting ICE state for the channel.
383 {
384 talk_base::CritScope cs(&crit_);
385 ChannelMap::iterator iter = channels_.find(name);
386 if (iter != channels_.end())
387 iter->second.set_candidates_allocated(false);
388 }
389 SignalRequestSignaling(this);
390 }
391
OnChannelCandidateReady(TransportChannelImpl * channel,const Candidate & candidate)392 void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
393 const Candidate& candidate) {
394 ASSERT(worker_thread()->IsCurrent());
395 talk_base::CritScope cs(&crit_);
396 ready_candidates_.push_back(candidate);
397
398 // We hold any messages until the client lets us connect.
399 if (connect_requested_) {
400 signaling_thread()->Post(
401 this, MSG_CANDIDATEREADY, NULL);
402 }
403 }
404
OnChannelCandidateReady_s()405 void Transport::OnChannelCandidateReady_s() {
406 ASSERT(signaling_thread()->IsCurrent());
407 ASSERT(connect_requested_);
408
409 std::vector<Candidate> candidates;
410 {
411 talk_base::CritScope cs(&crit_);
412 candidates.swap(ready_candidates_);
413 }
414
415 // we do the deleting of Candidate* here to keep the new above and
416 // delete below close to each other
417 if (!candidates.empty()) {
418 SignalCandidatesReady(this, candidates);
419 }
420 }
421
OnChannelRouteChange(TransportChannel * channel,const Candidate & remote_candidate)422 void Transport::OnChannelRouteChange(TransportChannel* channel,
423 const Candidate& remote_candidate) {
424 ASSERT(worker_thread()->IsCurrent());
425 ChannelParams* params = new ChannelParams(new Candidate(remote_candidate));
426 signaling_thread()->Post(this, MSG_ROUTECHANGE, new ChannelMessage(params));
427 }
428
OnChannelRouteChange_s(const std::string & name,const Candidate & remote_candidate)429 void Transport::OnChannelRouteChange_s(const std::string& name,
430 const Candidate& remote_candidate) {
431 ASSERT(signaling_thread()->IsCurrent());
432 SignalRouteChange(this, name, remote_candidate);
433 }
434
OnChannelCandidatesAllocationDone(TransportChannelImpl * channel)435 void Transport::OnChannelCandidatesAllocationDone(
436 TransportChannelImpl* channel) {
437 ASSERT(worker_thread()->IsCurrent());
438 ChannelMap::iterator iter = channels_.find(channel->name());
439 ASSERT(iter != channels_.end());
440 iter->second.set_candidates_allocated(true);
441
442 // If all channels belonging to this Transport got signal, then
443 // forward this signal to upper layer.
444 // Can this signal arrive before all transport channels are created?
445 for (iter = channels_.begin(); iter != channels_.end(); ++iter) {
446 if (!iter->second.candidates_allocated())
447 return;
448 }
449 signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE);
450 }
451
OnMessage(talk_base::Message * msg)452 void Transport::OnMessage(talk_base::Message* msg) {
453 switch (msg->message_id) {
454 case MSG_CREATECHANNEL:
455 {
456 ChannelParams* params =
457 static_cast<ChannelMessage*>(msg->pdata)->data().get();
458 params->channel = CreateChannel_w(params->name, params->content_type);
459 }
460 break;
461 case MSG_DESTROYCHANNEL:
462 {
463 ChannelParams* params =
464 static_cast<ChannelMessage*>(msg->pdata)->data().get();
465 DestroyChannel_w(params->name);
466 }
467 break;
468 case MSG_CONNECTCHANNELS:
469 ConnectChannels_w();
470 break;
471 case MSG_RESETCHANNELS:
472 ResetChannels_w();
473 break;
474 case MSG_DESTROYALLCHANNELS:
475 DestroyAllChannels_w();
476 break;
477 case MSG_ONSIGNALINGREADY:
478 CallChannels_w(&TransportChannelImpl::OnSignalingReady);
479 break;
480 case MSG_ONREMOTECANDIDATE:
481 {
482 ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
483 OnRemoteCandidate_w(*(channel_msg->data()->candidate));
484 delete channel_msg;
485 }
486 break;
487 case MSG_CONNECTING:
488 OnConnecting_s();
489 break;
490 case MSG_READSTATE:
491 OnChannelReadableState_s();
492 break;
493 case MSG_WRITESTATE:
494 OnChannelWritableState_s();
495 break;
496 case MSG_REQUESTSIGNALING:
497 {
498 ChannelParams* params =
499 static_cast<ChannelMessage*>(msg->pdata)->data().get();
500 OnChannelRequestSignaling_s(params->name);
501 delete params;
502 }
503 break;
504 case MSG_CANDIDATEREADY:
505 OnChannelCandidateReady_s();
506 break;
507 case MSG_ROUTECHANGE:
508 {
509 ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
510 ChannelParams* params = channel_msg->data().get();
511 OnChannelRouteChange_s(params->name, *params->candidate);
512 delete channel_msg;
513 }
514 break;
515 case MSG_CANDIDATEALLOCATIONCOMPLETE:
516 SignalCandidatesAllocationDone(this);
517 break;
518 }
519 }
520
ParseAddress(const buzz::XmlElement * elem,const buzz::QName & address_name,const buzz::QName & port_name,talk_base::SocketAddress * address,ParseError * error)521 bool TransportParser::ParseAddress(const buzz::XmlElement* elem,
522 const buzz::QName& address_name,
523 const buzz::QName& port_name,
524 talk_base::SocketAddress* address,
525 ParseError* error) {
526 if (!elem->HasAttr(address_name))
527 return BadParse("address does not have " + address_name.LocalPart(), error);
528 if (!elem->HasAttr(port_name))
529 return BadParse("address does not have " + port_name.LocalPart(), error);
530
531 address->SetIP(elem->Attr(address_name));
532 std::istringstream ist(elem->Attr(port_name));
533 int port = 0;
534 ist >> port;
535 address->SetPort(port);
536
537 return true;
538 }
539
540 } // namespace cricket
541