1 /*
2 * Copyright (C) 2005-2018 Team Kodi
3 * This file is part of Kodi - https://kodi.tv
4 *
5 * SPDX-License-Identifier: LGPL-2.1-or-later
6 * See LICENSES/README.md for more information.
7 */
8
9 #include "ActorProtocol.h"
10
11 #include "threads/Event.h"
12
13 #include <cstring>
14
15 using namespace Actor;
16
Release()17 void Message::Release()
18 {
19 bool skip;
20 origin.Lock();
21 skip = isSync ? !isSyncFini : false;
22 isSyncFini = true;
23 origin.Unlock();
24
25 if (skip)
26 return;
27
28 // free data buffer
29 if (data != buffer)
30 delete [] data;
31
32 payloadObj.reset();
33
34 // delete event in case of sync message
35 delete event;
36
37 origin.ReturnMessage(this);
38 }
39
Reply(int sig,void * data,size_t size)40 bool Message::Reply(int sig, void *data /* = NULL*/, size_t size /* = 0 */)
41 {
42 if (!isSync)
43 {
44 if (isOut)
45 return origin.SendInMessage(sig, data, size);
46 else
47 return origin.SendOutMessage(sig, data, size);
48 }
49
50 origin.Lock();
51
52 if (!isSyncTimeout)
53 {
54 Message *msg = origin.GetMessage();
55 msg->signal = sig;
56 msg->isOut = !isOut;
57 replyMessage = msg;
58 if (data)
59 {
60 if (size > sizeof(msg->buffer))
61 msg->data = new uint8_t[size];
62 else
63 msg->data = msg->buffer;
64 memcpy(msg->data, data, size);
65 }
66 }
67
68 origin.Unlock();
69
70 if (event)
71 event->Set();
72
73 return true;
74 }
75
~Protocol()76 Protocol::~Protocol()
77 {
78 Message *msg;
79 Purge();
80 while (!freeMessageQueue.empty())
81 {
82 msg = freeMessageQueue.front();
83 freeMessageQueue.pop();
84 delete msg;
85 }
86 }
87
GetMessage()88 Message *Protocol::GetMessage()
89 {
90 Message *msg;
91
92 CSingleLock lock(criticalSection);
93
94 if (!freeMessageQueue.empty())
95 {
96 msg = freeMessageQueue.front();
97 freeMessageQueue.pop();
98 }
99 else
100 msg = new Message(*this);
101
102 msg->isSync = false;
103 msg->isSyncFini = false;
104 msg->isSyncTimeout = false;
105 msg->event = NULL;
106 msg->data = NULL;
107 msg->payloadSize = 0;
108 msg->replyMessage = NULL;
109
110 return msg;
111 }
112
ReturnMessage(Message * msg)113 void Protocol::ReturnMessage(Message *msg)
114 {
115 CSingleLock lock(criticalSection);
116
117 freeMessageQueue.push(msg);
118 }
119
SendOutMessage(int signal,const void * data,size_t size,Message * outMsg)120 bool Protocol::SendOutMessage(int signal,
121 const void* data /* = NULL */,
122 size_t size /* = 0 */,
123 Message* outMsg /* = NULL */)
124 {
125 Message *msg;
126 if (outMsg)
127 msg = outMsg;
128 else
129 msg = GetMessage();
130
131 msg->signal = signal;
132 msg->isOut = true;
133
134 if (data)
135 {
136 if (size > sizeof(msg->buffer))
137 msg->data = new uint8_t[size];
138 else
139 msg->data = msg->buffer;
140 memcpy(msg->data, data, size);
141 }
142
143 { CSingleLock lock(criticalSection);
144 outMessages.push(msg);
145 }
146 if (containerOutEvent)
147 containerOutEvent->Set();
148
149 return true;
150 }
151
SendOutMessage(int signal,CPayloadWrapBase * payload,Message * outMsg)152 bool Protocol::SendOutMessage(int signal, CPayloadWrapBase *payload, Message *outMsg)
153 {
154 Message *msg;
155 if (outMsg)
156 msg = outMsg;
157 else
158 msg = GetMessage();
159
160 msg->signal = signal;
161 msg->isOut = true;
162
163 msg->payloadObj.reset(payload);
164
165 { CSingleLock lock(criticalSection);
166 outMessages.push(msg);
167 }
168 if (containerOutEvent)
169 containerOutEvent->Set();
170
171 return true;
172 }
173
SendInMessage(int signal,const void * data,size_t size,Message * outMsg)174 bool Protocol::SendInMessage(int signal,
175 const void* data /* = NULL */,
176 size_t size /* = 0 */,
177 Message* outMsg /* = NULL */)
178 {
179 Message *msg;
180 if (outMsg)
181 msg = outMsg;
182 else
183 msg = GetMessage();
184
185 msg->signal = signal;
186 msg->isOut = false;
187
188 if (data)
189 {
190 if (size > sizeof(msg->data))
191 msg->data = new uint8_t[size];
192 else
193 msg->data = msg->buffer;
194 memcpy(msg->data, data, size);
195 }
196
197 { CSingleLock lock(criticalSection);
198 inMessages.push(msg);
199 }
200 if (containerInEvent)
201 containerInEvent->Set();
202
203 return true;
204 }
205
SendInMessage(int signal,CPayloadWrapBase * payload,Message * outMsg)206 bool Protocol::SendInMessage(int signal, CPayloadWrapBase *payload, Message *outMsg)
207 {
208 Message *msg;
209 if (outMsg)
210 msg = outMsg;
211 else
212 msg = GetMessage();
213
214 msg->signal = signal;
215 msg->isOut = false;
216
217 msg->payloadObj.reset(payload);
218
219 { CSingleLock lock(criticalSection);
220 inMessages.push(msg);
221 }
222 if (containerInEvent)
223 containerInEvent->Set();
224
225 return true;
226 }
227
SendOutMessageSync(int signal,Message ** retMsg,int timeout,const void * data,size_t size)228 bool Protocol::SendOutMessageSync(
229 int signal, Message** retMsg, int timeout, const void* data /* = NULL */, size_t size /* = 0 */)
230 {
231 Message *msg = GetMessage();
232 msg->isOut = true;
233 msg->isSync = true;
234 msg->event = new CEvent;
235 msg->event->Reset();
236 SendOutMessage(signal, data, size, msg);
237
238 if (!msg->event->WaitMSec(timeout))
239 {
240 const CSingleLock lock(criticalSection);
241 if (msg->replyMessage)
242 *retMsg = msg->replyMessage;
243 else
244 {
245 *retMsg = NULL;
246 msg->isSyncTimeout = true;
247 }
248 }
249 else
250 *retMsg = msg->replyMessage;
251
252 msg->Release();
253
254 if (*retMsg)
255 return true;
256 else
257 return false;
258 }
259
SendOutMessageSync(int signal,Message ** retMsg,int timeout,CPayloadWrapBase * payload)260 bool Protocol::SendOutMessageSync(int signal, Message **retMsg, int timeout, CPayloadWrapBase *payload)
261 {
262 Message *msg = GetMessage();
263 msg->isOut = true;
264 msg->isSync = true;
265 msg->event = new CEvent;
266 msg->event->Reset();
267 SendOutMessage(signal, payload, msg);
268
269 if (!msg->event->WaitMSec(timeout))
270 {
271 const CSingleLock lock(criticalSection);
272 if (msg->replyMessage)
273 *retMsg = msg->replyMessage;
274 else
275 {
276 *retMsg = NULL;
277 msg->isSyncTimeout = true;
278 }
279 }
280 else
281 *retMsg = msg->replyMessage;
282
283 msg->Release();
284
285 if (*retMsg)
286 return true;
287 else
288 return false;
289 }
290
ReceiveOutMessage(Message ** msg)291 bool Protocol::ReceiveOutMessage(Message **msg)
292 {
293 CSingleLock lock(criticalSection);
294
295 if (outMessages.empty() || outDefered)
296 return false;
297
298 *msg = outMessages.front();
299 outMessages.pop();
300
301 return true;
302 }
303
ReceiveInMessage(Message ** msg)304 bool Protocol::ReceiveInMessage(Message **msg)
305 {
306 CSingleLock lock(criticalSection);
307
308 if (inMessages.empty() || inDefered)
309 return false;
310
311 *msg = inMessages.front();
312 inMessages.pop();
313
314 return true;
315 }
316
317
Purge()318 void Protocol::Purge()
319 {
320 Message *msg;
321
322 while (ReceiveInMessage(&msg))
323 msg->Release();
324
325 while (ReceiveOutMessage(&msg))
326 msg->Release();
327 }
328
PurgeIn(int signal)329 void Protocol::PurgeIn(int signal)
330 {
331 Message *msg;
332 std::queue<Message*> msgs;
333
334 CSingleLock lock(criticalSection);
335
336 while (!inMessages.empty())
337 {
338 msg = inMessages.front();
339 inMessages.pop();
340 if (msg->signal != signal)
341 msgs.push(msg);
342 }
343 while (!msgs.empty())
344 {
345 msg = msgs.front();
346 msgs.pop();
347 inMessages.push(msg);
348 }
349 }
350
PurgeOut(int signal)351 void Protocol::PurgeOut(int signal)
352 {
353 Message *msg;
354 std::queue<Message*> msgs;
355
356 CSingleLock lock(criticalSection);
357
358 while (!outMessages.empty())
359 {
360 msg = outMessages.front();
361 outMessages.pop();
362 if (msg->signal != signal)
363 msgs.push(msg);
364 }
365 while (!msgs.empty())
366 {
367 msg = msgs.front();
368 msgs.pop();
369 outMessages.push(msg);
370 }
371 }
372