/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module client_pool_test;

import core.sync.semaphore : Semaphore;
import core.time : Duration, dur;
import core.thread : Thread;
import std.algorithm;
import std.array;
import std.conv;
import std.exception;
import std.getopt;
import std.range;
import std.stdio;
import std.typecons;
import std.variant : Variant;
import thrift.base;
import thrift.async.libevent;
import thrift.async.socket;
import thrift.codegen.base;
import thrift.codegen.async_client;
import thrift.codegen.async_client_pool;
import thrift.codegen.client;
import thrift.codegen.client_pool;
import thrift.codegen.processor;
import thrift.protocol.base;
import thrift.protocol.binary;
import thrift.server.base;
import thrift.server.simple;
import thrift.server.transport.socket;
import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.socket;
import thrift.util.cancellation;
import thrift.util.future;

// We use this as our RPC-layer exception here to make sure socket/… problems
// (which would usually be considered RPC-layer faults) cause the tests to
// fail, even though we are testing the RPC exception handling.
class TestServiceException : TException {
  int port;
}

interface TestService {
  int getPort();
  alias .TestServiceException TestServiceException;
  enum methodMeta = [TMethodMeta("getPort", [],
    [TExceptionMeta("a", 1, "TestServiceException")])];
}

// Use some derived service, just to check that the pools handle inheritance
// correctly.
interface ExTestService : TestService {
  int[] getPortInArray();
  enum methodMeta = [TMethodMeta("getPortInArray", [],
    [TExceptionMeta("a", 1, "TestServiceException")])];
}

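// Test handler that simply reports the port it was constructed for,
// optionally sleeping for a configurable delay and/or throwing a
// TestServiceException to simulate slow and/or failing servers.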
class ExTestHandler : ExTestService {
  this(ushort port, Duration delay, bool failing, bool trace) {
    this.port = port;
    this.delay = delay;
    this.failing = failing;
    this.trace = trace;
  }

  override int getPort() {
    if (trace) {
      stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
        delay, failing);
    }
    sleep();
    failIfEnabled();
    return port;
  }

  override int[] getPortInArray() {
    return [getPort()];
  }

  ushort port;
  Duration delay;
  bool failing;
  bool trace;

private:
  void sleep() {
    if (delay > dur!"hnsecs"(0)) Thread.sleep(delay);
  }

  void failIfEnabled() {
    if (!failing) return;

    auto e = new TestServiceException;
    e.port = port;
    throw e;
  }
}

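// Server event handler whose only job is to notify the given semaphore from
// preServe(), i.e. once the server socket has been bound and the server is
// about to start accepting connections.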
class ServerPreServeHandler : TServerEventHandler {
  this(Semaphore sem) {
    sem_ = sem;
  }

  override void preServe() {
    sem_.notify();
  }

  Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
  void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
  void preProcess(Variant serverContext, TTransport transport) {}

private:
  Semaphore sem_;
}

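// Thread hosting a TSimpleServer for a single handler; the server runs until
// the shared cancellation token is triggered.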
class ServerThread : Thread {
  this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) {
    super(&run);
    handler_ = handler;
    cancellation_ = cancellation;
    serverHandler_ = serverHandler;
  }
private:
  void run() {
    try {
      auto protocolFactory = new TBinaryProtocolFactory!();
      auto processor = new TServiceProcessor!ExTestService(handler_);
      auto serverTransport = new TServerSocket(handler_.port);
      serverTransport.recvTimeout = dur!"seconds"(3);
      auto transportFactory = new TBufferedTransportFactory;

      auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory);
      server.eventHandler = serverHandler_;
      server.serve(cancellation_);
    } catch (Exception e) {
      writefln("Server thread on port %s failed: %s", handler_.port, e);
    }
  }

  ExTestHandler handler_;
  ServerPreServeHandler serverHandler_;
  TCancellation cancellation_;
}

void main(string[] args) {
  bool trace;
  ushort port = 9090;
  getopt(args, "port", &port, "trace", &trace);

  auto serverCancellation = new TCancellationOrigin;
  scope (exit) serverCancellation.trigger();

  immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));

  // Semaphore signalled by each server thread once it has bound its socket
  // and started listening.
  Semaphore sem = new Semaphore(0);

version (none) {
  // Cannot use this due to multiple DMD @@BUG@@s:
  // 1. »function D main is a nested function and cannot be accessed from array«
  //    when calling array() on the result of the outer map() – would have to
  //    manually do the eager evaluation/array conversion.
  // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
  //    can be worked around by calling array() on the map result first.
  // 3. Even when using the workarounds for the last two points, the DMD-built
  //    executable crashes when building without (sic!) inlining enabled,
  //    the backtrace points into the first delegate literal.
  auto handlers = array(map!((args){
    return new ExTestHandler(args._0, args._1, args._2, trace);
  })(zip(
    ports,
    map!((a){ return dur!"msecs"(a); })([1, 10, 100, 1, 10, 100]),
    [false, false, false, true, true, true]
  )));
} else {
  auto handlers = [
    new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
    new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
    new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
    new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
    new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
    new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
  ];
}

  // Fire up the server threads.
  foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start();

  // Wait until every server thread has signalled that it is ready to serve;
  // fail early if one of them does not come up in time.
  foreach (h; handlers) enforce(sem.wait(dur!"seconds"(1)),
    "Server thread failed to start listening in time.");

  syncClientPoolTest(ports, handlers);
  asyncClientPoolTest(ports, handlers);
  asyncFastestClientPoolTest(ports, handlers);
  asyncAggregatorTest(ports, handlers);
}


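// Tests the synchronous TClientPool: clients are tried in the given order,
// TestServiceException makes the pool fall through to the next client, and a
// client that failed too often is temporarily disabled.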
void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto clients = array(map!((a){
    return cast(TClientBase!ExTestService)tClient!ExTestService(
      tBinaryProtocol(new TSocket("127.0.0.1", a))
    );
  })(ports));

  scope(exit) foreach (c; clients) c.outputProtocol.transport.close();

  // Try the case where the first client succeeds.
  {
    enforce(makePool(clients).getPort() == ports[0]);
  }

  // Try the case where all clients fail.
  {
    auto pool = makePool(clients[3 .. $]);
    auto e = cast(TCompoundOperationException)collectException(pool.getPort());
    enforce(e);
    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
      ports[3 .. $]));
  }

  // Try the case where the first clients fail, but a later one succeeds.
  {
    auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
    enforce(pool.getPortInArray() == [ports[0]]);
  }

  // Make sure a client is properly deactivated when it has failed too often.
  {
    auto pool = makePool(clients);
    pool.faultDisableCount = 1;
    pool.faultDisableDuration = dur!"msecs"(50);

    handlers[0].failing = true;
    enforce(pool.getPort() == ports[1]);

    handlers[0].failing = false;
    enforce(pool.getPort() == ports[1]);

    Thread.sleep(dur!"msecs"(50));
    enforce(pool.getPort() == ports[0]);
  }
}

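// Builds a client pool that tries the clients in the exact order given (no
// permutation) and whose fault filter only matches TestServiceException (see
// the comment on TestServiceException at the top of the file).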
auto makePool(TClientBase!ExTestService[] clients) {
  auto p = tClientPool(clients);
  p.permuteClients = false;
  p.rpcFaultFilter = (Exception e) {
    return (cast(TestServiceException)e !is null);
  };
  return p;
}


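// Same scenarios as syncClientPoolTest, but against TAsyncClientPool, whose
// methods return futures instead of blocking.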
void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope(exit) foreach (c; clients) c.transport.close();

  // Try the case where the first client succeeds.
  {
    enforce(makeAsyncPool(clients).getPort().waitGet() == ports[0]);
  }

  // Try the case where all clients fail.
  {
    auto pool = makeAsyncPool(clients[3 .. $]);
    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
    enforce(e);
    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
      ports[3 .. $]));
  }

  // Try the case where the first clients fail, but a later one succeeds.
  {
    auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
    enforce(pool.getPortInArray().waitGet() == [ports[0]]);
  }

  // Make sure a client is properly deactivated when it has failed too often.
  {
    auto pool = makeAsyncPool(clients);
    pool.faultDisableCount = 1;
    pool.faultDisableDuration = dur!"msecs"(50);

    handlers[0].failing = true;
    enforce(pool.getPort().waitGet() == ports[1]);

    handlers[0].failing = false;
    enforce(pool.getPort().waitGet() == ports[1]);

    Thread.sleep(dur!"msecs"(50));
    enforce(pool.getPort().waitGet() == ports[0]);
  }
}

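// Async counterpart of makePool: same order and fault filter settings, but
// over TAsyncClientBase clients.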
auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
  auto p = tAsyncClientPool(clients);
  p.permuteClients = false;
  p.rpcFaultFilter = (Exception e) {
    return (cast(TestServiceException)e !is null);
  };
  return p;
}

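// Creates one TAsyncClient per port, all driven by the given libevent
// manager.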
auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
  // DMD @@BUG@@ workaround: Using array() on the lazyClients map result leads
  // to »function D main is a nested function and cannot be accessed from array«.
  // Thus, we manually do the array conversion.
  auto lazyClients = map!((a){
    return new TAsyncClient!ExTestService(
      new TAsyncSocket(manager, "127.0.0.1", a),
      new TBufferedTransportFactory,
      new TBinaryProtocolFactory!(TBufferedTransport)
    );
  })(ports);
  TAsyncClientBase!ExTestService[] clients;
  foreach (c; lazyClients) clients ~= c;
  return clients;
}


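// Tests TAsyncFastestClientPool, which completes with the result of whichever
// (non-failing) client answers first.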
void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope(exit) foreach (c; clients) c.transport.close();

  // Make sure the fastest client wins, even if the clients are passed to the
  // pool in a different order.
  {
    auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
    enforce(result == ports[0]);
  }

  // Try the case where all clients fail.
  {
    auto pool = makeAsyncFastestPool(clients[3 .. $]);
    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
    enforce(e);
    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
      ports[3 .. $]));
  }

  // Try the case where the first clients fail, but a later one succeeds.
  {
    auto pool = makeAsyncFastestPool(clients[1 .. $]);
    enforce(pool.getPortInArray().waitGet() == [ports[1]]);
  }
}

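// Fastest-client pool with the same RPC fault filter as the other pools.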
auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
  auto p = tAsyncFastestClientPool(clients);
  p.rpcFaultFilter = (Exception e) {
    return (cast(TestServiceException)e !is null);
  };
  return p;
}


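// Tests TAsyncAggregator, which broadcasts a call to all clients and makes
// the results available either as a range (as they complete) or folded via an
// accumulator function.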
void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope(exit) foreach (c; clients) c.transport.close();

  auto aggregator = tAsyncAggregator(
    cast(TAsyncClientBase!ExTestService[])clients);

  // Test aggregator range interface.
  {
    auto range = aggregator.getPort().range(dur!"msecs"(50));
    enforce(equal(range, ports[0 .. 2][]));
    enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
      ports[3 .. $ - 1]));
    enforce(range.completedCount == 4);
  }

  // Test default accumulator for scalars.
  {
    auto fullResult = aggregator.getPort().accumulate();
    enforce(fullResult.waitGet() == ports[0 .. 3]);

    auto partialResult = aggregator.getPort().accumulate();
    Thread.sleep(dur!"msecs"(20));
    enforce(partialResult.finishGet() == ports[0 .. 2]);
  }

  // Test default accumulator for arrays.
  {
    auto fullResult = aggregator.getPortInArray().accumulate();
    enforce(fullResult.waitGet() == ports[0 .. 3]);

    auto partialResult = aggregator.getPortInArray().accumulate();
    Thread.sleep(dur!"msecs"(20));
    enforce(partialResult.finishGet() == ports[0 .. 2]);
  }

  // Test custom accumulator.
  {
    auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
      return reduce!"a + b"(results);
    })();
    enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);

    auto partialResult = aggregator.getPort().accumulate!(
      function(int[] results, Exception[] exceptions) {
        // Return a tuple of the parameters so we can check them outside of
        // this function (to verify the values, we need access to »ports«, but
        // due to DMD @@BUG5710@@, we can't use a delegate literal).
        return tuple(results, exceptions);
      }
    )();
    Thread.sleep(dur!"msecs"(20));
    auto resultTuple = partialResult.finishGet();
    enforce(resultTuple[0] == ports[0 .. 2]);
    enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]),
      ports[3 .. $ - 1]));
  }
}