1 #region Copyright notice and license
2 
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Collections.Concurrent;
21 using System.Collections.Generic;
22 using System.Threading;
23 using System.Threading.Tasks;
24 using BenchmarkDotNet.Attributes;
25 using Grpc.Core;
26 
27 namespace Grpc.Microbenchmarks
28 {
29 
30     // common base-type for tests that need to run with some level of concurrency;
31     // note there's nothing *special* about this type - it is just to save some
32     // boilerplate
33 
34     [ClrJob, CoreJob] // test .NET Core and .NET Framework
35     [MemoryDiagnoser] // allocations
36     public abstract class CommonThreadedBase
37     {
38         protected virtual bool NeedsEnvironment => true;
39 
40         [Params(1, 2, 4, 6)]
41         public int ThreadCount { get; set; }
42 
43         protected GrpcEnvironment Environment { get; private set; }
44 
45         private List<Thread> workers;
46 
47         private List<BlockingCollection<Action>> dispatchQueues;
48 
49         [GlobalSetup]
Setup()50         public virtual void Setup()
51         {
52             dispatchQueues = new List<BlockingCollection<Action>>();
53             workers = new List<Thread>();
54             for (int i = 0; i < ThreadCount; i++)
55             {
56                 var dispatchQueue = new BlockingCollection<Action>();
57                 var thread = new Thread(new ThreadStart(() => WorkerThreadBody(dispatchQueue)));
58                 thread.Name = string.Format("threaded benchmark worker {0}", i);
59                 thread.Start();
60                 workers.Add(thread);
61                 dispatchQueues.Add(dispatchQueue);
62             }
63 
64             if (NeedsEnvironment) Environment = GrpcEnvironment.AddRef();
65         }
66 
67         [GlobalCleanup]
Cleanup()68         public virtual void Cleanup()
69         {
70             for (int i = 0; i < ThreadCount; i++)
71             {
72                 dispatchQueues[i].Add(null);  // null action request termination of the worker thread.
73                 workers[i].Join();
74             }
75 
76             if (Environment != null)
77             {
78                 Environment = null;
79                 GrpcEnvironment.ReleaseAsync().Wait();
80             }
81         }
82 
83         /// <summary>
84         /// Runs the operation in parallel (once on each worker thread).
85         /// This method tries to incur as little
86         /// overhead as possible, but there is some inherent overhead
87         /// that is hard to avoid (thread hop etc.). Therefore it is strongly
88         /// recommended that the benchmarked operation runs long enough to
89         /// make this overhead negligible.
90         /// </summary>
RunConcurrent(Action operation)91         protected void RunConcurrent(Action operation)
92         {
93             var workItemTasks = new Task[ThreadCount];
94             for (int i = 0; i < ThreadCount; i++)
95             {
96                 var tcs = new TaskCompletionSource<object>();
97                 var workItem = new Action(() =>
98                 {
99                     try
100                     {
101                         operation();
102                         tcs.SetResult(null);
103                     }
104                     catch (Exception e)
105                     {
106                         tcs.SetException(e);
107                     }
108                 });
109                 workItemTasks[i] = tcs.Task;
110                 dispatchQueues[i].Add(workItem);
111             }
112             Task.WaitAll(workItemTasks);
113         }
114 
WorkerThreadBody(BlockingCollection<Action> dispatchQueue)115         private void WorkerThreadBody(BlockingCollection<Action> dispatchQueue)
116         {
117             while(true)
118             {
119                 var workItem = dispatchQueue.Take();
120                 if (workItem == null)
121                 {
122                     // stop the worker if null action was provided
123                     break;
124                 }
125                 workItem();
126             }
127         }
128     }
129 }
130