1 /* -*- Mode: Java; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3  * This file is part of the LibreOffice project.
4  *
5  * This Source Code Form is subject to the terms of the Mozilla Public
6  * License, v. 2.0. If a copy of the MPL was not distributed with this
7  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8  *
9  * This file incorporates work covered by the following license notice:
10  *
11  *   Licensed to the Apache Software Foundation (ASF) under one or more
12  *   contributor license agreements. See the NOTICE file distributed
13  *   with this work for additional information regarding copyright
14  *   ownership. The ASF licenses this file to you under the Apache
15  *   License, Version 2.0 (the "License"); you may not use this file
16  *   except in compliance with the License. You may obtain a copy of
17  *   the License at http://www.apache.org/licenses/LICENSE-2.0 .
18  */
19 
20 package com.sun.star.lib.uno.environments.remote;
21 
22 import com.sun.star.lib.uno.typedesc.MethodDescription;
23 import com.sun.star.lib.uno.typedesc.TypeDescription;
24 import org.junit.Test;
25 import static org.junit.Assert.*;
26 
27 public class ThreadPool_Test {
testDispose()28     @Test public void testDispose() throws InterruptedException {
29         IThreadPool iThreadPool = ThreadPoolManager.create();
30         TestThread testThread = new TestThread(iThreadPool);
31 
32         ThreadId threadId = null;
33 
34         // start the test thread
35         synchronized(testThread) {
36             testThread.start();
37 
38             testThread.wait();
39 
40             threadId = testThread._threadId;
41 
42             // let the thread attach and enter the threadpool
43             testThread.notifyAll();
44         }
45 
46         String message = "blabla";
47 
48         // terminate the test thread
49         synchronized(testThread) {
50             // put reply job
51             iThreadPool.dispose(new RuntimeException(message));
52 
53             testThread.wait();
54         }
55 
56         testThread.join();
57 
58 /*TODO: below test fails with "expected:<blabla> but was:<null>":
59         assertEquals(message, testThread._message);
60 */
61     }
62 
testThreadAsync()63     @Test public void testThreadAsync() throws InterruptedException {
64         TestWorkAt workAt = new TestWorkAt();
65 
66         ThreadId threadId = ThreadId.createFresh();
67 
68         // queue asyncs
69         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
70             Thread.yield(); // force scheduling
71             putJob(workAt, false, threadId, "increment");
72         }
73 
74         synchronized(workAt) {
75             putJob(workAt, false, threadId, "notifyme");
76 
77             while(!workAt._notified) {
78                 workAt.wait();
79             }
80         }
81 
82         assertEquals(TestWorkAt.MESSAGES, workAt._counter);
83     }
84 
testDynamicThreadSync()85     @Test public void testDynamicThreadSync() throws InterruptedException {
86         TestWorkAt workAt = new TestWorkAt();
87 
88         ThreadId threadId = ThreadId.createFresh();
89 
90         // queue asyncs
91         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
92             Thread.yield(); // force scheduling
93             putJob(workAt, true, threadId, "increment");
94         }
95 
96         synchronized(workAt) {
97             putJob(workAt, true, threadId, "notifyme");
98 
99             while(!workAt._notified) {
100                 workAt.wait();
101             }
102         }
103 
104         assertEquals(TestWorkAt.MESSAGES, workAt._counter);
105     }
106 
testStaticThreadSync()107     @Test public void testStaticThreadSync() throws InterruptedException {
108         TestWorkAt workAt = new TestWorkAt();
109 
110         TestThread testThread = new TestThread();
111 
112         ThreadId threadId = null;
113 
114         // start the test thread
115         synchronized(testThread) {
116             testThread.start();
117 
118             testThread.wait();
119 
120             threadId = testThread._threadId;
121 
122             // let the thread attach and enter the threadpool
123             testThread.notifyAll();
124         }
125 
126         // queue syncs
127         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
128             Thread.yield(); // force scheduling
129             putJob(workAt, true, threadId, "increment");
130         }
131 
132         // terminate the test thread
133         synchronized(testThread) {
134             // put reply job
135             putJob(workAt, true, threadId, null);
136 
137             testThread.wait();
138         }
139 
140         testThread.join();
141 
142         assertEquals(TestWorkAt.MESSAGES, workAt._counter);
143     }
144 
testDynamicThreadAsyncSyncOrder()145     @Test public void testDynamicThreadAsyncSyncOrder()
146         throws InterruptedException
147     {
148         TestWorkAt workAt = new TestWorkAt();
149 
150         ThreadId threadId = ThreadId.createFresh();
151 
152         // queue asyncs
153         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
154             Thread.yield(); // force scheduling
155             putJob(workAt, false, threadId, "asyncCall");
156         }
157 
158         // queue syncs
159         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
160             Thread.yield(); // force scheduling
161             putJob(workAt, true, threadId, "syncCall");
162         }
163 
164         synchronized(workAt) {
165             putJob(workAt, true, threadId, "notifyme");
166 
167             while(!workAt._notified) {
168                 workAt.wait();
169             }
170         }
171 
172         assertTrue(workAt.passedAsyncTest());
173     }
174 
testStaticThreadAsyncSyncOrder()175     @Test public void testStaticThreadAsyncSyncOrder()
176         throws InterruptedException
177     {
178         TestWorkAt workAt = new TestWorkAt();
179 
180         TestThread testThread = new TestThread();
181 
182         // start the test thread
183         synchronized(testThread) {
184             testThread.start();
185 
186             testThread.wait();
187         }
188 
189         ThreadId threadId = testThread._threadId;
190 
191         // queue asyncs
192         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
193             Thread.yield(); // force scheduling
194             putJob(workAt, false, threadId, "asyncCall");
195         }
196 
197         // let the thread attach and enter the threadpool
198         synchronized(testThread) {
199             testThread.notifyAll();
200         }
201 
202         // queue syncs
203         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
204             Thread.yield(); // force scheduling
205             putJob(workAt, true, threadId, "syncCall");
206         }
207 
208         // terminate the test thread
209         synchronized(testThread) {
210             // put reply job
211             putJob(workAt, true, threadId, null);
212 
213             testThread.wait();
214         }
215 
216         testThread.join();
217 
218         assertTrue(workAt.passedAsyncTest());
219     }
220 
testStress()221     @Test public void testStress() throws InterruptedException {
222         TestWorkAt workAt = new TestWorkAt();
223         for (int i = 0; i < TestWorkAt.MESSAGES; ++i) {
224             Thread.yield(); // force scheduling
225             ThreadId threadID = ThreadId.createFresh();
226             putJob(workAt, true, threadID, "increment");
227             putJob(workAt, false, threadID, "increment");
228         }
229         synchronized (workAt) {
230             while (workAt._counter < 2 * TestWorkAt.MESSAGES) {
231                 workAt.wait();
232             }
233         }
234 
235         abstract class Stress extends Thread {
236             private Stress(int count) {
237                 this.count = count;
238             }
239 
240             @Override
241             public void run() {
242                 try {
243                     for (int i = 0; i < count; ++i) {
244                         runTest();
245                     }
246                 } catch (Throwable e) {
247                     e.printStackTrace(System.err);
248                 }
249             }
250 
251             protected abstract void runTest() throws InterruptedException;
252 
253             private final int count;
254         }
255 
256         Stress stress1 = new Stress(50) {
257                 @Override
258                 protected void runTest() throws InterruptedException {
259                     testThreadAsync();
260                 }
261             };
262         stress1.start();
263 
264         Stress stress2 = new Stress(50) {
265                 @Override
266                 protected void runTest() throws InterruptedException {
267                     testDynamicThreadSync();
268                 }
269             };
270         stress2.start();
271 
272         Stress stress3 = new Stress(50) {
273                 @Override
274                 protected void runTest() throws InterruptedException {
275                     testStaticThreadSync();
276                 }
277             };
278         stress3.start();
279 
280         Stress stress4 = new Stress(50) {
281                 @Override
282                 protected void runTest() throws InterruptedException {
283                     testDynamicThreadAsyncSyncOrder();
284                 }
285             };
286         stress4.start();
287 
288         Stress stress5 = new Stress(50) {
289                 @Override
290                 protected void runTest() throws InterruptedException {
291                     testStaticThreadAsyncSyncOrder();
292                 }
293             };
294         stress5.start();
295 
296         Stress stress6 = new Stress(500) {
297                 @Override
298                 protected void runTest() throws InterruptedException {
299                     testDispose();
300                 }
301             };
302         stress6.start();
303 
304         stress1.join();
305         stress2.join();
306         stress3.join();
307         stress4.join();
308         stress5.join();
309         stress6.join();
310     }
311 
testAsyncSync()312     @Test public void testAsyncSync() throws InterruptedException {
313         TestWorkAt workAt = new TestWorkAt();
314         ThreadId threadId = ThreadId.createFresh();
315         MyWorkAt myWorkAt = new MyWorkAt( workAt );
316 
317         // queue asyncs
318         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
319             if( i == 2 )
320             {
321                 putJob( myWorkAt, false , threadId, "asyncCall" );
322             }
323             putJob(workAt, false, threadId, "asyncCall");
324         }
325 
326         synchronized(workAt) {
327             putJob(workAt, false, threadId, "notifyme");
328 
329             while(!workAt._notified) {
330                 workAt.wait();
331             }
332         }
333 
334         assertEquals(TestWorkAt.MESSAGES, workAt._async_counter);
335         assertTrue(myWorkAt._success);
336     }
337 
putJob(TestIWorkAt iWorkAt, boolean synchron, ThreadId threadId, String operation)338     private static void putJob(TestIWorkAt iWorkAt, boolean synchron,
339                                ThreadId threadId, String operation) {
340         __iThreadPool.putJob(
341             new Job(iWorkAt, __iReceiver,
342                     new Message(
343                         threadId, operation != null, "oid", __workAt_td,
344                         (operation == null
345                          ? null
346                          : ((MethodDescription)
347                             __workAt_td.getMethodDescription(operation))),
348                         synchron, null, false, null, null)));
349     }
350 
351     private static final class TestThread extends Thread {
352         ThreadId _threadId;
353         IThreadPool _iThreadPool;
354 
TestThread()355         TestThread() {
356             this(__iThreadPool);
357         }
358 
TestThread(IThreadPool iThreadPool)359         TestThread(IThreadPool iThreadPool) {
360             _iThreadPool = iThreadPool;
361         }
362 
363         @Override
run()364         public void run() {
365             _threadId = _iThreadPool.getThreadId();
366 
367 
368             try {
369                 synchronized(this) {
370                     // notify that we are running
371                     notify();
372 
373                     _iThreadPool.attach();
374 
375                     // wait until we should continue
376                     wait();
377                 }
378 
379                 _iThreadPool.enter();
380             }
381             catch(Throwable throwable) {
382             }
383 
384             _iThreadPool.detach();
385 
386             synchronized(this) {
387                 // notify the listeners that we are dying
388                 notifyAll();
389             }
390         }
391     }
392 
393     private static final class MyWorkAt implements TestIWorkAt {
MyWorkAt( TestWorkAt async_WorkAt )394         public MyWorkAt( TestWorkAt async_WorkAt ) {
395             _async_WorkAt = async_WorkAt;
396         }
397 
syncCall()398         public void syncCall() throws Throwable
399         {
400             Message iMessage = new Message(
401                 __iThreadPool.getThreadId(), false, "oid", __workAt_td, null,
402                 false, null, false, null, null);
403 
404             // marshal reply
405             ThreadPool_Test.__iThreadPool.putJob(
406                 new Job(this, ThreadPool_Test. __iReceiver, iMessage));
407         }
408 
asyncCall()409         public  void asyncCall() throws Throwable {
410             for (int i = 0 ; i < 5 ; ++i) {
411                 ThreadPool_Test.__iThreadPool.attach();
412                 ThreadPool_Test.putJob(this, true, __iThreadPool.getThreadId(),
413                                        "syncCall");
414                 // wait for reply
415                 ThreadPool_Test.__iThreadPool.enter();
416                 ThreadPool_Test.__iThreadPool.detach();
417             }
418             // async must have waited for this call
419             _success = _async_WorkAt._async_counter == 2;
420         }
421 
increment()422         public void increment() throws Throwable {}
423 
notifyme()424         public void notifyme() {}
425 
426         public boolean _success = false;
427 
428         private final TestWorkAt _async_WorkAt;
429     }
430 
431     private static final IThreadPool __iThreadPool = ThreadPoolManager.create();
432     private static final IReceiver __iReceiver = new TestReceiver();
433     private static final TypeDescription __workAt_td
434     = TypeDescription.getTypeDescription(TestIWorkAt.class);
435 }
436 
437 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
438