1 // Copyright (c) 2012- PPSSPP Project.
2
3 // This program is free software: you can redistribute it and/or modify
4 // it under the terms of the GNU General Public License as published by
5 // the Free Software Foundation, version 2.0 or later versions.
6
7 // This program is distributed in the hope that it will be useful,
8 // but WITHOUT ANY WARRANTY; without even the implied warranty of
9 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 // GNU General Public License 2.0 for more details.
11
12 // A copy of the GPL 2.0 should have been included with the program.
13 // If not, see http://www.gnu.org/licenses/
14
15 // Official git repository and contact information can be found at
16 // https://github.com/hrydgard/ppsspp and http://www.ppsspp.org/.
17
18 #include <algorithm>
19
20 #include "Common/Serialize/Serializer.h"
21 #include "Common/Serialize/SerializeFuncs.h"
22 #include "Common/Serialize/SerializeMap.h"
23 #include "Core/Reporting.h"
24 #include "Core/CoreTiming.h"
25 #include "Core/MemMapHelpers.h"
26 #include "Core/HLE/HLE.h"
27 #include "Core/HLE/sceKernel.h"
28 #include "Core/HLE/sceKernelMsgPipe.h"
29 #include "Core/HLE/sceKernelMemory.h"
30 #include "Core/HLE/sceKernelInterrupt.h"
31 #include "Core/HLE/sceKernelThread.h"
32 #include "Core/HLE/KernelWaitHelpers.h"
33
34 #define SCE_KERNEL_MPA_THFIFO_S 0x0000
35 #define SCE_KERNEL_MPA_THPRI_S 0x0100
36 #define SCE_KERNEL_MPA_THFIFO_R 0x0000
37 #define SCE_KERNEL_MPA_THPRI_R 0x1000
38 #define SCE_KERNEL_MPA_HIGHMEM 0x4000
39 #define SCE_KERNEL_MPA_KNOWN (SCE_KERNEL_MPA_THPRI_S | SCE_KERNEL_MPA_THPRI_R | SCE_KERNEL_MPA_HIGHMEM)
40
41 #define SCE_KERNEL_MPW_FULL 0
42 #define SCE_KERNEL_MPW_ASAP 1
43
44 static const u32 MSGPIPE_WAIT_VALUE_SEND = 0;
45 static const u32 MSGPIPE_WAIT_VALUE_RECV = 1;
46
47 // State: the timer for MsgPipe timeouts.
48 static int waitTimer = -1;
49
50 struct NativeMsgPipe
51 {
52 SceSize_le size;
53 char name[32];
54 SceUInt_le attr;
55 s32_le bufSize;
56 s32_le freeSize;
57 s32_le numSendWaitThreads;
58 s32_le numReceiveWaitThreads;
59 };
60
61 struct MsgPipeWaitingThread
62 {
63 SceUID threadID;
64 u32 bufAddr;
65 u32 bufSize;
66 // Free space at the end for receive, valid/free to read bytes from end for send.
67 u32 freeSize;
68 s32 waitMode;
69 PSPPointer<u32_le> transferredBytes;
70 u64 pausedTimeout;
71
IsStillWaitingMsgPipeWaitingThread72 bool IsStillWaiting(SceUID waitID) const
73 {
74 return HLEKernel::VerifyWait(threadID, WAITTYPE_MSGPIPE, waitID);
75 }
76
WriteCurrentTimeoutMsgPipeWaitingThread77 void WriteCurrentTimeout(SceUID waitID) const
78 {
79 u32 error;
80 if (IsStillWaiting(waitID))
81 {
82 u32 timeoutPtr = __KernelGetWaitTimeoutPtr(threadID, error);
83 if (timeoutPtr != 0 && waitTimer != -1)
84 {
85 // Remove any event for this thread.
86 s64 cyclesLeft = CoreTiming::UnscheduleEvent(waitTimer, threadID);
87 Memory::Write_U32((u32) cyclesToUs(cyclesLeft), timeoutPtr);
88 }
89 }
90 }
91
CompleteMsgPipeWaitingThread92 void Complete(SceUID waitID, int result) const
93 {
94 if (IsStillWaiting(waitID))
95 {
96 WriteCurrentTimeout(waitID);
97 __KernelResumeThreadFromWait(threadID, result);
98 }
99 }
100
CancelMsgPipeWaitingThread101 void Cancel(SceUID waitID, int result) const
102 {
103 Complete(waitID, result);
104 }
105
ReadBufferMsgPipeWaitingThread106 void ReadBuffer(u32 destPtr, u32 len)
107 {
108 Memory::Memcpy(destPtr, bufAddr + bufSize - freeSize, len, "MsgPipeReadBuffer");
109 freeSize -= len;
110 if (transferredBytes.IsValid())
111 *transferredBytes += len;
112 }
113
WriteBufferMsgPipeWaitingThread114 void WriteBuffer(u32 srcPtr, u32 len)
115 {
116 Memory::Memcpy(bufAddr + (bufSize - freeSize), srcPtr, len, "MsgPipeWriteBuffer");
117 freeSize -= len;
118 if (transferredBytes.IsValid())
119 *transferredBytes += len;
120 }
121
operator ==MsgPipeWaitingThread122 bool operator ==(const SceUID &otherThreadID) const
123 {
124 return threadID == otherThreadID;
125 }
126 };
127
__KernelMsgPipeThreadSortPriority(MsgPipeWaitingThread thread1,MsgPipeWaitingThread thread2)128 static bool __KernelMsgPipeThreadSortPriority(MsgPipeWaitingThread thread1, MsgPipeWaitingThread thread2)
129 {
130 return __KernelThreadSortPriority(thread1.threadID, thread2.threadID);
131 }
132
133 struct MsgPipe : public KernelObject
134 {
GetNameMsgPipe135 const char *GetName() override { return nmp.name; }
GetTypeNameMsgPipe136 const char *GetTypeName() override { return GetStaticTypeName(); }
GetStaticTypeNameMsgPipe137 static const char *GetStaticTypeName() { return "MsgPipe"; }
GetMissingErrorCodeMsgPipe138 static u32 GetMissingErrorCode() { return SCE_KERNEL_ERROR_UNKNOWN_MPPID; }
GetStaticIDTypeMsgPipe139 static int GetStaticIDType() { return SCE_KERNEL_TMID_Mpipe; }
GetIDTypeMsgPipe140 int GetIDType() const override { return SCE_KERNEL_TMID_Mpipe; }
141
MsgPipeMsgPipe142 MsgPipe() : buffer(0) {}
~MsgPipeMsgPipe143 ~MsgPipe()
144 {
145 if (buffer != 0)
146 userMemory.Free(buffer);
147 }
148
GetUsedSizeMsgPipe149 u32 GetUsedSize()
150 {
151 return (u32)(nmp.bufSize - nmp.freeSize);
152 }
153
AddWaitingThreadMsgPipe154 void AddWaitingThread(std::vector<MsgPipeWaitingThread> &list, SceUID id, u32 addr, u32 size, int waitMode, u32 transferredBytesAddr)
155 {
156 MsgPipeWaitingThread thread = { id, addr, size, size, waitMode, { transferredBytesAddr } };
157 // Start out with 0 transferred bytes while waiting.
158 // TODO: for receive, it might be a different (partial) number.
159 if (thread.transferredBytes.IsValid())
160 *thread.transferredBytes = 0;
161
162 list.push_back(thread);
163 }
164
AddSendWaitingThreadMsgPipe165 void AddSendWaitingThread(SceUID id, u32 addr, u32 size, int waitMode, u32 transferredBytesAddr)
166 {
167 AddWaitingThread(sendWaitingThreads, id, addr, size, waitMode, transferredBytesAddr);
168 }
169
AddReceiveWaitingThreadMsgPipe170 void AddReceiveWaitingThread(SceUID id, u32 addr, u32 size, int waitMode, u32 transferredBytesAddr)
171 {
172 AddWaitingThread(receiveWaitingThreads, id, addr, size, waitMode, transferredBytesAddr);
173 }
174
CheckSendThreadsMsgPipe175 bool CheckSendThreads()
176 {
177 SortSendThreads();
178
179 bool wokeThreads = false;
180 bool filledSpace = false;
181 while (!sendWaitingThreads.empty() && nmp.freeSize > 0)
182 {
183 MsgPipeWaitingThread *thread = &sendWaitingThreads.front();
184 u32 bytesToSend = std::min(thread->freeSize, (u32) nmp.freeSize);
185
186 thread->ReadBuffer(buffer + GetUsedSize(), bytesToSend);
187 nmp.freeSize -= bytesToSend;
188 filledSpace = true;
189
190 if (thread->waitMode == SCE_KERNEL_MPW_ASAP || thread->freeSize == 0)
191 {
192 thread->Complete(GetUID(), 0);
193 sendWaitingThreads.erase(sendWaitingThreads.begin());
194 wokeThreads = true;
195 thread = NULL;
196 }
197 // Unlike receives, we don't do partial sends. Stop at first blocked thread.
198 else
199 break;
200 }
201
202 if (filledSpace)
203 wokeThreads |= CheckReceiveThreads();
204
205 return wokeThreads;
206 }
207
208 // This function should be only ran when the temporary buffer size is not 0 (otherwise, data is copied directly to the threads)
CheckReceiveThreadsMsgPipe209 bool CheckReceiveThreads()
210 {
211 SortReceiveThreads();
212
213 bool wokeThreads = false;
214 bool freedSpace = false;
215 while (!receiveWaitingThreads.empty() && GetUsedSize() > 0)
216 {
217 MsgPipeWaitingThread *thread = &receiveWaitingThreads.front();
218 // Receive as much as possible, even if it's not enough to wake up.
219 u32 bytesToSend = std::min(thread->freeSize, GetUsedSize());
220
221 u8* ptr = Memory::GetPointer(buffer);
222 thread->WriteBuffer(buffer, bytesToSend);
223 // Put the unused data at the start of the buffer.
224 nmp.freeSize += bytesToSend;
225 memmove(ptr, ptr + bytesToSend, GetUsedSize());
226 freedSpace = true;
227
228 if (thread->waitMode == SCE_KERNEL_MPW_ASAP || thread->freeSize == 0)
229 {
230 thread->Complete(GetUID(), 0);
231 receiveWaitingThreads.erase(receiveWaitingThreads.begin());
232 wokeThreads = true;
233 thread = NULL;
234 }
235 // Stop at the first that can't wake up.
236 else
237 break;
238 }
239
240 if (freedSpace)
241 wokeThreads |= CheckSendThreads();
242
243 return wokeThreads;
244 }
245
SortThreadsMsgPipe246 void SortThreads(std::vector<MsgPipeWaitingThread> &waitingThreads, bool usePrio)
247 {
248 // Clean up any not waiting at the same time.
249 HLEKernel::CleanupWaitingThreads(WAITTYPE_MSGPIPE, GetUID(), waitingThreads);
250
251 if (usePrio)
252 std::stable_sort(waitingThreads.begin(), waitingThreads.end(), __KernelMsgPipeThreadSortPriority);
253 }
254
SortReceiveThreadsMsgPipe255 void SortReceiveThreads()
256 {
257 bool usePrio = (nmp.attr & SCE_KERNEL_MPA_THPRI_R) != 0;
258 SortThreads(receiveWaitingThreads, usePrio);
259 }
260
SortSendThreadsMsgPipe261 void SortSendThreads()
262 {
263 bool usePrio = (nmp.attr & SCE_KERNEL_MPA_THPRI_S) != 0;
264 SortThreads(sendWaitingThreads, usePrio);
265 }
266
RemoveReceiveWaitingThreadMsgPipe267 void RemoveReceiveWaitingThread(SceUID threadID)
268 {
269 HLEKernel::RemoveWaitingThread(receiveWaitingThreads, threadID);
270 }
271
RemoveSendWaitingThreadMsgPipe272 void RemoveSendWaitingThread(SceUID threadID)
273 {
274 HLEKernel::RemoveWaitingThread(sendWaitingThreads, threadID);
275 }
276
DoStateMsgPipe277 void DoState(PointerWrap &p) override
278 {
279 auto s = p.Section("MsgPipe", 1);
280 if (!s)
281 return;
282
283 Do(p, nmp);
284 MsgPipeWaitingThread mpwt1 = {0}, mpwt2 = {0};
285 Do(p, sendWaitingThreads, mpwt1);
286 Do(p, receiveWaitingThreads, mpwt2);
287 Do(p, pausedSendWaits);
288 Do(p, pausedReceiveWaits);
289 Do(p, buffer);
290 }
291
292 NativeMsgPipe nmp;
293
294 std::vector<MsgPipeWaitingThread> sendWaitingThreads;
295 std::vector<MsgPipeWaitingThread> receiveWaitingThreads;
296 // Key is the callback id it was for, or if no callback, the thread id.
297 std::map<SceUID, MsgPipeWaitingThread> pausedSendWaits;
298 std::map<SceUID, MsgPipeWaitingThread> pausedReceiveWaits;
299
300 u32 buffer;
301 };
302
__KernelMsgPipeObject()303 KernelObject *__KernelMsgPipeObject()
304 {
305 return new MsgPipe;
306 }
307
__KernelMsgPipeTimeout(u64 userdata,int cyclesLate)308 static void __KernelMsgPipeTimeout(u64 userdata, int cyclesLate)
309 {
310 SceUID threadID = (SceUID) userdata;
311 HLEKernel::WaitExecTimeout<MsgPipe, WAITTYPE_MSGPIPE>(threadID);
312 }
313
__KernelSetMsgPipeTimeout(u32 timeoutPtr)314 static bool __KernelSetMsgPipeTimeout(u32 timeoutPtr)
315 {
316 if (timeoutPtr == 0 || waitTimer == -1)
317 return true;
318
319 int micro = (int) Memory::Read_U32(timeoutPtr);
320 if (micro <= 2)
321 {
322 // Don't wait or reschedule, just timeout immediately.
323 return false;
324 }
325
326 if (micro <= 210)
327 micro = 250;
328 CoreTiming::ScheduleEvent(usToCycles(micro), waitTimer, __KernelGetCurThread());
329 return true;
330 }
331
__KernelSendMsgPipe(MsgPipe * m,u32 sendBufAddr,u32 sendSize,int waitMode,u32 resultAddr,u32 timeoutPtr,bool cbEnabled,bool poll,bool & needsResched,bool & needsWait)332 static int __KernelSendMsgPipe(MsgPipe *m, u32 sendBufAddr, u32 sendSize, int waitMode, u32 resultAddr, u32 timeoutPtr, bool cbEnabled, bool poll, bool &needsResched, bool &needsWait)
333 {
334 u32 curSendAddr = sendBufAddr;
335 SceUID uid = m->GetUID();
336
337 // If the buffer size is 0, nothing is buffered and all operations wait.
338 if (m->nmp.bufSize == 0)
339 {
340 m->SortReceiveThreads();
341
342 while (!m->receiveWaitingThreads.empty() && sendSize != 0)
343 {
344 MsgPipeWaitingThread *thread = &m->receiveWaitingThreads.front();
345
346 u32 bytesToSend = std::min(thread->freeSize, sendSize);
347 if (bytesToSend > 0)
348 {
349 thread->WriteBuffer(curSendAddr, bytesToSend);
350 sendSize -= bytesToSend;
351 curSendAddr += bytesToSend;
352
353 if (thread->freeSize == 0 || thread->waitMode == SCE_KERNEL_MPW_ASAP)
354 {
355 thread->Complete(uid, 0);
356 m->receiveWaitingThreads.erase(m->receiveWaitingThreads.begin());
357 needsResched = true;
358 thread = NULL;
359 }
360 }
361 }
362
363 // If there is still data to send and (we want to send all of it or we didn't send anything)
364 if (sendSize != 0 && (waitMode != SCE_KERNEL_MPW_ASAP || curSendAddr == sendBufAddr))
365 {
366 if (poll)
367 {
368 // Generally, result is not updated in this case. But for a 0 size buffer in ASAP mode, it is.
369 if (Memory::IsValidAddress(resultAddr) && waitMode == SCE_KERNEL_MPW_ASAP)
370 Memory::Write_U32(curSendAddr - sendBufAddr, resultAddr);
371 return SCE_KERNEL_ERROR_MPP_FULL;
372 }
373 else
374 {
375 m->AddSendWaitingThread(__KernelGetCurThread(), curSendAddr, sendSize, waitMode, resultAddr);
376 needsWait = true;
377 return 0;
378 }
379 }
380 }
381 else
382 {
383 if (sendSize > (u32) m->nmp.bufSize)
384 {
385 ERROR_LOG(SCEKERNEL, "__KernelSendMsgPipe(%d): size %d too large for buffer", uid, sendSize);
386 return SCE_KERNEL_ERROR_ILLEGAL_SIZE;
387 }
388
389 u32 bytesToSend = 0;
390 // If others are already waiting, space or not, we have to get in line.
391 m->SortSendThreads();
392 if (m->sendWaitingThreads.empty())
393 {
394 if (sendSize <= (u32) m->nmp.freeSize)
395 bytesToSend = sendSize;
396 else if (waitMode == SCE_KERNEL_MPW_ASAP)
397 bytesToSend = m->nmp.freeSize;
398 }
399
400 if (bytesToSend != 0)
401 {
402 Memory::Memcpy(m->buffer + (m->nmp.bufSize - m->nmp.freeSize), sendBufAddr, bytesToSend, "MsgPipeSend");
403 m->nmp.freeSize -= bytesToSend;
404 curSendAddr += bytesToSend;
405 sendSize -= bytesToSend;
406
407 if (m->CheckReceiveThreads())
408 needsResched = true;
409 }
410 else if (sendSize != 0)
411 {
412 if (poll)
413 return SCE_KERNEL_ERROR_MPP_FULL;
414 else
415 {
416 m->AddSendWaitingThread(__KernelGetCurThread(), curSendAddr, sendSize, waitMode, resultAddr);
417 needsWait = true;
418 return 0;
419 }
420 }
421 }
422
423 // We didn't wait, so update the number of bytes transferred now.
424 if (Memory::IsValidAddress(resultAddr))
425 Memory::Write_U32(curSendAddr - sendBufAddr, resultAddr);
426
427 return 0;
428 }
429
__KernelReceiveMsgPipe(MsgPipe * m,u32 receiveBufAddr,u32 receiveSize,int waitMode,u32 resultAddr,u32 timeoutPtr,bool cbEnabled,bool poll,bool & needsResched,bool & needsWait)430 static int __KernelReceiveMsgPipe(MsgPipe *m, u32 receiveBufAddr, u32 receiveSize, int waitMode, u32 resultAddr, u32 timeoutPtr, bool cbEnabled, bool poll, bool &needsResched, bool &needsWait)
431 {
432 u32 curReceiveAddr = receiveBufAddr;
433 SceUID uid = m->GetUID();
434
435 // MsgPipe buffer size is 0, receiving directly from waiting send threads
436 if (m->nmp.bufSize == 0)
437 {
438 m->SortSendThreads();
439
440 // While they're still sending waiting threads (which can send data)
441 while (!m->sendWaitingThreads.empty() && receiveSize != 0)
442 {
443 MsgPipeWaitingThread *thread = &m->sendWaitingThreads.front();
444
445 // For send threads, "freeSize" is "free to be read".
446 u32 bytesToReceive = std::min(thread->freeSize, receiveSize);
447 if (bytesToReceive > 0)
448 {
449 thread->ReadBuffer(curReceiveAddr, bytesToReceive);
450 receiveSize -= bytesToReceive;
451 curReceiveAddr += bytesToReceive;
452
453 if (thread->freeSize == 0 || thread->waitMode == SCE_KERNEL_MPW_ASAP)
454 {
455 thread->Complete(uid, 0);
456 m->sendWaitingThreads.erase(m->sendWaitingThreads.begin());
457 needsResched = true;
458 thread = NULL;
459 }
460 }
461 }
462
463 // All data hasn't been received and (mode isn't ASAP or nothing was received)
464 if (receiveSize != 0 && (waitMode != SCE_KERNEL_MPW_ASAP || curReceiveAddr == receiveBufAddr))
465 {
466 if (poll)
467 {
468 // Generally, result is not updated in this case. But for a 0 size buffer in ASAP mode, it is.
469 if (Memory::IsValidAddress(resultAddr) && waitMode == SCE_KERNEL_MPW_ASAP)
470 Memory::Write_U32(curReceiveAddr - receiveBufAddr, resultAddr);
471 return SCE_KERNEL_ERROR_MPP_EMPTY;
472 }
473 else
474 {
475 m->AddReceiveWaitingThread(__KernelGetCurThread(), curReceiveAddr, receiveSize, waitMode, resultAddr);
476 needsWait = true;
477 return 0;
478 }
479 }
480 }
481 // Getting data from the MsgPipe buffer
482 else
483 {
484 if (receiveSize > (u32) m->nmp.bufSize)
485 {
486 ERROR_LOG(SCEKERNEL, "__KernelReceiveMsgPipe(%d): size %d too large for buffer", uid, receiveSize);
487 return SCE_KERNEL_ERROR_ILLEGAL_SIZE;
488 }
489
490 while (m->GetUsedSize() > 0)
491 {
492 u32 bytesToReceive = std::min(receiveSize, m->GetUsedSize());
493 if (bytesToReceive != 0)
494 {
495 Memory::Memcpy(curReceiveAddr, m->buffer, bytesToReceive, "MsgPipeReceive");
496 m->nmp.freeSize += bytesToReceive;
497 memmove(Memory::GetPointer(m->buffer), Memory::GetPointer(m->buffer) + bytesToReceive, m->GetUsedSize());
498 curReceiveAddr += bytesToReceive;
499 receiveSize -= bytesToReceive;
500
501 m->CheckSendThreads();
502 }
503 else
504 break;
505 }
506
507 if (receiveSize != 0 && (waitMode != SCE_KERNEL_MPW_ASAP || curReceiveAddr == receiveBufAddr))
508 {
509 if (poll)
510 return SCE_KERNEL_ERROR_MPP_EMPTY;
511 else
512 {
513 m->AddReceiveWaitingThread(__KernelGetCurThread(), curReceiveAddr, receiveSize, waitMode, resultAddr);
514 needsWait = true;
515 return 0;
516 }
517 }
518 }
519
520 if (Memory::IsValidAddress(resultAddr))
521 Memory::Write_U32(curReceiveAddr - receiveBufAddr, resultAddr);
522
523 return 0;
524 }
525
__KernelMsgPipeBeginCallback(SceUID threadID,SceUID prevCallbackId)526 static void __KernelMsgPipeBeginCallback(SceUID threadID, SceUID prevCallbackId)
527 {
528 u32 error;
529 u32 waitValue = __KernelGetWaitValue(threadID, error);
530 u32 timeoutPtr = __KernelGetWaitTimeoutPtr(threadID, error);
531 SceUID uid = __KernelGetWaitID(threadID, WAITTYPE_MSGPIPE, error);
532 MsgPipe *ko = uid == 0 ? NULL : kernelObjects.Get<MsgPipe>(uid, error);
533
534 switch (waitValue)
535 {
536 case MSGPIPE_WAIT_VALUE_SEND:
537 if (ko)
538 {
539 auto result = HLEKernel::WaitBeginCallback<MsgPipeWaitingThread>(threadID, prevCallbackId, waitTimer, ko->sendWaitingThreads, ko->pausedSendWaits, timeoutPtr != 0);
540 if (result == HLEKernel::WAIT_CB_SUCCESS)
541 DEBUG_LOG(SCEKERNEL, "sceKernelSendMsgPipeCB: Suspending wait for callback");
542 else if (result == HLEKernel::WAIT_CB_BAD_WAIT_DATA)
543 ERROR_LOG_REPORT(SCEKERNEL, "sceKernelSendMsgPipeCB: wait not found to pause for callback");
544 }
545 else
546 WARN_LOG_REPORT(SCEKERNEL, "sceKernelSendMsgPipeCB: beginning callback with bad wait id?");
547 break;
548
549 case MSGPIPE_WAIT_VALUE_RECV:
550 if (ko)
551 {
552 auto result = HLEKernel::WaitBeginCallback<MsgPipeWaitingThread>(threadID, prevCallbackId, waitTimer, ko->receiveWaitingThreads, ko->pausedReceiveWaits, timeoutPtr != 0);
553 if (result == HLEKernel::WAIT_CB_SUCCESS)
554 DEBUG_LOG(SCEKERNEL, "sceKernelReceiveMsgPipeCB: Suspending wait for callback");
555 else if (result == HLEKernel::WAIT_CB_BAD_WAIT_DATA)
556 ERROR_LOG_REPORT(SCEKERNEL, "sceKernelReceiveMsgPipeCB: wait not found to pause for callback");
557 }
558 else
559 WARN_LOG_REPORT(SCEKERNEL, "sceKernelReceiveMsgPipeCB: beginning callback with bad wait id?");
560 break;
561
562 default:
563 ERROR_LOG_REPORT(SCEKERNEL, "__KernelMsgPipeBeginCallback: Unexpected wait value");
564 }
565 }
566
__KernelCheckResumeMsgPipeSend(MsgPipe * m,MsgPipeWaitingThread & waitInfo,u32 & error,int result,bool & wokeThreads)567 static bool __KernelCheckResumeMsgPipeSend(MsgPipe *m, MsgPipeWaitingThread &waitInfo, u32 &error, int result, bool &wokeThreads)
568 {
569 if (!waitInfo.IsStillWaiting(m->GetUID()))
570 return true;
571
572 bool needsResched = false;
573 bool needsWait = false;
574
575 result = __KernelSendMsgPipe(m, waitInfo.bufAddr, waitInfo.bufSize, waitInfo.waitMode, waitInfo.transferredBytes.ptr, 0, true, false, needsResched, needsWait);
576
577 if (needsResched)
578 hleReSchedule(true, "msgpipe data sent");
579
580 // Could not wake up. May have sent some stuff.
581 if (needsWait)
582 return false;
583
584 waitInfo.Complete(m->GetUID(), result);
585 wokeThreads = true;
586 return true;
587 }
588
__KernelCheckResumeMsgPipeReceive(MsgPipe * m,MsgPipeWaitingThread & waitInfo,u32 & error,int result,bool & wokeThreads)589 static bool __KernelCheckResumeMsgPipeReceive(MsgPipe *m, MsgPipeWaitingThread &waitInfo, u32 &error, int result, bool &wokeThreads)
590 {
591 if (!waitInfo.IsStillWaiting(m->GetUID()))
592 return true;
593
594 bool needsResched = false;
595 bool needsWait = false;
596
597 result = __KernelReceiveMsgPipe(m, waitInfo.bufAddr, waitInfo.bufSize, waitInfo.waitMode, waitInfo.transferredBytes.ptr, 0, true, false, needsResched, needsWait);
598
599 if (needsResched)
600 hleReSchedule(true, "msgpipe data received");
601
602 if (needsWait)
603 return false;
604
605 waitInfo.Complete(m->GetUID(), result);
606 wokeThreads = true;
607 return true;
608 }
609
__KernelMsgPipeEndCallback(SceUID threadID,SceUID prevCallbackId)610 static void __KernelMsgPipeEndCallback(SceUID threadID, SceUID prevCallbackId) {
611 u32 error;
612 u32 waitValue = __KernelGetWaitValue(threadID, error);
613 SceUID uid = __KernelGetWaitID(threadID, WAITTYPE_MSGPIPE, error);
614 MsgPipe *ko = uid == 0 ? NULL : kernelObjects.Get<MsgPipe>(uid, error);
615
616 if (ko == NULL) {
617 ERROR_LOG_REPORT(SCEKERNEL, "__KernelMsgPipeEndCallback: Invalid object");
618 return;
619 }
620
621 switch (waitValue) {
622 case MSGPIPE_WAIT_VALUE_SEND:
623 {
624 MsgPipeWaitingThread dummy;
625 auto result = HLEKernel::WaitEndCallback<MsgPipe, WAITTYPE_MSGPIPE, MsgPipeWaitingThread>(threadID, prevCallbackId, waitTimer, __KernelCheckResumeMsgPipeSend, dummy, ko->sendWaitingThreads, ko->pausedSendWaits);
626 if (result == HLEKernel::WAIT_CB_RESUMED_WAIT) {
627 DEBUG_LOG(SCEKERNEL, "sceKernelSendMsgPipeCB: Resuming wait from callback");
628 } else if (result == HLEKernel::WAIT_CB_TIMED_OUT) {
629 // It was re-added to the the waiting threads list, but it timed out. Let's remove it.
630 ko->RemoveSendWaitingThread(threadID);
631 }
632 }
633 break;
634
635 case MSGPIPE_WAIT_VALUE_RECV:
636 {
637 MsgPipeWaitingThread dummy;
638 auto result = HLEKernel::WaitEndCallback<MsgPipe, WAITTYPE_MSGPIPE, MsgPipeWaitingThread>(threadID, prevCallbackId, waitTimer, __KernelCheckResumeMsgPipeReceive, dummy, ko->receiveWaitingThreads, ko->pausedReceiveWaits);
639 if (result == HLEKernel::WAIT_CB_RESUMED_WAIT) {
640 DEBUG_LOG(SCEKERNEL, "sceKernelReceiveMsgPipeCB: Resuming wait from callback");
641 } else if (result == HLEKernel::WAIT_CB_TIMED_OUT) {
642 // It was re-added to the the waiting threads list, but it timed out. Let's remove it.
643 ko->RemoveReceiveWaitingThread(threadID);
644 }
645 }
646 break;
647
648 default:
649 ERROR_LOG_REPORT(SCEKERNEL, "__KernelMsgPipeEndCallback: Unexpected wait value");
650 }
651 }
652
__KernelMsgPipeInit()653 void __KernelMsgPipeInit()
654 {
655 waitTimer = CoreTiming::RegisterEvent("MsgPipeTimeout", __KernelMsgPipeTimeout);
656
657 __KernelRegisterWaitTypeFuncs(WAITTYPE_MSGPIPE, __KernelMsgPipeBeginCallback, __KernelMsgPipeEndCallback);
658 }
659
__KernelMsgPipeDoState(PointerWrap & p)660 void __KernelMsgPipeDoState(PointerWrap &p)
661 {
662 auto s = p.Section("sceKernelMsgPipe", 1);
663 if (!s)
664 return;
665
666 Do(p, waitTimer);
667 CoreTiming::RestoreRegisterEvent(waitTimer, "MsgPipeTimeout", __KernelMsgPipeTimeout);
668 }
669
sceKernelCreateMsgPipe(const char * name,int partition,u32 attr,u32 size,u32 optionsPtr)670 int sceKernelCreateMsgPipe(const char *name, int partition, u32 attr, u32 size, u32 optionsPtr)
671 {
672 if (!name)
673 {
674 WARN_LOG_REPORT(SCEKERNEL, "%08x=sceKernelCreateMsgPipe(): invalid name", SCE_KERNEL_ERROR_NO_MEMORY);
675 return SCE_KERNEL_ERROR_NO_MEMORY;
676 }
677 if (partition < 1 || partition > 9 || partition == 7)
678 {
679 WARN_LOG_REPORT(SCEKERNEL, "%08x=sceKernelCreateMsgPipe(): invalid partition %d", SCE_KERNEL_ERROR_ILLEGAL_ARGUMENT, partition);
680 return SCE_KERNEL_ERROR_ILLEGAL_ARGUMENT;
681 }
682 // We only support user right now.
683 if (partition != 2 && partition != 6)
684 {
685 WARN_LOG_REPORT(SCEKERNEL, "%08x=sceKernelCreateMsgPipe(): invalid partition %d", SCE_KERNEL_ERROR_ILLEGAL_PERM, partition);
686 return SCE_KERNEL_ERROR_ILLEGAL_PERM;
687 }
688 if ((attr & ~SCE_KERNEL_MPA_KNOWN) >= 0x100)
689 {
690 WARN_LOG_REPORT(SCEKERNEL, "%08x=sceKernelCreateEventFlag(%s): invalid attr parameter: %08x", SCE_KERNEL_ERROR_ILLEGAL_ATTR, name, attr);
691 return SCE_KERNEL_ERROR_ILLEGAL_ATTR;
692 }
693
694 u32 memBlockPtr = 0;
695 if (size != 0)
696 {
697 // We ignore the upalign to 256.
698 u32 allocSize = size;
699 memBlockPtr = userMemory.Alloc(allocSize, (attr & SCE_KERNEL_MPA_HIGHMEM) != 0, "MsgPipe");
700 if (memBlockPtr == (u32)-1)
701 {
702 ERROR_LOG(SCEKERNEL, "%08x=sceKernelCreateEventFlag(%s): Failed to allocate %i bytes for buffer", SCE_KERNEL_ERROR_NO_MEMORY, name, size);
703 return SCE_KERNEL_ERROR_NO_MEMORY;
704 }
705 }
706
707 MsgPipe *m = new MsgPipe();
708 SceUID id = kernelObjects.Create(m);
709
710 m->nmp.size = sizeof(NativeMsgPipe);
711 strncpy(m->nmp.name, name, KERNELOBJECT_MAX_NAME_LENGTH);
712 m->nmp.name[KERNELOBJECT_MAX_NAME_LENGTH] = 0;
713 m->nmp.attr = attr;
714 m->nmp.bufSize = size;
715 m->nmp.freeSize = size;
716 m->nmp.numSendWaitThreads = 0;
717 m->nmp.numReceiveWaitThreads = 0;
718
719 m->buffer = memBlockPtr;
720
721 DEBUG_LOG(SCEKERNEL, "%d=sceKernelCreateMsgPipe(%s, part=%d, attr=%08x, size=%d, opt=%08x)", id, name, partition, attr, size, optionsPtr);
722
723 if (optionsPtr != 0)
724 {
725 u32 optionsSize = Memory::Read_U32(optionsPtr);
726 if (optionsSize > 4)
727 WARN_LOG_REPORT(SCEKERNEL, "sceKernelCreateMsgPipe(%s) unsupported options parameter, size = %d", name, optionsSize);
728 }
729
730 return id;
731 }
732
sceKernelDeleteMsgPipe(SceUID uid)733 int sceKernelDeleteMsgPipe(SceUID uid)
734 {
735 hleEatCycles(900);
736
737 u32 error;
738 MsgPipe *m = kernelObjects.Get<MsgPipe>(uid, error);
739 if (!m)
740 {
741 ERROR_LOG(SCEKERNEL, "sceKernelDeleteMsgPipe(%i) - ERROR %08x", uid, error);
742 return error;
743 }
744
745 hleEatCycles(3100);
746 if (!m->sendWaitingThreads.empty() || !m->receiveWaitingThreads.empty())
747 hleEatCycles(4000);
748
749 for (size_t i = 0; i < m->sendWaitingThreads.size(); i++)
750 m->sendWaitingThreads[i].Cancel(uid, SCE_KERNEL_ERROR_WAIT_DELETE);
751 for (size_t i = 0; i < m->receiveWaitingThreads.size(); i++)
752 m->receiveWaitingThreads[i].Cancel(uid, SCE_KERNEL_ERROR_WAIT_DELETE);
753
754 DEBUG_LOG(SCEKERNEL, "sceKernelDeleteMsgPipe(%i)", uid);
755 return kernelObjects.Destroy<MsgPipe>(uid);
756 }
757
__KernelValidateSendMsgPipe(SceUID uid,u32 sendBufAddr,u32 sendSize,int waitMode,u32 resultAddr,bool tryMode=false)758 static int __KernelValidateSendMsgPipe(SceUID uid, u32 sendBufAddr, u32 sendSize, int waitMode, u32 resultAddr, bool tryMode = false)
759 {
760 if (sendSize & 0x80000000)
761 {
762 ERROR_LOG(SCEKERNEL, "__KernelSendMsgPipe(%d): illegal size %d", uid, sendSize);
763 return SCE_KERNEL_ERROR_ILLEGAL_ADDR;
764 }
765
766 if (sendSize != 0 && !Memory::IsValidAddress(sendBufAddr))
767 {
768 ERROR_LOG(SCEKERNEL, "__KernelSendMsgPipe(%d): bad buffer address %08x (should crash?)", uid, sendBufAddr);
769 return SCE_KERNEL_ERROR_ILLEGAL_ADDR;
770 }
771
772 if (waitMode != SCE_KERNEL_MPW_ASAP && waitMode != SCE_KERNEL_MPW_FULL)
773 {
774 ERROR_LOG(SCEKERNEL, "__KernelSendMsgPipe(%d): invalid wait mode %d", uid, waitMode);
775 return SCE_KERNEL_ERROR_ILLEGAL_MODE;
776 }
777
778 if (!tryMode)
779 {
780 if (!__KernelIsDispatchEnabled())
781 {
782 WARN_LOG(SCEKERNEL, "__KernelSendMsgPipe(%d): dispatch disabled", uid);
783 return SCE_KERNEL_ERROR_CAN_NOT_WAIT;
784 }
785 if (__IsInInterrupt())
786 {
787 WARN_LOG(SCEKERNEL, "__KernelSendMsgPipe(%d): in interrupt", uid);
788 return SCE_KERNEL_ERROR_ILLEGAL_CONTEXT;
789 }
790 }
791
792 return 0;
793 }
794
__KernelSendMsgPipe(MsgPipe * m,u32 sendBufAddr,u32 sendSize,int waitMode,u32 resultAddr,u32 timeoutPtr,bool cbEnabled,bool poll)795 static int __KernelSendMsgPipe(MsgPipe *m, u32 sendBufAddr, u32 sendSize, int waitMode, u32 resultAddr, u32 timeoutPtr, bool cbEnabled, bool poll)
796 {
797 hleEatCycles(2400);
798
799 bool needsResched = false;
800 bool needsWait = false;
801
802 int result = __KernelSendMsgPipe(m, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr, cbEnabled, poll, needsResched, needsWait);
803
804 if (needsResched)
805 hleReSchedule(cbEnabled, "msgpipe data sent");
806
807 if (needsWait)
808 {
809 if (__KernelSetMsgPipeTimeout(timeoutPtr))
810 __KernelWaitCurThread(WAITTYPE_MSGPIPE, m->GetUID(), MSGPIPE_WAIT_VALUE_SEND, timeoutPtr, cbEnabled, "msgpipe send waited");
811 else
812 result = SCE_KERNEL_ERROR_WAIT_TIMEOUT;
813 }
814 return result;
815 }
816
sceKernelSendMsgPipe(SceUID uid,u32 sendBufAddr,u32 sendSize,u32 waitMode,u32 resultAddr,u32 timeoutPtr)817 int sceKernelSendMsgPipe(SceUID uid, u32 sendBufAddr, u32 sendSize, u32 waitMode, u32 resultAddr, u32 timeoutPtr)
818 {
819 u32 error = __KernelValidateSendMsgPipe(uid, sendBufAddr, sendSize, waitMode, resultAddr);
820 if (error != 0) {
821 return error;
822 }
823 MsgPipe *m = kernelObjects.Get<MsgPipe>(uid, error);
824 if (!m) {
825 ERROR_LOG(SCEKERNEL, "sceKernelSendMsgPipe(%i) - ERROR %08x", uid, error);
826 return error;
827 }
828
829 DEBUG_LOG(SCEKERNEL, "sceKernelSendMsgPipe(id=%i, addr=%08x, size=%i, mode=%i, result=%08x, timeout=%08x)", uid, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr);
830 return __KernelSendMsgPipe(m, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr, false, false);
831 }
832
sceKernelSendMsgPipeCB(SceUID uid,u32 sendBufAddr,u32 sendSize,u32 waitMode,u32 resultAddr,u32 timeoutPtr)833 int sceKernelSendMsgPipeCB(SceUID uid, u32 sendBufAddr, u32 sendSize, u32 waitMode, u32 resultAddr, u32 timeoutPtr)
834 {
835 u32 error = __KernelValidateSendMsgPipe(uid, sendBufAddr, sendSize, waitMode, resultAddr);
836 if (error != 0) {
837 return error;
838 }
839 MsgPipe *m = kernelObjects.Get<MsgPipe>(uid, error);
840 if (!m) {
841 ERROR_LOG(SCEKERNEL, "sceKernelSendMsgPipeCB(%i) - ERROR %08x", uid, error);
842 return error;
843 }
844
845 DEBUG_LOG(SCEKERNEL, "sceKernelSendMsgPipeCB(id=%i, addr=%08x, size=%i, mode=%i, result=%08x, timeout=%08x)", uid, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr);
846 // TODO: Verify callback behavior.
847 hleCheckCurrentCallbacks();
848 return __KernelSendMsgPipe(m, sendBufAddr, sendSize, waitMode, resultAddr, timeoutPtr, true, false);
849 }
850
sceKernelTrySendMsgPipe(SceUID uid,u32 sendBufAddr,u32 sendSize,u32 waitMode,u32 resultAddr)851 int sceKernelTrySendMsgPipe(SceUID uid, u32 sendBufAddr, u32 sendSize, u32 waitMode, u32 resultAddr)
852 {
853 u32 error = __KernelValidateSendMsgPipe(uid, sendBufAddr, sendSize, waitMode, resultAddr, true);
854 if (error != 0) {
855 return error;
856 }
857 MsgPipe *m = kernelObjects.Get<MsgPipe>(uid, error);
858 if (!m) {
859 ERROR_LOG(SCEKERNEL, "sceKernelTrySendMsgPipe(%i) - ERROR %08x", uid, error);
860 return error;
861 }
862
863 DEBUG_LOG(SCEKERNEL, "sceKernelTrySendMsgPipe(id=%i, addr=%08x, size=%i, mode=%i, result=%08x)", uid, sendBufAddr, sendSize, waitMode, resultAddr);
864 return __KernelSendMsgPipe(m, sendBufAddr, sendSize, waitMode, resultAddr, 0, false, true);
865 }
866
__KernelValidateReceiveMsgPipe(SceUID uid,u32 receiveBufAddr,u32 receiveSize,int waitMode,u32 resultAddr,bool tryMode=false)867 static int __KernelValidateReceiveMsgPipe(SceUID uid, u32 receiveBufAddr, u32 receiveSize, int waitMode, u32 resultAddr, bool tryMode = false)
868 {
869 if (receiveSize & 0x80000000)
870 {
871 ERROR_LOG(SCEKERNEL, "__KernelReceiveMsgPipe(%d): illegal size %d", uid, receiveSize);
872 return SCE_KERNEL_ERROR_ILLEGAL_ADDR;
873 }
874
875 if (receiveSize != 0 && !Memory::IsValidAddress(receiveBufAddr))
876 {
877 ERROR_LOG(SCEKERNEL, "__KernelReceiveMsgPipe(%d): bad buffer address %08x (should crash?)", uid, receiveBufAddr);
878 return SCE_KERNEL_ERROR_ILLEGAL_ADDR;
879 }
880
881 if (waitMode != SCE_KERNEL_MPW_ASAP && waitMode != SCE_KERNEL_MPW_FULL)
882 {
883 ERROR_LOG(SCEKERNEL, "__KernelReceiveMsgPipe(%d): invalid wait mode %d", uid, waitMode);
884 return SCE_KERNEL_ERROR_ILLEGAL_MODE;
885 }
886
887 if (!tryMode)
888 {
889 if (!__KernelIsDispatchEnabled())
890 {
891 WARN_LOG(SCEKERNEL, "__KernelReceiveMsgPipe(%d): dispatch disabled", uid);
892 return SCE_KERNEL_ERROR_CAN_NOT_WAIT;
893 }
894 if (__IsInInterrupt())
895 {
896 WARN_LOG(SCEKERNEL, "__KernelReceiveMsgPipe(%d): in interrupt", uid);
897 return SCE_KERNEL_ERROR_ILLEGAL_CONTEXT;
898 }
899 }
900
901 return 0;
902 }
903
__KernelReceiveMsgPipe(MsgPipe * m,u32 receiveBufAddr,u32 receiveSize,int waitMode,u32 resultAddr,u32 timeoutPtr,bool cbEnabled,bool poll)904 static int __KernelReceiveMsgPipe(MsgPipe *m, u32 receiveBufAddr, u32 receiveSize, int waitMode, u32 resultAddr, u32 timeoutPtr, bool cbEnabled, bool poll)
905 {
906 bool needsResched = false;
907 bool needsWait = false;
908
909 int result = __KernelReceiveMsgPipe(m, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr, cbEnabled, poll, needsResched, needsWait);
910
911 if (needsResched)
912 hleReSchedule(cbEnabled, "msgpipe data received");
913
914 if (needsWait)
915 {
916 if (__KernelSetMsgPipeTimeout(timeoutPtr))
917 __KernelWaitCurThread(WAITTYPE_MSGPIPE, m->GetUID(), MSGPIPE_WAIT_VALUE_RECV, timeoutPtr, cbEnabled, "msgpipe receive waited");
918 else
919 return SCE_KERNEL_ERROR_WAIT_TIMEOUT;
920 }
921 return result;
922 }
923
sceKernelReceiveMsgPipe(SceUID uid,u32 receiveBufAddr,u32 receiveSize,u32 waitMode,u32 resultAddr,u32 timeoutPtr)924 int sceKernelReceiveMsgPipe(SceUID uid, u32 receiveBufAddr, u32 receiveSize, u32 waitMode, u32 resultAddr, u32 timeoutPtr)
925 {
926 u32 error = __KernelValidateReceiveMsgPipe(uid, receiveBufAddr, receiveSize, waitMode, resultAddr);
927 if (error != 0) {
928 return error;
929 }
930 MsgPipe *m = kernelObjects.Get<MsgPipe>(uid, error);
931 if (!m) {
932 ERROR_LOG(SCEKERNEL, "sceKernelReceiveMsgPipe(%i) - ERROR %08x", uid, error);
933 return error;
934 }
935
936 DEBUG_LOG(SCEKERNEL, "sceKernelReceiveMsgPipe(%i, %08x, %i, %i, %08x, %08x)", uid, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr);
937 return __KernelReceiveMsgPipe(m, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr, false, false);
938 }
939
sceKernelReceiveMsgPipeCB(SceUID uid,u32 receiveBufAddr,u32 receiveSize,u32 waitMode,u32 resultAddr,u32 timeoutPtr)940 int sceKernelReceiveMsgPipeCB(SceUID uid, u32 receiveBufAddr, u32 receiveSize, u32 waitMode, u32 resultAddr, u32 timeoutPtr)
941 {
942 u32 error = __KernelValidateReceiveMsgPipe(uid, receiveBufAddr, receiveSize, waitMode, resultAddr);
943 if (error != 0) {
944 return error;
945 }
946 MsgPipe *m = kernelObjects.Get<MsgPipe>(uid, error);
947 if (!m) {
948 ERROR_LOG(SCEKERNEL, "sceKernelReceiveMsgPipeCB(%i) - ERROR %08x", uid, error);
949 return error;
950 }
951
952 DEBUG_LOG(SCEKERNEL, "sceKernelReceiveMsgPipeCB(%i, %08x, %i, %i, %08x, %08x)", uid, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr);
953 // TODO: Verify callback behavior.
954 hleCheckCurrentCallbacks();
955 return __KernelReceiveMsgPipe(m, receiveBufAddr, receiveSize, waitMode, resultAddr, timeoutPtr, true, false);
956 }
957
sceKernelTryReceiveMsgPipe(SceUID uid,u32 receiveBufAddr,u32 receiveSize,u32 waitMode,u32 resultAddr)958 int sceKernelTryReceiveMsgPipe(SceUID uid, u32 receiveBufAddr, u32 receiveSize, u32 waitMode, u32 resultAddr)
959 {
960 u32 error = __KernelValidateReceiveMsgPipe(uid, receiveBufAddr, receiveSize, waitMode, resultAddr, true);
961 if (error != 0) {
962 return error;
963 }
964 MsgPipe *m = kernelObjects.Get<MsgPipe>(uid, error);
965 if (!m) {
966 ERROR_LOG(SCEKERNEL, "sceKernelTryReceiveMsgPipe(%i) - ERROR %08x", uid, error);
967 return error;
968 }
969
970 DEBUG_LOG(SCEKERNEL, "sceKernelTryReceiveMsgPipe(%i, %08x, %i, %i, %08x)", uid, receiveBufAddr, receiveSize, waitMode, resultAddr);
971 return __KernelReceiveMsgPipe(m, receiveBufAddr, receiveSize, waitMode, resultAddr, 0, false, true);
972 }
973
sceKernelCancelMsgPipe(SceUID uid,u32 numSendThreadsAddr,u32 numReceiveThreadsAddr)974 int sceKernelCancelMsgPipe(SceUID uid, u32 numSendThreadsAddr, u32 numReceiveThreadsAddr)
975 {
976 hleEatCycles(900);
977
978 u32 error;
979 MsgPipe *m = kernelObjects.Get<MsgPipe>(uid, error);
980 if (!m)
981 {
982 ERROR_LOG(SCEKERNEL, "sceKernelCancelMsgPipe(%i) - ERROR %08x", uid, error);
983 return error;
984 }
985
986 hleEatCycles(1100);
987 if (!m->sendWaitingThreads.empty() || !m->receiveWaitingThreads.empty())
988 hleEatCycles(4000);
989
990 if (Memory::IsValidAddress(numSendThreadsAddr))
991 Memory::Write_U32((u32) m->sendWaitingThreads.size(), numSendThreadsAddr);
992 if (Memory::IsValidAddress(numReceiveThreadsAddr))
993 Memory::Write_U32((u32) m->receiveWaitingThreads.size(), numReceiveThreadsAddr);
994
995 for (size_t i = 0; i < m->sendWaitingThreads.size(); i++)
996 m->sendWaitingThreads[i].Cancel(uid, SCE_KERNEL_ERROR_WAIT_CANCEL);
997 m->sendWaitingThreads.clear();
998 for (size_t i = 0; i < m->receiveWaitingThreads.size(); i++)
999 m->receiveWaitingThreads[i].Cancel(uid, SCE_KERNEL_ERROR_WAIT_CANCEL);
1000 m->receiveWaitingThreads.clear();
1001
1002 // And now the entire buffer is free.
1003 m->nmp.freeSize = m->nmp.bufSize;
1004
1005 DEBUG_LOG(SCEKERNEL, "sceKernelCancelMsgPipe(%i, %i, %i)", uid, numSendThreadsAddr, numReceiveThreadsAddr);
1006 return 0;
1007 }
1008
sceKernelReferMsgPipeStatus(SceUID uid,u32 statusPtr)1009 int sceKernelReferMsgPipeStatus(SceUID uid, u32 statusPtr)
1010 {
1011 u32 error;
1012 MsgPipe *m = kernelObjects.Get<MsgPipe>(uid, error);
1013 if (m)
1014 {
1015 if (!Memory::IsValidAddress(statusPtr))
1016 {
1017 ERROR_LOG(SCEKERNEL, "sceKernelReferMsgPipeStatus(%i, %08x): invalid address", uid, statusPtr);
1018 return -1;
1019 }
1020
1021 DEBUG_LOG(SCEKERNEL, "sceKernelReferMsgPipeStatus(%i, %08x)", uid, statusPtr);
1022
1023 // Clean up any that have timed out.
1024 m->SortReceiveThreads();
1025 m->SortSendThreads();
1026
1027 m->nmp.numSendWaitThreads = (int) m->sendWaitingThreads.size();
1028 m->nmp.numReceiveWaitThreads = (int) m->receiveWaitingThreads.size();
1029 if (Memory::Read_U32(statusPtr) != 0)
1030 Memory::WriteStruct(statusPtr, &m->nmp);
1031 return 0;
1032 }
1033 else
1034 {
1035 DEBUG_LOG(SCEKERNEL, "sceKernelReferMsgPipeStatus(%i, %08x): bad message pipe", uid, statusPtr);
1036 return error;
1037 }
1038 }
1039