1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef COMPONENTS_CAST_CHANNEL_CAST_TRANSPORT_H_
6 #define COMPONENTS_CAST_CHANNEL_CAST_TRANSPORT_H_
7 
8 #include <string>
9 
10 #include "base/containers/queue.h"
11 #include "base/macros.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/sequence_checker.h"
14 #include "base/threading/thread_checker.h"
15 #include "components/cast_channel/cast_channel_enum.h"
16 #include "components/cast_channel/logger.h"
17 #include "net/base/completion_once_callback.h"
18 #include "net/base/ip_endpoint.h"
19 #include "third_party/openscreen/src/cast/common/channel/proto/cast_channel.pb.h"
20 
// Forward declarations of net types. The buffer types are referenced below
// only through raw pointers and scoped_refptr members, so their full
// definitions are not required by this header.
namespace net {
class DrainableIOBuffer;
class GrowableIOBuffer;
class IOBuffer;
class Socket;
}  // namespace net
28 
29 namespace cast_channel {
30 
31 using ::cast::channel::CastMessage;
32 class MessageFramer;
33 
34 class CastTransport {
35  public:
~CastTransport()36   virtual ~CastTransport() {}
37 
38   // Object to be informed of incoming messages and read errors.
39   class Delegate {
40    public:
~Delegate()41     virtual ~Delegate() {}
42 
43     // Called once Transport is successfully initialized and started.
44     // Owned read delegates are Start()ed automatically.
45     virtual void Start() = 0;
46 
47     // An error occurred on the channel.
48     // The caller is responsible for closing |socket| if an error occurred.
49     virtual void OnError(ChannelError error_state) = 0;
50 
51     // A message was received on the channel.
52     virtual void OnMessage(const CastMessage& message) = 0;
53   };
54 
55   // Sends a CastMessage to |socket_|.
56   // |message|: The message to send.
57   // |callback|: Callback to be invoked when the write operation has finished.
58   // Virtual for testing.
59   virtual void SendMessage(const CastMessage& message,
60                            net::CompletionOnceCallback callback) = 0;
61 
62   // Initializes the reading state machine and starts reading from the
63   // underlying socket.
64   // Virtual for testing.
65   virtual void Start() = 0;
66 
67   // Changes the delegate for processing read events. Pending reads remain
68   // in-flight.
69   // Ownership of the pointee of |delegate| is assumed by the transport.
70   // Prior delegates are deleted automatically.
71   virtual void SetReadDelegate(std::unique_ptr<Delegate> delegate) = 0;
72 };
73 
74 // Manager class for reading and writing messages to/from a socket.
75 class CastTransportImpl : public CastTransport {
76  public:
77   using ChannelError = ::cast_channel::ChannelError;
78 
79   // Interface to read/write data from a socket to ease unit-testing.
80   class Channel {
81    public:
~Channel()82     virtual ~Channel() {}
83     virtual void Read(net::IOBuffer* buffer,
84                       int bytes,
85                       net::CompletionOnceCallback callback) = 0;
86     virtual void Write(net::IOBuffer* buffer,
87                        int bytes,
88                        net::CompletionOnceCallback callback) = 0;
89   };
90 
91   // Adds a CastMessage read/write layer to a socket.
92   // Message read events are propagated to the owner via |read_delegate|.
93   // |vlog_prefix| sets the prefix used for all VLOGged output.
94   // |channel| and |logger| must all out-live the
95   // CastTransportImpl instance.
96   // |read_delegate| is owned by this CastTransportImpl object.
97   CastTransportImpl(Channel* channel,
98                     int channel_id,
99                     const net::IPEndPoint& ip_endpoint_,
100                     scoped_refptr<Logger> logger);
101 
102   ~CastTransportImpl() override;
103 
104   // CastTransport interface.
105   void SendMessage(const CastMessage& message,
106                    net::CompletionOnceCallback callback) override;
107   void Start() override;
108   void SetReadDelegate(std::unique_ptr<Delegate> delegate) override;
109 
110  private:
111   // Holds a message to be written to the socket. |callback| is invoked when the
112   // message is fully written or an error occurrs.
113   struct WriteRequest {
114     explicit WriteRequest(const std::string& namespace_,
115                           const std::string& payload,
116                           net::CompletionOnceCallback callback);
117     WriteRequest(WriteRequest&& other);
118     ~WriteRequest();
119 
120     // Namespace of the serialized message.
121     std::string message_namespace;
122     // Write completion callback, invoked when the operation has completed or
123     // failed.
124     net::CompletionOnceCallback callback;
125     // Buffer with outgoing data.
126     scoped_refptr<net::DrainableIOBuffer> io_buffer;
127   };
128 
129   static bool IsTerminalReadState(ReadState read_state);
130   static bool IsTerminalWriteState(WriteState write_state);
131 
132   void SetReadState(ReadState read_state);
133   void SetWriteState(WriteState write_state);
134   void SetErrorState(ChannelError error_state);
135 
136   // Terminates all in-flight write callbacks with error code ERR_FAILED.
137   void FlushWriteQueue();
138 
139   // Main method that performs write flow state transitions.
140   void OnWriteResult(int result);
141 
142   // Each of the below Do* method is executed in the corresponding
143   // write state. For example when write state is WRITE_STATE_WRITE_COMPLETE
144   // DowriteComplete is called, and so on.
145   int DoWrite();
146   int DoWriteComplete(int result);
147   int DoWriteCallback();
148   int DoWriteHandleError(int result);
149 
150   // Main method that performs write flow state transitions.
151   void OnReadResult(int result);
152 
153   // Each of the below Do* method is executed in the corresponding
154   // write state. For example when read state is READ_STATE_READ_COMPLETE
155   // DoReadComplete is called, and so on.
156   int DoRead();
157   int DoReadComplete(int result);
158   int DoReadCallback();
159   int DoReadHandleError(int result);
160 
161   // Indicates that the transport object is started and may receive and send
162   // messages.
163   bool started_;
164 
165   // Queue of pending writes. The message at the front of the queue is the one
166   // being written.
167   base::queue<WriteRequest> write_queue_;
168 
169   // Buffer used for read operations. Reused for every read.
170   scoped_refptr<net::GrowableIOBuffer> read_buffer_;
171 
172   // Constructs and parses the wire representation of message frames.
173   std::unique_ptr<MessageFramer> framer_;
174 
175   // Last message received on the socket.
176   std::unique_ptr<CastMessage> current_message_;
177 
178   // Channel used for I/O operations.
179   Channel* const channel_;
180 
181   // Methods for communicating message receipt and error status to client code.
182   std::unique_ptr<Delegate> delegate_;
183 
184   // Write flow state machine state.
185   WriteState write_state_;
186 
187   // Read flow state machine state.
188   ReadState read_state_;
189 
190   // The last error encountered by the channel.
191   ChannelError error_state_;
192 
193   // Connection metadata for logging purposes.
194   // Socket ID assigned by ApiResourceManager.
195   int channel_id_;
196 
197   // IP address of the remote end.
198   const net::IPEndPoint ip_endpoint_;
199 
200   // Accumulates details of events and errors, for debugging purposes.
201   scoped_refptr<Logger> logger_;
202 
203   SEQUENCE_CHECKER(sequence_checker_);
204 
205   DISALLOW_COPY_AND_ASSIGN(CastTransportImpl);
206 };
207 }  // namespace cast_channel
208 
209 #endif  // COMPONENTS_CAST_CHANNEL_CAST_TRANSPORT_H_
210