1 /*
2  *  Copyright 2012 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "examples/peerconnection/client/peer_connection_client.h"
12 
13 #include "examples/peerconnection/client/defaults.h"
14 #include "rtc_base/checks.h"
15 #include "rtc_base/logging.h"
16 #include "rtc_base/net_helpers.h"
17 
18 #ifdef WIN32
19 #include "rtc_base/win32_socket_server.h"
20 #endif
21 
22 namespace {
23 
// Message body a peer sends to announce that it is hanging up.
constexpr char kByeMessage[] = "BYE";
// How long to wait before retrying a refused server connection, in
// milliseconds.
constexpr int kReconnectDelay = 2000;
28 
CreateClientSocket(int family)29 rtc::AsyncSocket* CreateClientSocket(int family) {
30 #ifdef WIN32
31   rtc::Win32Socket* sock = new rtc::Win32Socket();
32   sock->CreateT(family, SOCK_STREAM);
33   return sock;
34 #elif defined(WEBRTC_POSIX)
35   rtc::Thread* thread = rtc::Thread::Current();
36   RTC_DCHECK(thread != NULL);
37   return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
38 #else
39 #error Platform not supported.
40 #endif
41 }
42 
43 }  // namespace
44 
PeerConnectionClient()45 PeerConnectionClient::PeerConnectionClient()
46     : callback_(NULL), resolver_(NULL), state_(NOT_CONNECTED), my_id_(-1) {}
47 
~PeerConnectionClient()48 PeerConnectionClient::~PeerConnectionClient() {}
49 
InitSocketSignals()50 void PeerConnectionClient::InitSocketSignals() {
51   RTC_DCHECK(control_socket_.get() != NULL);
52   RTC_DCHECK(hanging_get_.get() != NULL);
53   control_socket_->SignalCloseEvent.connect(this,
54                                             &PeerConnectionClient::OnClose);
55   hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose);
56   control_socket_->SignalConnectEvent.connect(this,
57                                               &PeerConnectionClient::OnConnect);
58   hanging_get_->SignalConnectEvent.connect(
59       this, &PeerConnectionClient::OnHangingGetConnect);
60   control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead);
61   hanging_get_->SignalReadEvent.connect(
62       this, &PeerConnectionClient::OnHangingGetRead);
63 }
64 
id() const65 int PeerConnectionClient::id() const {
66   return my_id_;
67 }
68 
is_connected() const69 bool PeerConnectionClient::is_connected() const {
70   return my_id_ != -1;
71 }
72 
peers() const73 const Peers& PeerConnectionClient::peers() const {
74   return peers_;
75 }
76 
RegisterObserver(PeerConnectionClientObserver * callback)77 void PeerConnectionClient::RegisterObserver(
78     PeerConnectionClientObserver* callback) {
79   RTC_DCHECK(!callback_);
80   callback_ = callback;
81 }
82 
Connect(const std::string & server,int port,const std::string & client_name)83 void PeerConnectionClient::Connect(const std::string& server,
84                                    int port,
85                                    const std::string& client_name) {
86   RTC_DCHECK(!server.empty());
87   RTC_DCHECK(!client_name.empty());
88 
89   if (state_ != NOT_CONNECTED) {
90     RTC_LOG(WARNING)
91         << "The client must not be connected before you can call Connect()";
92     callback_->OnServerConnectionFailure();
93     return;
94   }
95 
96   if (server.empty() || client_name.empty()) {
97     callback_->OnServerConnectionFailure();
98     return;
99   }
100 
101   if (port <= 0)
102     port = kDefaultServerPort;
103 
104   server_address_.SetIP(server);
105   server_address_.SetPort(port);
106   client_name_ = client_name;
107 
108   if (server_address_.IsUnresolvedIP()) {
109     state_ = RESOLVING;
110     resolver_ = new rtc::AsyncResolver();
111     resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
112     resolver_->Start(server_address_);
113   } else {
114     DoConnect();
115   }
116 }
117 
OnResolveResult(rtc::AsyncResolverInterface * resolver)118 void PeerConnectionClient::OnResolveResult(
119     rtc::AsyncResolverInterface* resolver) {
120   if (resolver_->GetError() != 0) {
121     callback_->OnServerConnectionFailure();
122     resolver_->Destroy(false);
123     resolver_ = NULL;
124     state_ = NOT_CONNECTED;
125   } else {
126     server_address_ = resolver_->address();
127     DoConnect();
128   }
129 }
130 
DoConnect()131 void PeerConnectionClient::DoConnect() {
132   control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
133   hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
134   InitSocketSignals();
135   char buffer[1024];
136   snprintf(buffer, sizeof(buffer), "GET /sign_in?%s HTTP/1.0\r\n\r\n",
137            client_name_.c_str());
138   onconnect_data_ = buffer;
139 
140   bool ret = ConnectControlSocket();
141   if (ret)
142     state_ = SIGNING_IN;
143   if (!ret) {
144     callback_->OnServerConnectionFailure();
145   }
146 }
147 
SendToPeer(int peer_id,const std::string & message)148 bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
149   if (state_ != CONNECTED)
150     return false;
151 
152   RTC_DCHECK(is_connected());
153   RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
154   if (!is_connected() || peer_id == -1)
155     return false;
156 
157   char headers[1024];
158   snprintf(headers, sizeof(headers),
159            "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
160            "Content-Length: %zu\r\n"
161            "Content-Type: text/plain\r\n"
162            "\r\n",
163            my_id_, peer_id, message.length());
164   onconnect_data_ = headers;
165   onconnect_data_ += message;
166   return ConnectControlSocket();
167 }
168 
SendHangUp(int peer_id)169 bool PeerConnectionClient::SendHangUp(int peer_id) {
170   return SendToPeer(peer_id, kByeMessage);
171 }
172 
IsSendingMessage()173 bool PeerConnectionClient::IsSendingMessage() {
174   return state_ == CONNECTED &&
175          control_socket_->GetState() != rtc::Socket::CS_CLOSED;
176 }
177 
SignOut()178 bool PeerConnectionClient::SignOut() {
179   if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
180     return true;
181 
182   if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
183     hanging_get_->Close();
184 
185   if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
186     state_ = SIGNING_OUT;
187 
188     if (my_id_ != -1) {
189       char buffer[1024];
190       snprintf(buffer, sizeof(buffer),
191                "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
192       onconnect_data_ = buffer;
193       return ConnectControlSocket();
194     } else {
195       // Can occur if the app is closed before we finish connecting.
196       return true;
197     }
198   } else {
199     state_ = SIGNING_OUT_WAITING;
200   }
201 
202   return true;
203 }
204 
Close()205 void PeerConnectionClient::Close() {
206   control_socket_->Close();
207   hanging_get_->Close();
208   onconnect_data_.clear();
209   peers_.clear();
210   if (resolver_ != NULL) {
211     resolver_->Destroy(false);
212     resolver_ = NULL;
213   }
214   my_id_ = -1;
215   state_ = NOT_CONNECTED;
216 }
217 
ConnectControlSocket()218 bool PeerConnectionClient::ConnectControlSocket() {
219   RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
220   int err = control_socket_->Connect(server_address_);
221   if (err == SOCKET_ERROR) {
222     Close();
223     return false;
224   }
225   return true;
226 }
227 
OnConnect(rtc::AsyncSocket * socket)228 void PeerConnectionClient::OnConnect(rtc::AsyncSocket* socket) {
229   RTC_DCHECK(!onconnect_data_.empty());
230   size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
231   RTC_DCHECK(sent == onconnect_data_.length());
232   onconnect_data_.clear();
233 }
234 
OnHangingGetConnect(rtc::AsyncSocket * socket)235 void PeerConnectionClient::OnHangingGetConnect(rtc::AsyncSocket* socket) {
236   char buffer[1024];
237   snprintf(buffer, sizeof(buffer), "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n",
238            my_id_);
239   int len = static_cast<int>(strlen(buffer));
240   int sent = socket->Send(buffer, len);
241   RTC_DCHECK(sent == len);
242 }
243 
OnMessageFromPeer(int peer_id,const std::string & message)244 void PeerConnectionClient::OnMessageFromPeer(int peer_id,
245                                              const std::string& message) {
246   if (message.length() == (sizeof(kByeMessage) - 1) &&
247       message.compare(kByeMessage) == 0) {
248     callback_->OnPeerDisconnected(peer_id);
249   } else {
250     callback_->OnMessageFromPeer(peer_id, message);
251   }
252 }
253 
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,size_t * value)254 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
255                                           size_t eoh,
256                                           const char* header_pattern,
257                                           size_t* value) {
258   RTC_DCHECK(value != NULL);
259   size_t found = data.find(header_pattern);
260   if (found != std::string::npos && found < eoh) {
261     *value = atoi(&data[found + strlen(header_pattern)]);
262     return true;
263   }
264   return false;
265 }
266 
GetHeaderValue(const std::string & data,size_t eoh,const char * header_pattern,std::string * value)267 bool PeerConnectionClient::GetHeaderValue(const std::string& data,
268                                           size_t eoh,
269                                           const char* header_pattern,
270                                           std::string* value) {
271   RTC_DCHECK(value != NULL);
272   size_t found = data.find(header_pattern);
273   if (found != std::string::npos && found < eoh) {
274     size_t begin = found + strlen(header_pattern);
275     size_t end = data.find("\r\n", begin);
276     if (end == std::string::npos)
277       end = eoh;
278     value->assign(data.substr(begin, end - begin));
279     return true;
280   }
281   return false;
282 }
283 
ReadIntoBuffer(rtc::AsyncSocket * socket,std::string * data,size_t * content_length)284 bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket,
285                                           std::string* data,
286                                           size_t* content_length) {
287   char buffer[0xffff];
288   do {
289     int bytes = socket->Recv(buffer, sizeof(buffer), nullptr);
290     if (bytes <= 0)
291       break;
292     data->append(buffer, bytes);
293   } while (true);
294 
295   bool ret = false;
296   size_t i = data->find("\r\n\r\n");
297   if (i != std::string::npos) {
298     RTC_LOG(INFO) << "Headers received";
299     if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
300       size_t total_response_size = (i + 4) + *content_length;
301       if (data->length() >= total_response_size) {
302         ret = true;
303         std::string should_close;
304         const char kConnection[] = "\r\nConnection: ";
305         if (GetHeaderValue(*data, i, kConnection, &should_close) &&
306             should_close.compare("close") == 0) {
307           socket->Close();
308           // Since we closed the socket, there was no notification delivered
309           // to us.  Compensate by letting ourselves know.
310           OnClose(socket, 0);
311         }
312       } else {
313         // We haven't received everything.  Just continue to accept data.
314       }
315     } else {
316       RTC_LOG(LS_ERROR) << "No content length field specified by the server.";
317     }
318   }
319   return ret;
320 }
321 
OnRead(rtc::AsyncSocket * socket)322 void PeerConnectionClient::OnRead(rtc::AsyncSocket* socket) {
323   size_t content_length = 0;
324   if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
325     size_t peer_id = 0, eoh = 0;
326     bool ok =
327         ParseServerResponse(control_data_, content_length, &peer_id, &eoh);
328     if (ok) {
329       if (my_id_ == -1) {
330         // First response.  Let's store our server assigned ID.
331         RTC_DCHECK(state_ == SIGNING_IN);
332         my_id_ = static_cast<int>(peer_id);
333         RTC_DCHECK(my_id_ != -1);
334 
335         // The body of the response will be a list of already connected peers.
336         if (content_length) {
337           size_t pos = eoh + 4;
338           while (pos < control_data_.size()) {
339             size_t eol = control_data_.find('\n', pos);
340             if (eol == std::string::npos)
341               break;
342             int id = 0;
343             std::string name;
344             bool connected;
345             if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
346                            &connected) &&
347                 id != my_id_) {
348               peers_[id] = name;
349               callback_->OnPeerConnected(id, name);
350             }
351             pos = eol + 1;
352           }
353         }
354         RTC_DCHECK(is_connected());
355         callback_->OnSignedIn();
356       } else if (state_ == SIGNING_OUT) {
357         Close();
358         callback_->OnDisconnected();
359       } else if (state_ == SIGNING_OUT_WAITING) {
360         SignOut();
361       }
362     }
363 
364     control_data_.clear();
365 
366     if (state_ == SIGNING_IN) {
367       RTC_DCHECK(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
368       state_ = CONNECTED;
369       hanging_get_->Connect(server_address_);
370     }
371   }
372 }
373 
OnHangingGetRead(rtc::AsyncSocket * socket)374 void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) {
375   RTC_LOG(INFO) << __FUNCTION__;
376   size_t content_length = 0;
377   if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
378     size_t peer_id = 0, eoh = 0;
379     bool ok =
380         ParseServerResponse(notification_data_, content_length, &peer_id, &eoh);
381 
382     if (ok) {
383       // Store the position where the body begins.
384       size_t pos = eoh + 4;
385 
386       if (my_id_ == static_cast<int>(peer_id)) {
387         // A notification about a new member or a member that just
388         // disconnected.
389         int id = 0;
390         std::string name;
391         bool connected = false;
392         if (ParseEntry(notification_data_.substr(pos), &name, &id,
393                        &connected)) {
394           if (connected) {
395             peers_[id] = name;
396             callback_->OnPeerConnected(id, name);
397           } else {
398             peers_.erase(id);
399             callback_->OnPeerDisconnected(id);
400           }
401         }
402       } else {
403         OnMessageFromPeer(static_cast<int>(peer_id),
404                           notification_data_.substr(pos));
405       }
406     }
407 
408     notification_data_.clear();
409   }
410 
411   if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
412       state_ == CONNECTED) {
413     hanging_get_->Connect(server_address_);
414   }
415 }
416 
ParseEntry(const std::string & entry,std::string * name,int * id,bool * connected)417 bool PeerConnectionClient::ParseEntry(const std::string& entry,
418                                       std::string* name,
419                                       int* id,
420                                       bool* connected) {
421   RTC_DCHECK(name != NULL);
422   RTC_DCHECK(id != NULL);
423   RTC_DCHECK(connected != NULL);
424   RTC_DCHECK(!entry.empty());
425 
426   *connected = false;
427   size_t separator = entry.find(',');
428   if (separator != std::string::npos) {
429     *id = atoi(&entry[separator + 1]);
430     name->assign(entry.substr(0, separator));
431     separator = entry.find(',', separator + 1);
432     if (separator != std::string::npos) {
433       *connected = atoi(&entry[separator + 1]) ? true : false;
434     }
435   }
436   return !name->empty();
437 }
438 
GetResponseStatus(const std::string & response)439 int PeerConnectionClient::GetResponseStatus(const std::string& response) {
440   int status = -1;
441   size_t pos = response.find(' ');
442   if (pos != std::string::npos)
443     status = atoi(&response[pos + 1]);
444   return status;
445 }
446 
ParseServerResponse(const std::string & response,size_t content_length,size_t * peer_id,size_t * eoh)447 bool PeerConnectionClient::ParseServerResponse(const std::string& response,
448                                                size_t content_length,
449                                                size_t* peer_id,
450                                                size_t* eoh) {
451   int status = GetResponseStatus(response.c_str());
452   if (status != 200) {
453     RTC_LOG(LS_ERROR) << "Received error from server";
454     Close();
455     callback_->OnDisconnected();
456     return false;
457   }
458 
459   *eoh = response.find("\r\n\r\n");
460   RTC_DCHECK(*eoh != std::string::npos);
461   if (*eoh == std::string::npos)
462     return false;
463 
464   *peer_id = -1;
465 
466   // See comment in peer_channel.cc for why we use the Pragma header and
467   // not e.g. "X-Peer-Id".
468   GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
469 
470   return true;
471 }
472 
OnClose(rtc::AsyncSocket * socket,int err)473 void PeerConnectionClient::OnClose(rtc::AsyncSocket* socket, int err) {
474   RTC_LOG(INFO) << __FUNCTION__;
475 
476   socket->Close();
477 
478 #ifdef WIN32
479   if (err != WSAECONNREFUSED) {
480 #else
481   if (err != ECONNREFUSED) {
482 #endif
483     if (socket == hanging_get_.get()) {
484       if (state_ == CONNECTED) {
485         hanging_get_->Close();
486         hanging_get_->Connect(server_address_);
487       }
488     } else {
489       callback_->OnMessageSent(err);
490     }
491   } else {
492     if (socket == control_socket_.get()) {
493       RTC_LOG(WARNING) << "Connection refused; retrying in 2 seconds";
494       rtc::Thread::Current()->PostDelayed(RTC_FROM_HERE, kReconnectDelay, this,
495                                           0);
496     } else {
497       Close();
498       callback_->OnDisconnected();
499     }
500   }
501 }
502 
503 void PeerConnectionClient::OnMessage(rtc::Message* msg) {
504   // ignore msg; there is currently only one supported message ("retry")
505   DoConnect();
506 }
507