1 /*
2  * IThreadPool.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FLOW_ITHREADPOOL_H
22 #define FLOW_ITHREADPOOL_H
23 #pragma once
24 
25 #include "flow/flow.h"
26 
27 // The IThreadPool interface represents a thread pool suitable for doing blocking disk-intensive work
28 // (as opposed to a one-thread-per-core pool for CPU-intensive work)
29 
30 // Normally a thread pool is created by g_network->createThreadPool(), and different networks may have
31 // different implementations (for example, in simulation the thread pool will only be simulated and will
32 // not actually create threads).
33 
// Once created, the caller must add at least one thread with addThread(), passing a user-defined instance
// of IThreadPoolReceiver that will do the work.  init() is called on that receiver from the new thread.
36 
37 // Then the caller calls post() as many times as desired.  Each call will invoke the given thread action on
38 // any one of the thread pool receivers passed to addThread().
39 
40 // TypedAction<> is a utility subclass to make it easier to create thread actions and receivers.
41 
// ThreadReturnPromise<> can safely be used to pass return values from thread actions back to the g_network thread.
43 
// Interface implemented by the user-defined worker object that is handed to
// IThreadPool::addThread().  Thread actions posted to the pool are invoked with a
// pointer to one of these receivers.
class IThreadPoolReceiver {
public:
	// Virtual so a concrete receiver is destroyed correctly through a base pointer.
	virtual ~IThreadPoolReceiver() = default;

	// Setup hook for the receiver.  Per the file header comments it is called on the
	// newly added thread.
	virtual void init() = 0;
};
49 
50 struct ThreadAction {
51 	virtual void operator()(IThreadPoolReceiver*) = 0;		// self-destructs
52 	virtual void cancel() = 0;
53 	virtual double getTimeEstimate() = 0;                   // for simulation
54 };
55 typedef ThreadAction* PThreadAction;
56 
57 class IThreadPool {
58 public:
~IThreadPool()59 	virtual ~IThreadPool() {}
60 	virtual Future<Void> getError() = 0;  // asynchronously throws an error if there is an internal error
61 	virtual void addThread( IThreadPoolReceiver* userData ) = 0;
62 	virtual void post( PThreadAction action ) = 0;
63 	virtual Future<Void> stop() = 0;
isCoro()64 	virtual bool isCoro() const { return false; }
65 	virtual void addref() = 0;
66 	virtual void delref() = 0;
67 };
68 
69 template <class Object, class ActionType>
70 class TypedAction : public ThreadAction {
71 public:
operator()72 	virtual void operator()(IThreadPoolReceiver* p) {
73 		Object* o = (Object*)p;
74 		o->action(*(ActionType*)this);
75 		delete (ActionType*)this;
76 	}
cancel()77 	virtual void cancel() {
78 		delete (ActionType*)this;
79 	}
80 };
81 
82 template <class T>
83 class ThreadReturnPromise : NonCopyable {
84 public:
ThreadReturnPromise()85 	ThreadReturnPromise() {}
~ThreadReturnPromise()86 	~ThreadReturnPromise() { if (promise.isValid()) sendError( broken_promise() ); }
87 
getFuture()88 	Future<T> getFuture() {  // Call only on the originating thread!
89 		return promise.getFuture();
90 	}
91 
send(T const & t)92 	void send( T const& t ) {  // Can be called safely from another thread.  Call send or sendError at most once.
93 		Promise<Void> signal;
94 		tagAndForward( &promise, t, signal.getFuture() );
95 		g_network->onMainThread( std::move(signal), g_network->getCurrentTask() | 1 );
96 	}
sendError(Error const & e)97 	void sendError( Error const& e ) {  // Can be called safely from another thread.  Call send or sendError at most once.
98 		Promise<Void> signal;
99 		tagAndForwardError( &promise, e, signal.getFuture() );
100 		g_network->onMainThread( std::move(signal), g_network->getCurrentTask() | 1 );
101 	}
102 private:
103 	Promise<T> promise;
104 };
105 
106 Reference<IThreadPool>	createGenericThreadPool();
107 
108 
109 #endif