1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef COMPONENTS_CAST_CHANNEL_CAST_TRANSPORT_H_ 6 #define COMPONENTS_CAST_CHANNEL_CAST_TRANSPORT_H_ 7 8 #include <string> 9 10 #include "base/containers/queue.h" 11 #include "base/macros.h" 12 #include "base/memory/ref_counted.h" 13 #include "base/sequence_checker.h" 14 #include "base/threading/thread_checker.h" 15 #include "components/cast_channel/cast_channel_enum.h" 16 #include "components/cast_channel/logger.h" 17 #include "net/base/completion_once_callback.h" 18 #include "net/base/ip_endpoint.h" 19 #include "third_party/openscreen/src/cast/common/channel/proto/cast_channel.pb.h" 20 21 namespace net { 22 class DrainableIOBuffer; 23 class DrainableIOBuffer; 24 class GrowableIOBuffer; 25 class IOBuffer; 26 class Socket; 27 } // namespace net 28 29 namespace cast_channel { 30 31 using ::cast::channel::CastMessage; 32 class MessageFramer; 33 34 class CastTransport { 35 public: ~CastTransport()36 virtual ~CastTransport() {} 37 38 // Object to be informed of incoming messages and read errors. 39 class Delegate { 40 public: ~Delegate()41 virtual ~Delegate() {} 42 43 // Called once Transport is successfully initialized and started. 44 // Owned read delegates are Start()ed automatically. 45 virtual void Start() = 0; 46 47 // An error occurred on the channel. 48 // The caller is responsible for closing |socket| if an error occurred. 49 virtual void OnError(ChannelError error_state) = 0; 50 51 // A message was received on the channel. 52 virtual void OnMessage(const CastMessage& message) = 0; 53 }; 54 55 // Sends a CastMessage to |socket_|. 56 // |message|: The message to send. 57 // |callback|: Callback to be invoked when the write operation has finished. 58 // Virtual for testing. 59 virtual void SendMessage(const CastMessage& message, 60 net::CompletionOnceCallback callback) = 0; 61 62 // Initializes the reading state machine and starts reading from the 63 // underlying socket. 64 // Virtual for testing. 65 virtual void Start() = 0; 66 67 // Changes the delegate for processing read events. Pending reads remain 68 // in-flight. 69 // Ownership of the pointee of |delegate| is assumed by the transport. 70 // Prior delegates are deleted automatically. 71 virtual void SetReadDelegate(std::unique_ptr<Delegate> delegate) = 0; 72 }; 73 74 // Manager class for reading and writing messages to/from a socket. 75 class CastTransportImpl : public CastTransport { 76 public: 77 using ChannelError = ::cast_channel::ChannelError; 78 79 // Interface to read/write data from a socket to ease unit-testing. 80 class Channel { 81 public: ~Channel()82 virtual ~Channel() {} 83 virtual void Read(net::IOBuffer* buffer, 84 int bytes, 85 net::CompletionOnceCallback callback) = 0; 86 virtual void Write(net::IOBuffer* buffer, 87 int bytes, 88 net::CompletionOnceCallback callback) = 0; 89 }; 90 91 // Adds a CastMessage read/write layer to a socket. 92 // Message read events are propagated to the owner via |read_delegate|. 93 // |vlog_prefix| sets the prefix used for all VLOGged output. 94 // |channel| and |logger| must all out-live the 95 // CastTransportImpl instance. 96 // |read_delegate| is owned by this CastTransportImpl object. 97 CastTransportImpl(Channel* channel, 98 int channel_id, 99 const net::IPEndPoint& ip_endpoint_, 100 scoped_refptr<Logger> logger); 101 102 ~CastTransportImpl() override; 103 104 // CastTransport interface. 105 void SendMessage(const CastMessage& message, 106 net::CompletionOnceCallback callback) override; 107 void Start() override; 108 void SetReadDelegate(std::unique_ptr<Delegate> delegate) override; 109 110 private: 111 // Holds a message to be written to the socket. |callback| is invoked when the 112 // message is fully written or an error occurrs. 113 struct WriteRequest { 114 explicit WriteRequest(const std::string& namespace_, 115 const std::string& payload, 116 net::CompletionOnceCallback callback); 117 WriteRequest(WriteRequest&& other); 118 ~WriteRequest(); 119 120 // Namespace of the serialized message. 121 std::string message_namespace; 122 // Write completion callback, invoked when the operation has completed or 123 // failed. 124 net::CompletionOnceCallback callback; 125 // Buffer with outgoing data. 126 scoped_refptr<net::DrainableIOBuffer> io_buffer; 127 }; 128 129 static bool IsTerminalReadState(ReadState read_state); 130 static bool IsTerminalWriteState(WriteState write_state); 131 132 void SetReadState(ReadState read_state); 133 void SetWriteState(WriteState write_state); 134 void SetErrorState(ChannelError error_state); 135 136 // Terminates all in-flight write callbacks with error code ERR_FAILED. 137 void FlushWriteQueue(); 138 139 // Main method that performs write flow state transitions. 140 void OnWriteResult(int result); 141 142 // Each of the below Do* method is executed in the corresponding 143 // write state. For example when write state is WRITE_STATE_WRITE_COMPLETE 144 // DowriteComplete is called, and so on. 145 int DoWrite(); 146 int DoWriteComplete(int result); 147 int DoWriteCallback(); 148 int DoWriteHandleError(int result); 149 150 // Main method that performs write flow state transitions. 151 void OnReadResult(int result); 152 153 // Each of the below Do* method is executed in the corresponding 154 // write state. For example when read state is READ_STATE_READ_COMPLETE 155 // DoReadComplete is called, and so on. 156 int DoRead(); 157 int DoReadComplete(int result); 158 int DoReadCallback(); 159 int DoReadHandleError(int result); 160 161 // Indicates that the transport object is started and may receive and send 162 // messages. 163 bool started_; 164 165 // Queue of pending writes. The message at the front of the queue is the one 166 // being written. 167 base::queue<WriteRequest> write_queue_; 168 169 // Buffer used for read operations. Reused for every read. 170 scoped_refptr<net::GrowableIOBuffer> read_buffer_; 171 172 // Constructs and parses the wire representation of message frames. 173 std::unique_ptr<MessageFramer> framer_; 174 175 // Last message received on the socket. 176 std::unique_ptr<CastMessage> current_message_; 177 178 // Channel used for I/O operations. 179 Channel* const channel_; 180 181 // Methods for communicating message receipt and error status to client code. 182 std::unique_ptr<Delegate> delegate_; 183 184 // Write flow state machine state. 185 WriteState write_state_; 186 187 // Read flow state machine state. 188 ReadState read_state_; 189 190 // The last error encountered by the channel. 191 ChannelError error_state_; 192 193 // Connection metadata for logging purposes. 194 // Socket ID assigned by ApiResourceManager. 195 int channel_id_; 196 197 // IP address of the remote end. 198 const net::IPEndPoint ip_endpoint_; 199 200 // Accumulates details of events and errors, for debugging purposes. 201 scoped_refptr<Logger> logger_; 202 203 SEQUENCE_CHECKER(sequence_checker_); 204 205 DISALLOW_COPY_AND_ASSIGN(CastTransportImpl); 206 }; 207 } // namespace cast_channel 208 209 #endif // COMPONENTS_CAST_CHANNEL_CAST_TRANSPORT_H_ 210