1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "talk/p2p/base/transport.h"
29 
30 #include "talk/base/common.h"
31 #include "talk/base/logging.h"
32 #include "talk/p2p/base/candidate.h"
33 #include "talk/p2p/base/constants.h"
34 #include "talk/p2p/base/sessionmanager.h"
35 #include "talk/p2p/base/parsing.h"
36 #include "talk/p2p/base/transportchannelimpl.h"
37 #include "talk/xmllite/xmlelement.h"
38 #include "talk/xmpp/constants.h"
39 
40 namespace cricket {
41 
42 struct ChannelParams {
ChannelParamscricket::ChannelParams43   ChannelParams() : channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams44   explicit ChannelParams(const std::string& name)
45       : name(name), channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams46   ChannelParams(const std::string& name,
47                 const std::string& content_type)
48       : name(name), content_type(content_type),
49         channel(NULL), candidate(NULL) {}
ChannelParamscricket::ChannelParams50   explicit ChannelParams(cricket::Candidate* candidate) :
51       channel(NULL), candidate(candidate) {
52     name = candidate->name();
53   }
54 
~ChannelParamscricket::ChannelParams55   ~ChannelParams() {
56     delete candidate;
57   }
58 
59   std::string name;
60   std::string content_type;
61   cricket::TransportChannelImpl* channel;
62   cricket::Candidate* candidate;
63 };
64 // TODO: Merge ChannelParams and ChannelMessage.
65 typedef talk_base::ScopedMessageData<ChannelParams> ChannelMessage;
66 
// Identifiers of the messages Transport posts/sends to itself; dispatched in
// Transport::OnMessage.
enum {
  // Handled by the *_w functions (worker thread).
  MSG_CREATECHANNEL               = 1,
  MSG_DESTROYCHANNEL              = 2,
  MSG_DESTROYALLCHANNELS          = 3,
  MSG_CONNECTCHANNELS             = 4,
  MSG_RESETCHANNELS               = 5,
  MSG_ONSIGNALINGREADY            = 6,
  MSG_ONREMOTECANDIDATE           = 7,

  // Handled on the signaling thread.
  MSG_READSTATE                   = 8,
  MSG_WRITESTATE                  = 9,
  MSG_REQUESTSIGNALING            = 10,
  MSG_CANDIDATEREADY              = 11,
  MSG_ROUTECHANGE                 = 12,
  MSG_CONNECTING                  = 13,
  MSG_CANDIDATEALLOCATIONCOMPLETE = 14
};
83 
Transport(talk_base::Thread * signaling_thread,talk_base::Thread * worker_thread,const std::string & type,PortAllocator * allocator)84 Transport::Transport(talk_base::Thread* signaling_thread,
85                      talk_base::Thread* worker_thread,
86                      const std::string& type,
87                      PortAllocator* allocator)
88   : signaling_thread_(signaling_thread),
89     worker_thread_(worker_thread), type_(type), allocator_(allocator),
90     destroyed_(false), readable_(false), writable_(false),
91     connect_requested_(false), allow_local_ips_(false) {
92 }
93 
~Transport()94 Transport::~Transport() {
95   ASSERT(signaling_thread_->IsCurrent());
96   ASSERT(destroyed_);
97 }
98 
CreateChannel(const std::string & name,const std::string & content_type)99 TransportChannelImpl* Transport::CreateChannel(
100     const std::string& name, const std::string& content_type) {
101   ChannelMessage msg(new ChannelParams(name, content_type));
102   worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
103   return msg.data()->channel;
104 }
105 
CreateChannel_w(const std::string & name,const std::string & content_type)106 TransportChannelImpl* Transport::CreateChannel_w(
107     const std::string& name, const std::string& content_type) {
108   ASSERT(worker_thread()->IsCurrent());
109   TransportChannelImpl *impl;
110   talk_base::CritScope cs(&crit_);
111 
112   // Create the entry if it does not exist
113   if (channels_.find(name) == channels_.end()) {
114     impl = CreateTransportChannel(name, content_type);
115     channels_[name] = ChannelMapEntry(impl);
116   } else {
117     impl = channels_[name].get();
118   }
119 
120   // Increase the ref count
121   channels_[name].AddRef();
122   destroyed_ = false;
123 
124   impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
125   impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
126   impl->SignalRequestSignaling.connect(
127       this, &Transport::OnChannelRequestSignaling);
128   impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
129   impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange);
130 
131   if (connect_requested_) {
132     impl->Connect();
133     if (channels_.size() == 1) {
134       // If this is the first channel, then indicate that we have started
135       // connecting.
136       signaling_thread()->Post(this, MSG_CONNECTING, NULL);
137     }
138   }
139   return impl;
140 }
141 
GetChannel(const std::string & name)142 TransportChannelImpl* Transport::GetChannel(const std::string& name) {
143   talk_base::CritScope cs(&crit_);
144   ChannelMap::iterator iter = channels_.find(name);
145   return (iter != channels_.end()) ? iter->second.get() : NULL;
146 }
147 
HasChannels()148 bool Transport::HasChannels() {
149   talk_base::CritScope cs(&crit_);
150   return !channels_.empty();
151 }
152 
DestroyChannel(const std::string & name)153 void Transport::DestroyChannel(const std::string& name) {
154   ChannelMessage msg(new ChannelParams(name));
155   worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
156 }
157 
DestroyChannel_w(const std::string & name)158 void Transport::DestroyChannel_w(const std::string& name) {
159   ASSERT(worker_thread()->IsCurrent());
160 
161   TransportChannelImpl* impl = NULL;
162   {
163     talk_base::CritScope cs(&crit_);
164     ChannelMap::iterator iter = channels_.find(name);
165     if (iter == channels_.end())
166       return;
167 
168     iter->second.DecRef();
169     if (!iter->second.ref()) {
170       impl = iter->second.get();
171       channels_.erase(iter);
172     }
173   }
174 
175   if (connect_requested_ && channels_.empty()) {
176     // We're no longer attempting to connect.
177     signaling_thread()->Post(this, MSG_CONNECTING, NULL);
178   }
179 
180   if (impl) {
181     // Check in case the deleted channel was the only non-writable channel.
182     OnChannelWritableState(impl);
183     DestroyTransportChannel(impl);
184   }
185 }
186 
ConnectChannels()187 void Transport::ConnectChannels() {
188   ASSERT(signaling_thread()->IsCurrent());
189   worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL);
190 }
191 
ConnectChannels_w()192 void Transport::ConnectChannels_w() {
193   ASSERT(worker_thread()->IsCurrent());
194   if (connect_requested_ || channels_.empty())
195     return;
196   connect_requested_ = true;
197   signaling_thread()->Post(
198       this, MSG_CANDIDATEREADY, NULL);
199   CallChannels_w(&TransportChannelImpl::Connect);
200   if (!channels_.empty()) {
201     signaling_thread()->Post(this, MSG_CONNECTING, NULL);
202   }
203 }
204 
OnConnecting_s()205 void Transport::OnConnecting_s() {
206   ASSERT(signaling_thread()->IsCurrent());
207   SignalConnecting(this);
208 }
209 
DestroyAllChannels()210 void Transport::DestroyAllChannels() {
211   ASSERT(signaling_thread()->IsCurrent());
212   worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
213   worker_thread()->Clear(this);
214   signaling_thread()->Clear(this);
215   destroyed_ = true;
216 }
217 
DestroyAllChannels_w()218 void Transport::DestroyAllChannels_w() {
219   ASSERT(worker_thread()->IsCurrent());
220   std::vector<TransportChannelImpl*> impls;
221   {
222     talk_base::CritScope cs(&crit_);
223     for (ChannelMap::iterator iter = channels_.begin();
224          iter != channels_.end();
225          ++iter) {
226       iter->second.DecRef();
227       if (!iter->second.ref())
228         impls.push_back(iter->second.get());
229       }
230     }
231   channels_.clear();
232 
233   for (size_t i = 0; i < impls.size(); ++i)
234     DestroyTransportChannel(impls[i]);
235 }
236 
ResetChannels()237 void Transport::ResetChannels() {
238   ASSERT(signaling_thread()->IsCurrent());
239   worker_thread()->Send(this, MSG_RESETCHANNELS, NULL);
240 }
241 
ResetChannels_w()242 void Transport::ResetChannels_w() {
243   ASSERT(worker_thread()->IsCurrent());
244 
245   // We are no longer attempting to connect
246   connect_requested_ = false;
247 
248   // Clear out the old messages, they aren't relevant
249   talk_base::CritScope cs(&crit_);
250   ready_candidates_.clear();
251 
252   // Reset all of the channels
253   CallChannels_w(&TransportChannelImpl::Reset);
254 }
255 
OnSignalingReady()256 void Transport::OnSignalingReady() {
257   ASSERT(signaling_thread()->IsCurrent());
258   if (destroyed_) return;
259 
260   worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
261 
262   // Notify the subclass.
263   OnTransportSignalingReady();
264 }
265 
CallChannels_w(TransportChannelFunc func)266 void Transport::CallChannels_w(TransportChannelFunc func) {
267   ASSERT(worker_thread()->IsCurrent());
268   talk_base::CritScope cs(&crit_);
269   for (ChannelMap::iterator iter = channels_.begin();
270        iter != channels_.end();
271        ++iter) {
272     ((iter->second.get())->*func)();
273   }
274 }
275 
VerifyCandidate(const Candidate & cand,ParseError * error)276 bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) {
277   if (cand.address().IsLocalIP() && !allow_local_ips_)
278     return BadParse("candidate has local IP address", error);
279 
280   // No address zero.
281   if (cand.address().IsAny()) {
282     return BadParse("candidate has address of zero", error);
283   }
284 
285   // Disallow all ports below 1024, except for 80 and 443 on public addresses.
286   int port = cand.address().port();
287   if (port < 1024) {
288     if ((port != 80) && (port != 443))
289       return BadParse(
290           "candidate has port below 1024, but not 80 or 443", error);
291     if (cand.address().IsPrivateIP()) {
292       return BadParse(
293           "candidate has port of 80 or 443 with private IP address", error);
294     }
295   }
296 
297   return true;
298 }
299 
OnRemoteCandidates(const std::vector<Candidate> & candidates)300 void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
301   for (std::vector<Candidate>::const_iterator iter = candidates.begin();
302        iter != candidates.end();
303        ++iter) {
304     OnRemoteCandidate(*iter);
305   }
306 }
307 
OnRemoteCandidate(const Candidate & candidate)308 void Transport::OnRemoteCandidate(const Candidate& candidate) {
309   ASSERT(signaling_thread()->IsCurrent());
310   if (destroyed_) return;
311   if (!HasChannel(candidate.name())) {
312     LOG(LS_WARNING) << "Ignoring candidate for unknown channel "
313                     << candidate.name();
314     return;
315   }
316 
317   ChannelMessage* msg = new ChannelMessage(
318       new ChannelParams(new Candidate(candidate)));
319   worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg);
320 }
321 
OnRemoteCandidate_w(const Candidate & candidate)322 void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
323   ASSERT(worker_thread()->IsCurrent());
324   ChannelMap::iterator iter = channels_.find(candidate.name());
325   // It's ok for a channel to go away while this message is in transit.
326   if (iter != channels_.end()) {
327     iter->second.get()->OnCandidate(candidate);
328   }
329 }
330 
OnChannelReadableState(TransportChannel * channel)331 void Transport::OnChannelReadableState(TransportChannel* channel) {
332   ASSERT(worker_thread()->IsCurrent());
333   signaling_thread()->Post(this, MSG_READSTATE, NULL);
334 }
335 
OnChannelReadableState_s()336 void Transport::OnChannelReadableState_s() {
337   ASSERT(signaling_thread()->IsCurrent());
338   bool readable = GetTransportState_s(true);
339   if (readable_ != readable) {
340     readable_ = readable;
341     SignalReadableState(this);
342   }
343 }
344 
OnChannelWritableState(TransportChannel * channel)345 void Transport::OnChannelWritableState(TransportChannel* channel) {
346   ASSERT(worker_thread()->IsCurrent());
347   signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
348 }
349 
OnChannelWritableState_s()350 void Transport::OnChannelWritableState_s() {
351   ASSERT(signaling_thread()->IsCurrent());
352   bool writable = GetTransportState_s(false);
353   if (writable_ != writable) {
354     writable_ = writable;
355     SignalWritableState(this);
356   }
357 }
358 
GetTransportState_s(bool read)359 bool Transport::GetTransportState_s(bool read) {
360   ASSERT(signaling_thread()->IsCurrent());
361   bool result = false;
362   talk_base::CritScope cs(&crit_);
363   for (ChannelMap::iterator iter = channels_.begin();
364        iter != channels_.end();
365        ++iter) {
366     bool b = (read ? iter->second.get()->readable() :
367       iter->second.get()->writable());
368     result = result || b;
369   }
370   return result;
371 }
372 
OnChannelRequestSignaling(TransportChannelImpl * channel)373 void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) {
374   ASSERT(worker_thread()->IsCurrent());
375   ChannelMessage* msg = new ChannelMessage(
376       new ChannelParams(channel->name()));
377   signaling_thread()->Post(this, MSG_REQUESTSIGNALING, msg);
378 }
379 
OnChannelRequestSignaling_s(const std::string & name)380 void Transport::OnChannelRequestSignaling_s(const std::string& name) {
381   ASSERT(signaling_thread()->IsCurrent());
382   // Resetting ICE state for the channel.
383   {
384     talk_base::CritScope cs(&crit_);
385     ChannelMap::iterator iter = channels_.find(name);
386     if (iter != channels_.end())
387       iter->second.set_candidates_allocated(false);
388   }
389   SignalRequestSignaling(this);
390 }
391 
OnChannelCandidateReady(TransportChannelImpl * channel,const Candidate & candidate)392 void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
393                                         const Candidate& candidate) {
394   ASSERT(worker_thread()->IsCurrent());
395   talk_base::CritScope cs(&crit_);
396   ready_candidates_.push_back(candidate);
397 
398   // We hold any messages until the client lets us connect.
399   if (connect_requested_) {
400     signaling_thread()->Post(
401         this, MSG_CANDIDATEREADY, NULL);
402   }
403 }
404 
OnChannelCandidateReady_s()405 void Transport::OnChannelCandidateReady_s() {
406   ASSERT(signaling_thread()->IsCurrent());
407   ASSERT(connect_requested_);
408 
409   std::vector<Candidate> candidates;
410   {
411     talk_base::CritScope cs(&crit_);
412     candidates.swap(ready_candidates_);
413   }
414 
415   // we do the deleting of Candidate* here to keep the new above and
416   // delete below close to each other
417   if (!candidates.empty()) {
418     SignalCandidatesReady(this, candidates);
419   }
420 }
421 
OnChannelRouteChange(TransportChannel * channel,const Candidate & remote_candidate)422 void Transport::OnChannelRouteChange(TransportChannel* channel,
423                                      const Candidate& remote_candidate) {
424   ASSERT(worker_thread()->IsCurrent());
425   ChannelParams* params = new ChannelParams(new Candidate(remote_candidate));
426   signaling_thread()->Post(this, MSG_ROUTECHANGE, new ChannelMessage(params));
427 }
428 
OnChannelRouteChange_s(const std::string & name,const Candidate & remote_candidate)429 void Transport::OnChannelRouteChange_s(const std::string& name,
430                                        const Candidate& remote_candidate) {
431   ASSERT(signaling_thread()->IsCurrent());
432   SignalRouteChange(this, name, remote_candidate);
433 }
434 
OnChannelCandidatesAllocationDone(TransportChannelImpl * channel)435 void Transport::OnChannelCandidatesAllocationDone(
436     TransportChannelImpl* channel) {
437   ASSERT(worker_thread()->IsCurrent());
438   ChannelMap::iterator iter = channels_.find(channel->name());
439   ASSERT(iter != channels_.end());
440   iter->second.set_candidates_allocated(true);
441 
442   // If all channels belonging to this Transport got signal, then
443   // forward this signal to upper layer.
444   // Can this signal arrive before all transport channels are created?
445   for (iter = channels_.begin(); iter != channels_.end(); ++iter) {
446     if (!iter->second.candidates_allocated())
447       return;
448   }
449   signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE);
450 }
451 
OnMessage(talk_base::Message * msg)452 void Transport::OnMessage(talk_base::Message* msg) {
453   switch (msg->message_id) {
454   case MSG_CREATECHANNEL:
455     {
456       ChannelParams* params =
457           static_cast<ChannelMessage*>(msg->pdata)->data().get();
458       params->channel = CreateChannel_w(params->name, params->content_type);
459     }
460     break;
461   case MSG_DESTROYCHANNEL:
462     {
463       ChannelParams* params =
464           static_cast<ChannelMessage*>(msg->pdata)->data().get();
465       DestroyChannel_w(params->name);
466     }
467     break;
468   case MSG_CONNECTCHANNELS:
469     ConnectChannels_w();
470     break;
471   case MSG_RESETCHANNELS:
472     ResetChannels_w();
473     break;
474   case MSG_DESTROYALLCHANNELS:
475     DestroyAllChannels_w();
476     break;
477   case MSG_ONSIGNALINGREADY:
478     CallChannels_w(&TransportChannelImpl::OnSignalingReady);
479     break;
480   case MSG_ONREMOTECANDIDATE:
481     {
482       ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
483       OnRemoteCandidate_w(*(channel_msg->data()->candidate));
484       delete channel_msg;
485     }
486     break;
487   case MSG_CONNECTING:
488     OnConnecting_s();
489     break;
490   case MSG_READSTATE:
491     OnChannelReadableState_s();
492     break;
493   case MSG_WRITESTATE:
494     OnChannelWritableState_s();
495     break;
496   case MSG_REQUESTSIGNALING:
497     {
498       ChannelParams* params =
499           static_cast<ChannelMessage*>(msg->pdata)->data().get();
500       OnChannelRequestSignaling_s(params->name);
501       delete params;
502     }
503     break;
504   case MSG_CANDIDATEREADY:
505     OnChannelCandidateReady_s();
506     break;
507   case MSG_ROUTECHANGE:
508     {
509       ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
510       ChannelParams* params = channel_msg->data().get();
511       OnChannelRouteChange_s(params->name, *params->candidate);
512       delete channel_msg;
513     }
514     break;
515   case MSG_CANDIDATEALLOCATIONCOMPLETE:
516     SignalCandidatesAllocationDone(this);
517     break;
518   }
519 }
520 
ParseAddress(const buzz::XmlElement * elem,const buzz::QName & address_name,const buzz::QName & port_name,talk_base::SocketAddress * address,ParseError * error)521 bool TransportParser::ParseAddress(const buzz::XmlElement* elem,
522                                    const buzz::QName& address_name,
523                                    const buzz::QName& port_name,
524                                    talk_base::SocketAddress* address,
525                                    ParseError* error) {
526   if (!elem->HasAttr(address_name))
527     return BadParse("address does not have " + address_name.LocalPart(), error);
528   if (!elem->HasAttr(port_name))
529     return BadParse("address does not have " + port_name.LocalPart(), error);
530 
531   address->SetIP(elem->Attr(address_name));
532   std::istringstream ist(elem->Attr(port_name));
533   int port = 0;
534   ist >> port;
535   address->SetPort(port);
536 
537   return true;
538 }
539 
540 }  // namespace cricket
541