1 /* 2 * IThreadPool.h 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #ifndef FLOW_ITHREADPOOL_H 22 #define FLOW_ITHREADPOOL_H 23 #pragma once 24 25 #include "flow/flow.h" 26 27 // The IThreadPool interface represents a thread pool suitable for doing blocking disk-intensive work 28 // (as opposed to a one-thread-per-core pool for CPU-intensive work) 29 30 // Normally a thread pool is created by g_network->createThreadPool(), and different networks may have 31 // different implementations (for example, in simulation the thread pool will only be simulated and will 32 // not actually create threads). 33 34 // Once created, the caller must add at least one thread with addThread(), passing a user-defined instance 35 // of IThreadPoolReceiver that will do the work. init() is called on it on the new thread 36 37 // Then the caller calls post() as many times as desired. Each call will invoke the given thread action on 38 // any one of the thread pool receivers passed to addThread(). 39 40 // TypedAction<> is a utility subclass to make it easier to create thread actions and receivers. 41 42 // ThreadReturnPromise<> can be safely use to pass return values from thread actions back to the g_network thread 43 44 class IThreadPoolReceiver { 45 public: ~IThreadPoolReceiver()46 virtual ~IThreadPoolReceiver() {} 47 virtual void init() = 0; 48 }; 49 50 struct ThreadAction { 51 virtual void operator()(IThreadPoolReceiver*) = 0; // self-destructs 52 virtual void cancel() = 0; 53 virtual double getTimeEstimate() = 0; // for simulation 54 }; 55 typedef ThreadAction* PThreadAction; 56 57 class IThreadPool { 58 public: ~IThreadPool()59 virtual ~IThreadPool() {} 60 virtual Future<Void> getError() = 0; // asynchronously throws an error if there is an internal error 61 virtual void addThread( IThreadPoolReceiver* userData ) = 0; 62 virtual void post( PThreadAction action ) = 0; 63 virtual Future<Void> stop() = 0; isCoro()64 virtual bool isCoro() const { return false; } 65 virtual void addref() = 0; 66 virtual void delref() = 0; 67 }; 68 69 template <class Object, class ActionType> 70 class TypedAction : public ThreadAction { 71 public: operator()72 virtual void operator()(IThreadPoolReceiver* p) { 73 Object* o = (Object*)p; 74 o->action(*(ActionType*)this); 75 delete (ActionType*)this; 76 } cancel()77 virtual void cancel() { 78 delete (ActionType*)this; 79 } 80 }; 81 82 template <class T> 83 class ThreadReturnPromise : NonCopyable { 84 public: ThreadReturnPromise()85 ThreadReturnPromise() {} ~ThreadReturnPromise()86 ~ThreadReturnPromise() { if (promise.isValid()) sendError( broken_promise() ); } 87 getFuture()88 Future<T> getFuture() { // Call only on the originating thread! 89 return promise.getFuture(); 90 } 91 send(T const & t)92 void send( T const& t ) { // Can be called safely from another thread. Call send or sendError at most once. 93 Promise<Void> signal; 94 tagAndForward( &promise, t, signal.getFuture() ); 95 g_network->onMainThread( std::move(signal), g_network->getCurrentTask() | 1 ); 96 } sendError(Error const & e)97 void sendError( Error const& e ) { // Can be called safely from another thread. Call send or sendError at most once. 98 Promise<Void> signal; 99 tagAndForwardError( &promise, e, signal.getFuture() ); 100 g_network->onMainThread( std::move(signal), g_network->getCurrentTask() | 1 ); 101 } 102 private: 103 Promise<T> promise; 104 }; 105 106 Reference<IThreadPool> createGenericThreadPool(); 107 108 109 #endif