/*
 * Copyright (C) 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SIMPLE_PERF_UNIX_SOCKET_H_
#define SIMPLE_PERF_UNIX_SOCKET_H_

#include <stdint.h>
#include <string.h>
#include <unistd.h>

#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <android-base/logging.h>

#include "IOEventLoop.h"
#include "utils.h"

// Class wrappers for unix socket communication operations.

// Forward declaration: UnixSocketServer is declared before
// UnixSocketConnection but names it in the return type of
// AcceptConnection().
class UnixSocketConnection;

// UnixSocketMessage is the message structure used for communication.
// It is a fixed header immediately followed by a variable-length payload.
struct UnixSocketMessage {
  // Total size of the message in bytes, counted from the start of this
  // header (UnixSocketMessageBuffer::StoreMessage() copies exactly [len]
  // bytes beginning at the address of the struct).
  uint32_t len;
  // Message type tag. Its values are defined by the users of this struct,
  // not in this header.
  uint32_t type;
  // Start of the payload bytes that follow the header. Declared as a
  // zero-length array (a compiler extension) so it adds nothing to sizeof.
  char data[0];
};

// We want to avoid memory copies by being able to cast from a char array
// to UnixSocketMessage* directly (see the implementation in
// UnixSocketConnection::ConsumeDataInReadBuffer()). To access members
// of UnixSocketMessage and its extensions without causing alignment problems
// (on arm, some instructions (like LDRD) don't support unaligned addresses),
// we make sure all messages are stored at 8-byte aligned addresses. Namely,
// each message is padded to a size that is a multiple of 8 bytes.
static constexpr uint32_t UnixSocketMessageAlignment = 8u;

53 // UnixSocketMessageBuffer is a circular buffer used to store
54 // UnixSocketMessages.
55 class UnixSocketMessageBuffer {
56  public:
UnixSocketMessageBuffer(size_t capacity)57   explicit UnixSocketMessageBuffer(size_t capacity)
58       : data_(capacity), read_head_(0), valid_bytes_(0) {}
59 
Empty()60   bool Empty() const { return valid_bytes_ == 0; }
61 
HalfFull()62   bool HalfFull() const { return valid_bytes_ * 2 >= data_.size(); }
63 
StoreMessage(const UnixSocketMessage & message)64   bool StoreMessage(const UnixSocketMessage& message) {
65     uint32_t aligned_len = Align(message.len, UnixSocketMessageAlignment);
66     if (data_.size() - valid_bytes_ < aligned_len) {
67       return false;
68     }
69     uint32_t write_head = (read_head_ + valid_bytes_) % data_.size();
70     if (message.len <= data_.size() - write_head) {
71       memcpy(data_.data() + write_head, &message, message.len);
72     } else {
73       uint32_t len1 = data_.size() - write_head;
74       memcpy(data_.data() + write_head, &message, len1);
75       memcpy(data_.data(), reinterpret_cast<const char*>(&message) + len1,
76              message.len - len1);
77     }
78     valid_bytes_ += aligned_len;
79     return true;
80   }
81 
PeekData(const char ** pdata)82   size_t PeekData(const char** pdata) {
83     *pdata = &data_[read_head_];
84     if (read_head_ + valid_bytes_ <= data_.size()) {
85       return valid_bytes_;
86     }
87     return data_.size() - read_head_;
88   }
89 
CommitData(size_t size)90   void CommitData(size_t size) {
91     CHECK_GE(valid_bytes_, size);
92     read_head_ = (read_head_ + size) % data_.size();
93     valid_bytes_ -= size;
94   }
95 
96  private:
97   std::vector<char> data_;
98   uint32_t read_head_;
99   uint32_t valid_bytes_;
100 };
101 
102 // UnixSocketServer creates a unix socket server listening on a unix file path.
103 class UnixSocketServer {
104  public:
105   static std::unique_ptr<UnixSocketServer> Create(
106       const std::string& server_path, bool is_abstract);
107 
108   ~UnixSocketServer();
GetPath()109   const std::string& GetPath() const { return path_; }
110   std::unique_ptr<UnixSocketConnection> AcceptConnection();
111 
112  private:
UnixSocketServer(int server_fd,const std::string & path)113   UnixSocketServer(int server_fd, const std::string& path)
114       : server_fd_(server_fd), path_(path) {}
115   const int server_fd_;
116   const std::string path_;
117 };
118 
119 // UnixSocketConnection is used to communicate between server and client.
120 // It is either created by accepting a connection in UnixSocketServer, or by
121 // connecting to a UnixSocketServer.
122 // UnixSocketConnection binds to a IOEventLoop, so it writes messages to fd
123 // when it is writable, and read messages from fd when it is readable. To send
124 // messages, UnixSocketConnection uses a buffer to store to-be-sent messages.
125 // And whenever it receives a complete message from fd, it calls the callback
126 // function.
127 // In UnixSocketConnection, although user can send messages concurrently from
128 // different threads, only the thread running IOEventLoop::RunLoop() can
129 // do IO operations, calling WriteData() and ReadData(). To make it work
130 // properly, the thread creating/destroying UnixSocketConnection should be
131 // the same thread running IOEventLoop::RunLoop().
132 class UnixSocketConnection {
133  private:
134   static constexpr size_t SEND_BUFFER_SIZE = 512 * 1024;
135   static constexpr size_t READ_BUFFER_SIZE = 16 * 1024;
136 
137  public:
UnixSocketConnection(int fd)138   explicit UnixSocketConnection(int fd)
139       : fd_(fd),
140         read_buffer_(READ_BUFFER_SIZE),
141         read_buffer_size_(0),
142         read_event_(nullptr),
143         send_buffer_(SEND_BUFFER_SIZE),
144         write_event_enabled_(true),
145         write_event_(nullptr),
146         no_more_message_(false) {}
147 
148   static std::unique_ptr<UnixSocketConnection> Connect(
149       const std::string& server_path, bool is_abstract);
150 
151   ~UnixSocketConnection();
152 
IsClosed()153   bool IsClosed() {
154     return fd_ == -1;
155   }
156 
157   bool PrepareForIO(IOEventLoop& loop,
158                     const std::function<bool(const UnixSocketMessage&)>&
159                         receive_message_callback,
160                     const std::function<bool()>& close_connection_callback);
161 
162   // Thread-safe function, can be called from signal handler.
163   // The message is put into the send buffer. If [undelayed] is true, messages
164   // in the send buffer are sent immediately, otherwise they will be sent
165   // when the buffer is half full.
SendMessage(const UnixSocketMessage & message,bool undelayed)166   bool SendMessage(const UnixSocketMessage& message, bool undelayed) {
167     std::lock_guard<std::mutex> lock(send_buffer_and_write_event_mtx_);
168     if (no_more_message_ || !send_buffer_.StoreMessage(message)) {
169       return false;
170     }
171     // By buffering messages, we can effectively decrease context-switch times.
172     if (undelayed || send_buffer_.HalfFull()) {
173       return EnableWriteEventWithLock();
174     }
175     return true;
176   }
177 
178   // Thread-safe function.
179   // After NoMoreMessage(), the connection will not accept more messages
180   // in SendMessage(), and it will be closed after sending existing messages
181   // in send buffer.
NoMoreMessage()182   bool NoMoreMessage() {
183     std::lock_guard<std::mutex> lock(send_buffer_and_write_event_mtx_);
184     if (!no_more_message_) {
185       no_more_message_ = true;
186       return EnableWriteEventWithLock();
187     }
188     return true;
189   }
190 
191  private:
192   // The caller should have send_buffer_and_write_event_mtx_ locked.
EnableWriteEventWithLock()193   bool EnableWriteEventWithLock() {
194     if (!write_event_enabled_) {
195       if (!IOEventLoop::EnableEvent(write_event_)) {
196         return false;
197       }
198       write_event_enabled_ = true;
199     }
200     return true;
201   }
202   // The caller should have send_buffer_and_write_event_mtx_ locked.
DisableWriteEventWithLock()203   bool DisableWriteEventWithLock() {
204     if (write_event_enabled_) {
205       if (!IOEventLoop::DisableEvent(write_event_)) {
206         return false;
207       }
208       write_event_enabled_ = false;
209     }
210     return true;
211   }
212 
213   // Below functions are only called in the thread running IO operations.
214   bool WriteData();
215   bool GetDataFromSendBuffer(const char** pdata, size_t* pdata_size);
216   bool ReadData();
217   bool ConsumeDataInReadBuffer();
218   bool CloseConnection();
219 
220   // Below members can only be accessed in the thread running IO operations.
221   int fd_;
222   std::function<bool(const UnixSocketMessage&)> read_callback_;
223   std::function<bool()> close_callback_;
224   // read_buffer_ is used to cache data read from the other end.
225   // read_buffer_size_ is the number of valid bytes in read_buffer_.
226   std::vector<char> read_buffer_;
227   size_t read_buffer_size_;
228   IOEventRef read_event_;
229 
230   // send_buffer_and_write_event_mtx_ protects following members, which can be
231   // accessed in multiple threads.
232   std::mutex send_buffer_and_write_event_mtx_;
233   UnixSocketMessageBuffer send_buffer_;
234   bool write_event_enabled_;
235   IOEventRef write_event_;
236   bool no_more_message_;
237 };
238 
#endif  // SIMPLE_PERF_UNIX_SOCKET_H_
