1 /*
2  *  Copyright (C) 2005-2018 Team Kodi
3  *  This file is part of Kodi - https://kodi.tv
4  *
5  *  SPDX-License-Identifier: LGPL-2.1-or-later
6  *  See LICENSES/README.md for more information.
7  */
8 
9 #include "ActorProtocol.h"
10 
11 #include "threads/Event.h"
12 
13 #include <cstring>
14 
15 using namespace Actor;
16 
Release()17 void Message::Release()
18 {
19   bool skip;
20   origin.Lock();
21   skip = isSync ? !isSyncFini : false;
22   isSyncFini = true;
23   origin.Unlock();
24 
25   if (skip)
26     return;
27 
28   // free data buffer
29   if (data != buffer)
30     delete [] data;
31 
32   payloadObj.reset();
33 
34   // delete event in case of sync message
35   delete event;
36 
37   origin.ReturnMessage(this);
38 }
39 
Reply(int sig,void * data,size_t size)40 bool Message::Reply(int sig, void *data /* = NULL*/, size_t size /* = 0 */)
41 {
42   if (!isSync)
43   {
44     if (isOut)
45       return origin.SendInMessage(sig, data, size);
46     else
47       return origin.SendOutMessage(sig, data, size);
48   }
49 
50   origin.Lock();
51 
52   if (!isSyncTimeout)
53   {
54     Message *msg = origin.GetMessage();
55     msg->signal = sig;
56     msg->isOut = !isOut;
57     replyMessage = msg;
58     if (data)
59     {
60       if (size > sizeof(msg->buffer))
61         msg->data = new uint8_t[size];
62       else
63         msg->data = msg->buffer;
64       memcpy(msg->data, data, size);
65     }
66   }
67 
68   origin.Unlock();
69 
70   if (event)
71     event->Set();
72 
73   return true;
74 }
75 
~Protocol()76 Protocol::~Protocol()
77 {
78   Message *msg;
79   Purge();
80   while (!freeMessageQueue.empty())
81   {
82     msg = freeMessageQueue.front();
83     freeMessageQueue.pop();
84     delete msg;
85   }
86 }
87 
GetMessage()88 Message *Protocol::GetMessage()
89 {
90   Message *msg;
91 
92   CSingleLock lock(criticalSection);
93 
94   if (!freeMessageQueue.empty())
95   {
96     msg = freeMessageQueue.front();
97     freeMessageQueue.pop();
98   }
99   else
100     msg = new Message(*this);
101 
102   msg->isSync = false;
103   msg->isSyncFini = false;
104   msg->isSyncTimeout = false;
105   msg->event = NULL;
106   msg->data = NULL;
107   msg->payloadSize = 0;
108   msg->replyMessage = NULL;
109 
110   return msg;
111 }
112 
ReturnMessage(Message * msg)113 void Protocol::ReturnMessage(Message *msg)
114 {
115   CSingleLock lock(criticalSection);
116 
117   freeMessageQueue.push(msg);
118 }
119 
SendOutMessage(int signal,const void * data,size_t size,Message * outMsg)120 bool Protocol::SendOutMessage(int signal,
121                               const void* data /* = NULL */,
122                               size_t size /* = 0 */,
123                               Message* outMsg /* = NULL */)
124 {
125   Message *msg;
126   if (outMsg)
127     msg = outMsg;
128   else
129     msg = GetMessage();
130 
131   msg->signal = signal;
132   msg->isOut = true;
133 
134   if (data)
135   {
136     if (size > sizeof(msg->buffer))
137       msg->data = new uint8_t[size];
138     else
139       msg->data = msg->buffer;
140     memcpy(msg->data, data, size);
141   }
142 
143   { CSingleLock lock(criticalSection);
144     outMessages.push(msg);
145   }
146   if (containerOutEvent)
147     containerOutEvent->Set();
148 
149   return true;
150 }
151 
SendOutMessage(int signal,CPayloadWrapBase * payload,Message * outMsg)152 bool Protocol::SendOutMessage(int signal, CPayloadWrapBase *payload, Message *outMsg)
153 {
154   Message *msg;
155   if (outMsg)
156     msg = outMsg;
157   else
158     msg = GetMessage();
159 
160   msg->signal = signal;
161   msg->isOut = true;
162 
163   msg->payloadObj.reset(payload);
164 
165   { CSingleLock lock(criticalSection);
166     outMessages.push(msg);
167   }
168   if (containerOutEvent)
169     containerOutEvent->Set();
170 
171   return true;
172 }
173 
SendInMessage(int signal,const void * data,size_t size,Message * outMsg)174 bool Protocol::SendInMessage(int signal,
175                              const void* data /* = NULL */,
176                              size_t size /* = 0 */,
177                              Message* outMsg /* = NULL */)
178 {
179   Message *msg;
180   if (outMsg)
181     msg = outMsg;
182   else
183     msg = GetMessage();
184 
185   msg->signal = signal;
186   msg->isOut = false;
187 
188   if (data)
189   {
190     if (size > sizeof(msg->data))
191       msg->data = new uint8_t[size];
192     else
193       msg->data = msg->buffer;
194     memcpy(msg->data, data, size);
195   }
196 
197   { CSingleLock lock(criticalSection);
198     inMessages.push(msg);
199   }
200   if (containerInEvent)
201     containerInEvent->Set();
202 
203   return true;
204 }
205 
SendInMessage(int signal,CPayloadWrapBase * payload,Message * outMsg)206 bool Protocol::SendInMessage(int signal, CPayloadWrapBase *payload, Message *outMsg)
207 {
208   Message *msg;
209   if (outMsg)
210     msg = outMsg;
211   else
212     msg = GetMessage();
213 
214   msg->signal = signal;
215   msg->isOut = false;
216 
217   msg->payloadObj.reset(payload);
218 
219   { CSingleLock lock(criticalSection);
220     inMessages.push(msg);
221   }
222   if (containerInEvent)
223     containerInEvent->Set();
224 
225   return true;
226 }
227 
SendOutMessageSync(int signal,Message ** retMsg,int timeout,const void * data,size_t size)228 bool Protocol::SendOutMessageSync(
229     int signal, Message** retMsg, int timeout, const void* data /* = NULL */, size_t size /* = 0 */)
230 {
231   Message *msg = GetMessage();
232   msg->isOut = true;
233   msg->isSync = true;
234   msg->event = new CEvent;
235   msg->event->Reset();
236   SendOutMessage(signal, data, size, msg);
237 
238   if (!msg->event->WaitMSec(timeout))
239   {
240     const CSingleLock lock(criticalSection);
241     if (msg->replyMessage)
242       *retMsg = msg->replyMessage;
243     else
244     {
245       *retMsg = NULL;
246       msg->isSyncTimeout = true;
247     }
248   }
249   else
250     *retMsg = msg->replyMessage;
251 
252   msg->Release();
253 
254   if (*retMsg)
255     return true;
256   else
257     return false;
258 }
259 
SendOutMessageSync(int signal,Message ** retMsg,int timeout,CPayloadWrapBase * payload)260 bool Protocol::SendOutMessageSync(int signal, Message **retMsg, int timeout, CPayloadWrapBase *payload)
261 {
262   Message *msg = GetMessage();
263   msg->isOut = true;
264   msg->isSync = true;
265   msg->event = new CEvent;
266   msg->event->Reset();
267   SendOutMessage(signal, payload, msg);
268 
269   if (!msg->event->WaitMSec(timeout))
270   {
271     const CSingleLock lock(criticalSection);
272     if (msg->replyMessage)
273       *retMsg = msg->replyMessage;
274     else
275     {
276       *retMsg = NULL;
277       msg->isSyncTimeout = true;
278     }
279   }
280   else
281     *retMsg = msg->replyMessage;
282 
283   msg->Release();
284 
285   if (*retMsg)
286     return true;
287   else
288     return false;
289 }
290 
ReceiveOutMessage(Message ** msg)291 bool Protocol::ReceiveOutMessage(Message **msg)
292 {
293   CSingleLock lock(criticalSection);
294 
295   if (outMessages.empty() || outDefered)
296     return false;
297 
298   *msg = outMessages.front();
299   outMessages.pop();
300 
301   return true;
302 }
303 
ReceiveInMessage(Message ** msg)304 bool Protocol::ReceiveInMessage(Message **msg)
305 {
306   CSingleLock lock(criticalSection);
307 
308   if (inMessages.empty() || inDefered)
309     return false;
310 
311   *msg = inMessages.front();
312   inMessages.pop();
313 
314   return true;
315 }
316 
317 
Purge()318 void Protocol::Purge()
319 {
320   Message *msg;
321 
322   while (ReceiveInMessage(&msg))
323     msg->Release();
324 
325   while (ReceiveOutMessage(&msg))
326     msg->Release();
327 }
328 
PurgeIn(int signal)329 void Protocol::PurgeIn(int signal)
330 {
331   Message *msg;
332   std::queue<Message*> msgs;
333 
334   CSingleLock lock(criticalSection);
335 
336   while (!inMessages.empty())
337   {
338     msg = inMessages.front();
339     inMessages.pop();
340     if (msg->signal != signal)
341       msgs.push(msg);
342   }
343   while (!msgs.empty())
344   {
345     msg = msgs.front();
346     msgs.pop();
347     inMessages.push(msg);
348   }
349 }
350 
PurgeOut(int signal)351 void Protocol::PurgeOut(int signal)
352 {
353   Message *msg;
354   std::queue<Message*> msgs;
355 
356   CSingleLock lock(criticalSection);
357 
358   while (!outMessages.empty())
359   {
360     msg = outMessages.front();
361     outMessages.pop();
362     if (msg->signal != signal)
363       msgs.push(msg);
364   }
365   while (!msgs.empty())
366   {
367     msg = msgs.front();
368     msgs.pop();
369     outMessages.push(msg);
370   }
371 }
372