1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /**
21  * Exercises various transports, combined with the buffered/framed wrappers.
22  *
23  * Originally ported from the C++ version, with Windows support code added.
24  */
25 module transport_test;
26 
27 import core.atomic;
28 import core.time : Duration;
29 import core.thread : Thread;
30 import std.conv : to;
31 import std.datetime;
32 import std.exception : enforce;
33 static import std.file;
34 import std.getopt;
35 import std.random : rndGen, uniform, unpredictableSeed;
36 import std.socket;
37 import std.stdio;
38 import std.string;
39 import std.typetuple;
40 import thrift.transport.base;
41 import thrift.transport.buffered;
42 import thrift.transport.framed;
43 import thrift.transport.file;
44 import thrift.transport.http;
45 import thrift.transport.memory;
46 import thrift.transport.socket;
47 import thrift.transport.zlib;
48 
49 /*
50  * Size generation helpers – used to be able to run the same testing code
51  * with both constant and random total/chunk sizes.
52  */
53 
54 interface SizeGenerator {
55   size_t nextSize();
56   string toString();
57 }
58 
59 class ConstantSizeGenerator : SizeGenerator {
this(size_t value)60   this(size_t value) {
61     value_ = value;
62   }
63 
nextSize()64   override size_t nextSize() {
65     return value_;
66   }
67 
toString()68   override string toString() const {
69     return to!string(value_);
70   }
71 
72 private:
73   size_t value_;
74 }
75 
76 class RandomSizeGenerator : SizeGenerator {
this(size_t min,size_t max)77   this(size_t min, size_t max) {
78     min_ = min;
79     max_ = max;
80   }
81 
nextSize()82   override size_t nextSize() {
83     return uniform!"[]"(min_, max_);
84   }
85 
toString()86   override string toString() const {
87     return format("rand(%s, %s)", min_, max_);
88   }
89 
min()90   size_t min() const @property {
91     return min_;
92   }
93 
max()94   size_t max() const @property {
95     return max_;
96   }
97 
98 private:
99   size_t min_;
100   size_t max_;
101 }
102 
103 
104 /*
105  * Classes to set up coupled transports
106  */
107 
108 /**
109  * Helper class to represent a coupled pair of transports.
110  *
111  * Data written to the output transport can be read from the input transport.
112  *
113  * This is used as the base class for the various coupled transport
114  * implementations. It shouldn't be used directly.
115  */
116 class CoupledTransports(Transport) if (isTTransport!Transport) {
117   Transport input;
118   Transport output;
119 }
120 
isCoupledTransports(T)121 template isCoupledTransports(T) {
122   static if (is(T _ : CoupledTransports!U, U)) {
123     enum isCoupledTransports = true;
124   } else {
125     enum isCoupledTransports = false;
126   }
127 }
128 
129 /**
130  * Helper template class for creating coupled transports that wrap
131  * another transport.
132  */
133 class CoupledWrapperTransports(WrapperTransport, InnerCoupledTransports) if (
134   isTTransport!WrapperTransport && isCoupledTransports!InnerCoupledTransports
135 ) : CoupledTransports!WrapperTransport {
this()136   this() {
137     inner_ = new InnerCoupledTransports();
138     if (inner_.input) {
139       input = new WrapperTransport(inner_.input);
140     }
141     if (inner_.output) {
142       output = new WrapperTransport(inner_.output);
143     }
144   }
145 
~this()146   ~this() {
147     destroy(inner_);
148   }
149 
150 private:
151   InnerCoupledTransports inner_;
152 }
153 
154 import thrift.internal.codegen : PApply;
155 alias PApply!(CoupledWrapperTransports, TBufferedTransport) CoupledBufferedTransports;
156 alias PApply!(CoupledWrapperTransports, TFramedTransport) CoupledFramedTransports;
157 alias PApply!(CoupledWrapperTransports, TZlibTransport) CoupledZlibTransports;
158 
159 /**
160  * Coupled TMemoryBuffers.
161  */
162 class CoupledMemoryBuffers : CoupledTransports!TMemoryBuffer {
this()163   this() {
164     buf = new TMemoryBuffer;
165     input = buf;
166     output = buf;
167   }
168 
169   TMemoryBuffer buf;
170 }
171 
172 /**
173  * Coupled TSockets.
174  */
175 class CoupledSocketTransports : CoupledTransports!TSocket {
this()176   this() {
177     auto sockets = socketPair();
178     input = new TSocket(sockets[0]);
179     output = new TSocket(sockets[1]);
180   }
181 
~this()182   ~this() {
183     input.close();
184     output.close();
185   }
186 }
187 
188 /**
189  * Coupled TFileTransports
190  */
191 class CoupledFileTransports : CoupledTransports!TTransport {
this()192   this() {
193     // We actually need the file name of the temp file here, so we can't just
194     // use the usual tempfile facilities.
195     do {
196       fileName_ = tmpDir ~ "/thrift.transport_test." ~ to!string(rndGen().front);
197       rndGen().popFront();
198     } while (std.file.exists(fileName_));
199 
200     writefln("Using temp file: %s", fileName_);
201 
202     auto writer = new TFileWriterTransport(fileName_);
203     writer.open();
204     output = writer;
205 
206     // Wait until the file has been created.
207     writer.flush();
208 
209     auto reader = new TFileReaderTransport(fileName_);
210     reader.open();
211     reader.readTimeout(dur!"msecs"(-1));
212     input = reader;
213   }
214 
~this()215   ~this() {
216     input.close();
217     output.close();
218     std.file.remove(fileName_);
219   }
220 
221   static string tmpDir;
222 
223 private:
224   string fileName_;
225 }
226 
227 
228 /*
229  * Test functions
230  */
231 
232 /**
233  * Test interleaved write and read calls.
234  *
235  * Generates a buffer totalSize bytes long, then writes it to the transport,
236  * and verifies the written data can be read back correctly.
237  *
238  * Mode of operation:
239  * - call wChunkGenerator to figure out how large of a chunk to write
240  *   - call wSizeGenerator to get the size for individual write() calls,
241  *     and do this repeatedly until the entire chunk is written.
242  * - call rChunkGenerator to figure out how large of a chunk to read
243  *   - call rSizeGenerator to get the size for individual read() calls,
244  *     and do this repeatedly until the entire chunk is read.
245  * - repeat until the full buffer is written and read back,
246  *   then compare the data read back against the original buffer
247  *
248  *
249  * - If any of the size generators return 0, this means to use the maximum
250  *   possible size.
251  *
252  * - If maxOutstanding is non-zero, write chunk sizes will be chosen such that
253  *   there are never more than maxOutstanding bytes waiting to be read back.
254  */
255 void testReadWrite(CoupledTransports)(
256   size_t totalSize,
257   SizeGenerator wSizeGenerator,
258   SizeGenerator rSizeGenerator,
259   SizeGenerator wChunkGenerator,
260   SizeGenerator rChunkGenerator,
261   size_t maxOutstanding
262 ) if (
263   isCoupledTransports!CoupledTransports
264 ) {
265   scope transports = new CoupledTransports;
266   assert(transports.input);
267   assert(transports.output);
268 
269   auto wbuf = new ubyte[totalSize];
270   auto rbuf = new ubyte[totalSize];
271 
272   // Store some data in wbuf.
foreach(i,ref b;wbuf)273   foreach (i, ref b; wbuf) {
274     b = i & 0xff;
275   }
276 
277   size_t totalWritten;
278   size_t totalRead;
279   while (totalRead < totalSize) {
280     // Determine how large a chunk of data to write.
281     auto wChunkSize = wChunkGenerator.nextSize();
282     if (wChunkSize == 0 || wChunkSize > totalSize - totalWritten) {
283       wChunkSize = totalSize - totalWritten;
284     }
285 
286     // Make sure (totalWritten - totalRead) + wChunkSize is less than
287     // maxOutstanding.
288     if (maxOutstanding > 0 &&
289         wChunkSize > maxOutstanding - (totalWritten - totalRead)) {
290       wChunkSize = maxOutstanding - (totalWritten - totalRead);
291     }
292 
293     // Write the chunk.
294     size_t chunkWritten = 0;
295     while (chunkWritten < wChunkSize) {
296       auto writeSize = wSizeGenerator.nextSize();
297       if (writeSize == 0 || writeSize > wChunkSize - chunkWritten) {
298         writeSize = wChunkSize - chunkWritten;
299       }
300 
301       transports.output.write(wbuf[totalWritten .. totalWritten + writeSize]);
302       chunkWritten += writeSize;
303       totalWritten += writeSize;
304     }
305 
306     // Flush the data, so it will be available in the read transport
307     // Don't flush if wChunkSize is 0. (This should only happen if
308     // totalWritten == totalSize already, and we're only reading now.)
309     if (wChunkSize > 0) {
310       transports.output.flush();
311     }
312 
313     // Determine how large a chunk of data to read back.
314     auto rChunkSize = rChunkGenerator.nextSize();
315     if (rChunkSize == 0 || rChunkSize > totalWritten - totalRead) {
316       rChunkSize = totalWritten - totalRead;
317     }
318 
319     // Read the chunk.
320     size_t chunkRead;
321     while (chunkRead < rChunkSize) {
322       auto readSize = rSizeGenerator.nextSize();
323       if (readSize == 0 || readSize > rChunkSize - chunkRead) {
324         readSize = rChunkSize - chunkRead;
325       }
326 
327       size_t bytesRead;
328       try {
329         bytesRead = transports.input.read(
330           rbuf[totalRead .. totalRead + readSize]);
catch(TTransportException e)331       } catch (TTransportException e) {
332         throw new Exception(format(`read(pos = %s, size = %s) threw ` ~
333           `exception "%s"; written so far: %s/%s bytes`, totalRead, readSize,
334           e.msg, totalWritten, totalSize));
335       }
336 
337       enforce(bytesRead > 0, format(`read(pos = %s, size = %s) returned %s; ` ~
338         `written so far: %s/%s bytes`, totalRead, readSize, bytesRead,
339         totalWritten, totalSize));
340 
341       chunkRead += bytesRead;
342       totalRead += bytesRead;
343     }
344   }
345 
346   // make sure the data read back is identical to the data written
347   if (rbuf != wbuf) {
348     stderr.writefln("%s vs. %s", wbuf[$ - 4 .. $], rbuf[$ - 4 .. $]);
349     stderr.writefln("rbuf: %s vs. wbuf: %s", rbuf.length, wbuf.length);
350   }
351   enforce(rbuf == wbuf);
352 }
353 
354 void testReadPartAvailable(CoupledTransports)() if (
355   isCoupledTransports!CoupledTransports
356 ) {
357   scope transports = new CoupledTransports;
358   assert(transports.input);
359   assert(transports.output);
360 
361   ubyte[10] writeBuf = 'a';
362   ubyte[10] readBuf;
363 
364   // Attemping to read 10 bytes when only 9 are available should return 9
365   // immediately.
366   transports.output.write(writeBuf[0 .. 9]);
367   transports.output.flush();
368 
369   auto t = Trigger(dur!"seconds"(3), transports.output, 1);
370   auto bytesRead = transports.input.read(readBuf);
371   enforce(t.fired == 0);
372   enforce(bytesRead == 9);
373 }
374 
375 void testReadPartialMidframe(CoupledTransports)() if (
376   isCoupledTransports!CoupledTransports
377 ) {
378   scope transports = new CoupledTransports;
379   assert(transports.input);
380   assert(transports.output);
381 
382   ubyte[13] writeBuf = 'a';
383   ubyte[14] readBuf;
384 
385   // Attempt to read 10 bytes, when only 9 are available, but after we have
386   // already read part of the data that is available.  This exercises a
387   // different code path for several of the transports.
388   //
389   // For transports that add their own framing (e.g., TFramedTransport and
390   // TFileTransport), the two flush calls break up the data in to a 10 byte
391   // frame and a 3 byte frame.  The first read then puts us partway through the
392   // first frame, and then we attempt to read past the end of that frame, and
393   // through the next frame, too.
394   //
395   // For buffered transports that perform read-ahead (e.g.,
396   // TBufferedTransport), the read-ahead will most likely see all 13 bytes
397   // written on the first read.  The next read will then attempt to read past
398   // the end of the read-ahead buffer.
399   //
400   // Flush 10 bytes, then 3 bytes.  This creates 2 separate frames for
401   // transports that track framing internally.
402   transports.output.write(writeBuf[0 .. 10]);
403   transports.output.flush();
404   transports.output.write(writeBuf[10 .. 13]);
405   transports.output.flush();
406 
407   // Now read 4 bytes, so that we are partway through the written data.
408   auto bytesRead = transports.input.read(readBuf[0 .. 4]);
409   enforce(bytesRead == 4);
410 
411   // Now attempt to read 10 bytes.  Only 9 more are available.
412   //
413   // We should be able to get all 9 bytes, but it might take multiple read
414   // calls, since it is valid for read() to return fewer bytes than requested.
415   // (Most transports do immediately return 9 bytes, but the framing transports
416   // tend to only return to the end of the current frame, which is 6 bytes in
417   // this case.)
418   size_t totalRead = 0;
419   while (totalRead < 9) {
420     auto t = Trigger(dur!"seconds"(3), transports.output, 1);
421     bytesRead = transports.input.read(readBuf[4 + totalRead .. 14]);
422     enforce(t.fired == 0);
423     enforce(bytesRead > 0);
424     totalRead += bytesRead;
425     enforce(totalRead <= 9);
426   }
427 
428   enforce(totalRead == 9);
429 }
430 
431 void testBorrowPartAvailable(CoupledTransports)() if (
432   isCoupledTransports!CoupledTransports
433 ) {
434   scope transports = new CoupledTransports;
435   assert(transports.input);
436   assert(transports.output);
437 
438   ubyte[9] writeBuf = 'a';
439   ubyte[10] readBuf;
440 
441   // Attemping to borrow 10 bytes when only 9 are available should return NULL
442   // immediately.
443   transports.output.write(writeBuf);
444   transports.output.flush();
445 
446   auto t = Trigger(dur!"seconds"(3), transports.output, 1);
447   auto borrowLen = readBuf.length;
448   auto borrowedBuf = transports.input.borrow(readBuf.ptr, borrowLen);
449   enforce(t.fired == 0);
450   enforce(borrowedBuf is null);
451 }
452 
453 void testReadNoneAvailable(CoupledTransports)() if (
454   isCoupledTransports!CoupledTransports
455 ) {
456   scope transports = new CoupledTransports;
457   assert(transports.input);
458   assert(transports.output);
459 
460   // Attempting to read when no data is available should either block until
461   // some data is available, or fail immediately.  (e.g., TSocket blocks,
462   // TMemoryBuffer just fails.)
463   //
464   // If the transport blocks, it should succeed once some data is available,
465   // even if less than the amount requested becomes available.
466   ubyte[10] readBuf;
467 
468   auto t = Trigger(dur!"seconds"(1), transports.output, 2);
469   t.add(dur!"seconds"(1), transports.output, 8);
470 
471   auto bytesRead = transports.input.read(readBuf);
472   if (bytesRead == 0) {
473     enforce(t.fired == 0);
474   } else {
475     enforce(t.fired == 1);
476     enforce(bytesRead == 2);
477   }
478 }
479 
480 void testBorrowNoneAvailable(CoupledTransports)() if (
481   isCoupledTransports!CoupledTransports
482 ) {
483   scope transports = new CoupledTransports;
484   assert(transports.input);
485   assert(transports.output);
486 
487   ubyte[16] writeBuf = 'a';
488 
489   // Attempting to borrow when no data is available should fail immediately
490   auto t = Trigger(dur!"seconds"(1), transports.output, 10);
491 
492   auto borrowLen = 10;
493   auto borrowedBuf = transports.input.borrow(null, borrowLen);
494   enforce(borrowedBuf is null);
495   enforce(t.fired == 0);
496 }
497 
498 
499 void doRwTest(CoupledTransports)(
500   size_t totalSize,
501   SizeGenerator wSizeGen,
502   SizeGenerator rSizeGen,
503   SizeGenerator wChunkSizeGen = new ConstantSizeGenerator(0),
504   SizeGenerator rChunkSizeGen = new ConstantSizeGenerator(0),
505   size_t maxOutstanding = 0
506 ) if (
507   isCoupledTransports!CoupledTransports
508 ) {
509   totalSize = cast(size_t)(totalSize * g_sizeMultiplier);
510 
scope(failure)511   scope(failure) {
512     writefln("Test failed for %s: testReadWrite(%s, %s, %s, %s, %s, %s)",
513       CoupledTransports.stringof, totalSize, wSizeGen, rSizeGen,
514       wChunkSizeGen, rChunkSizeGen, maxOutstanding);
515   }
516 
517   testReadWrite!CoupledTransports(totalSize, wSizeGen, rSizeGen,
518     wChunkSizeGen, rChunkSizeGen, maxOutstanding);
519 }
520 
521 void doBlockingTest(CoupledTransports)() if (
522   isCoupledTransports!CoupledTransports
523 ) {
writeFailure(string name)524   void writeFailure(string name) {
525     writefln("Test failed for %s: %s()", CoupledTransports.stringof, name);
526   }
527 
528   {
529     scope(failure) writeFailure("testReadPartAvailable");
530     testReadPartAvailable!CoupledTransports();
531   }
532 
533   {
534     scope(failure) writeFailure("testReadPartialMidframe");
535     testReadPartialMidframe!CoupledTransports();
536   }
537 
538   {
539     scope(failure) writeFailure("testReadNoneAvaliable");
540     testReadNoneAvailable!CoupledTransports();
541   }
542 
543   {
544     scope(failure) writeFailure("testBorrowPartAvailable");
545     testBorrowPartAvailable!CoupledTransports();
546   }
547 
548   {
549     scope(failure) writeFailure("testBorrowNoneAvailable");
550     testBorrowNoneAvailable!CoupledTransports();
551   }
552 }
553 
getGenerator(T)554 SizeGenerator getGenerator(T)(T t) {
555   static if (is(T : SizeGenerator)) {
556     return t;
557   } else {
558     return new ConstantSizeGenerator(t);
559   }
560 }
561 
562 template WrappedTransports(T) if (isCoupledTransports!T) {
563   alias TypeTuple!(
564     T,
565     CoupledBufferedTransports!T,
566     CoupledFramedTransports!T,
567     CoupledZlibTransports!T
568   ) WrappedTransports;
569 }
570 
571 void testRw(C, R, S)(
572   size_t totalSize,
573   R wSize,
574   S rSize
575 ) if (
576   isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
577   is(typeof(getGenerator(rSize)))
578 ) {
579   testRw!C(totalSize, wSize, rSize, 0, 0, 0);
580 }
581 
582 void testRw(C, R, S, T, U)(
583   size_t totalSize,
584   R wSize,
585   S rSize,
586   T wChunkSize,
587   U rChunkSize,
588   size_t maxOutstanding = 0
589 ) if (
590   isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
591   is(typeof(getGenerator(rSize))) && is(typeof(getGenerator(wChunkSize))) &&
592   is(typeof(getGenerator(rChunkSize)))
593 ) {
594   foreach (T; WrappedTransports!C) {
595     doRwTest!T(
596       totalSize,
597       getGenerator(wSize),
598       getGenerator(rSize),
599       getGenerator(wChunkSize),
600       getGenerator(rChunkSize),
601       maxOutstanding
602     );
603   }
604 }
605 
606 void testBlocking(C)() if (isCoupledTransports!C) {
607   foreach (T; WrappedTransports!C) {
608     doBlockingTest!T();
609   }
610 }
611 
612 // A quick hack, for the sake of brevity…
613 float g_sizeMultiplier = 1;
614 
version(Posix)615 version (Posix) {
616   immutable defaultTempDir = "/tmp";
617 } else version (Windows) {
618   import core.sys.windows.windows;
619   extern(Windows) DWORD GetTempPathA(DWORD nBufferLength, LPTSTR lpBuffer);
620 
621   string defaultTempDir() @property {
622     char[MAX_PATH + 1] dir;
623     enforce(GetTempPathA(dir.length, dir.ptr));
624     return to!string(dir.ptr)[0 .. $ - 1];
625   }
626 } else static assert(false);
627 
main(string[]args)628 void main(string[] args) {
629   int seed = unpredictableSeed();
630   string tmpDir = defaultTempDir;
631 
632   getopt(args, "seed", &seed, "size-multiplier", &g_sizeMultiplier,
633     "tmp-dir", &tmpDir);
634   enforce(g_sizeMultiplier >= 0, "Size multiplier must not be negative.");
635 
636   writefln("Using seed: %s", seed);
637   rndGen().seed(seed);
638   CoupledFileTransports.tmpDir = tmpDir;
639 
640   auto rand4k = new RandomSizeGenerator(1, 4096);
641 
642   /*
643    * We do the basically the same set of tests for each transport type,
644    * although we tweak the parameters in some places.
645    */
646 
647   // TMemoryBuffer tests
648   testRw!CoupledMemoryBuffers(1024 * 1024, 0, 0);
649   testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k);
650   testRw!CoupledMemoryBuffers(1024 * 256, 167, 163);
651   testRw!CoupledMemoryBuffers(1024 * 16, 1, 1);
652 
653   testRw!CoupledMemoryBuffers(1024 * 256, 0, 0, rand4k, rand4k);
654   testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k, rand4k, rand4k);
655   testRw!CoupledMemoryBuffers(1024 * 256, 167, 163, rand4k, rand4k);
656   testRw!CoupledMemoryBuffers(1024 * 16, 1, 1, rand4k, rand4k);
657 
658   testBlocking!CoupledMemoryBuffers();
659 
660   // TSocket tests
661   enum socketMaxOutstanding = 4096;
662   testRw!CoupledSocketTransports(1024 * 1024, 0, 0,
663           0, 0, socketMaxOutstanding);
664   testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k,
665           0, 0, socketMaxOutstanding);
666   testRw!CoupledSocketTransports(1024 * 256, 167, 163,
667           0, 0, socketMaxOutstanding);
668   // Doh.  Apparently writing to a socket has some additional overhead for
669   // each send() call.  If we have more than ~400 outstanding 1-byte write
670   // requests, additional send() calls start blocking.
671   testRw!CoupledSocketTransports(1024 * 16, 1, 1,
672           0, 0, 250);
673   testRw!CoupledSocketTransports(1024 * 256, 0, 0,
674           rand4k, rand4k, socketMaxOutstanding);
675   testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k,
676           rand4k, rand4k, socketMaxOutstanding);
677   testRw!CoupledSocketTransports(1024 * 256, 167, 163,
678           rand4k, rand4k, socketMaxOutstanding);
679   testRw!CoupledSocketTransports(1024 * 16, 1, 1,
680           rand4k, rand4k, 250);
681 
682   testBlocking!CoupledSocketTransports();
683 
684   // File transport tests.
685 
686   // Cannot write more than the frame size at once.
687   enum maxWriteAtOnce = 1024 * 1024 * 16 - 4;
688 
689   testRw!CoupledFileTransports(1024 * 1024, maxWriteAtOnce, 0);
690   testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k);
691   testRw!CoupledFileTransports(1024 * 256, 167, 163);
692   testRw!CoupledFileTransports(1024 * 16, 1, 1);
693 
694   testRw!CoupledFileTransports(1024 * 256, 0, 0, rand4k, rand4k);
695   testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k, rand4k, rand4k);
696   testRw!CoupledFileTransports(1024 * 256, 167, 163, rand4k, rand4k);
697   testRw!CoupledFileTransports(1024 * 16, 1, 1, rand4k, rand4k);
698 
699   testBlocking!CoupledFileTransports();
700 }
701 
702 
703 /*
704  * Timer handling code for use in tests that check the transport blocking
705  * semantics.
706  *
707  * The implementation has been hacked together in a hurry and wastes a lot of
708  * threads, but speed should not be the concern here.
709  */
710 
711 struct Trigger {
thisTrigger712   this(Duration timeout, TTransport transport, size_t writeLength) {
713     mutex_ = new Mutex;
714     cancelCondition_ = new Condition(mutex_);
715     info_ = new Info(timeout, transport, writeLength);
716     startThread();
717   }
718 
~thisTrigger719   ~this() {
720     synchronized (mutex_) {
721       info_ = null;
722       cancelCondition_.notifyAll();
723     }
724     if (thread_) thread_.join();
725   }
726 
thisTrigger727   @disable this(this) { assert(0); }
728 
addTrigger729   void add(Duration timeout, TTransport transport, size_t writeLength) {
730     synchronized (mutex_) {
731       auto info = new Info(timeout, transport, writeLength);
732       if (info_) {
733         auto prev = info_;
734         while (prev.next) prev = prev.next;
735         prev.next = info;
736       } else {
737         info_ = info;
738         startThread();
739       }
740     }
741   }
742 
firedTrigger743   @property short fired() {
744     return atomicLoad(fired_);
745   }
746 
747 private:
timerThreadTrigger748   void timerThread() {
749     // KLUDGE: Make sure the std.concurrency mbox is initialized on the timer
750     // thread to be able to unblock the file transport.
751     import std.concurrency;
752     thisTid;
753 
754     synchronized (mutex_) {
755       while (info_) {
756         auto cancelled = cancelCondition_.wait(info_.timeout);
757         if (cancelled) {
758           info_ = null;
759           break;
760         }
761 
762         atomicOp!"+="(fired_, 1);
763 
764         // Write some data to the transport to unblock it.
765         auto buf = new ubyte[info_.writeLength];
766         buf[] = 'b';
767         info_.transport.write(buf);
768         info_.transport.flush();
769 
770         info_ = info_.next;
771       }
772     }
773 
774     thread_ = null;
775   }
776 
startThreadTrigger777   void startThread() {
778     thread_ = new Thread(&timerThread);
779     thread_.start();
780   }
781 
782   struct Info {
thisTrigger::Info783     this(Duration timeout, TTransport transport, size_t writeLength) {
784       this.timeout = timeout;
785       this.transport = transport;
786       this.writeLength = writeLength;
787     }
788 
789     Duration timeout;
790     TTransport transport;
791     size_t writeLength;
792     Info* next;
793   }
794 
795   Info* info_;
796   Thread thread_;
797   shared short fired_;
798 
799   import core.sync.mutex;
800   Mutex mutex_;
801   import core.sync.condition;
802   Condition cancelCondition_;
803 }
804