00001 #ifndef DV_THREAD_POOL_H 00002 #define DV_THREAD_POOL_H 00003 // $Id: pool.h,v 1.24 2008/03/15 22:45:40 dvermeir Exp $ 00004 00005 #include <list> 00006 #include <vector> 00007 #include <algorithm> 00008 00009 #include <dvutil/debug.h> 00010 #include <dvutil/shared_ptr.h> 00011 #include <dvthread/monitor.h> 00012 #include <dvthread/lock.h> 00013 #include <dvthread/flexlock.h> 00014 00015 namespace Dv { 00016 namespace Thread { 00017 00018 class Factory; 00019 00020 /** A pool of worker threads. Normally, a pool is owned 00021 * by a Dv::Thread::Pool::Factory. 00022 * @sa Dv::Thread::Pool::Factor 00023 */ 00024 class Pool: public Monitor { 00025 public: 00026 /** Base class for threads that can be kept in a Dv::Thread::Pool. 00027 * @sa Dv::Thread::Pool::Factory::create 00028 * @sa Dv::Thread::Pool::Factory::create 00029 */ 00030 class Thread: public Dv::Thread::Thread { 00031 public: 00032 friend class Pool; 00033 friend class Init; 00034 00035 /** Constructor. 00036 * @param pool to which the thread belongs 00037 */ 00038 Thread(Pool& pool); 00039 00040 /** Destructor. */ 00041 virtual ~Thread(); 00042 00043 /** Do some work. This function is called 00044 * by Dv::Thread::Pool::Thread::main whenever the 00045 * thread is selected by Dv::Thread::Pool::delegate. 00046 * @warning if the function implements a loop, it should 00047 * check for killed() and exit the function as soon 00048 * as killed() is detected. 00049 * @sa Dv::Thread::Pool::~Pool 00050 */ 00051 virtual void work() = 0; 00052 00053 /** Set up this thread for work. Typically, 00054 * this would fill in some input data, e.g. by grabbing them 00055 * from the Thread factory (see test-pool.C in the distribution 00056 * for an example). 00057 */ 00058 virtual void setup_for_work() = 0; 00059 00060 /** Main function of the thread. 00061 * @return 1 iff an exception was raised by Thread::work 00062 * @return 0 otherwise 00063 */ 00064 int main(); 00065 00066 /** Auxiliary class which is used to support 00067 * thread specific initialization and finalization. 00068 * The user should subclass this, say 00069 * in a derived class MyInit so that the 00070 * MyInit constructor does the initialization 00071 * and the MyInit destructor does the finalization. 00072 * In addition, the user should define 00073 * a version of Dv::Thread::Pool::Thread::init 00074 * that creates a shared pointer to a newly 00075 * created MyInit object. 00076 * @sa Dv::Thread::Thread::init 00077 */ 00078 struct Init { 00079 virtual ~Init() {} 00080 typedef Dv::shared_ptr<Init> shared_ptr; 00081 }; 00082 00083 protected: 00084 00085 /** Function that creates a shared pointer 00086 * to a (class derived from) Init object. 00087 * @sa Dv::Thread::Thread::Init 00088 */ 00089 virtual Init::shared_ptr init() const { 00090 return Init::shared_ptr(new Init); 00091 } 00092 private: 00093 00094 /** Set up this thread with input for work. 00095 * This function grabs the monitor and 00096 * calls Dv::Pool::Thread::setup_for_work 00097 * @sa Dv::Pool::Thread::setup_for_work 00098 * @sa Dv::Pool::delegate 00099 */ 00100 void setup(); 00101 00102 enum { WORK = 0 /** signaled if the thread is due for work */ }; 00103 00104 /** Wait for the WORK condition to be signaled. 00105 * @return true iff the WORK condition was actually signaled 00106 * @return false if the wait timed out (after 1 second). 00107 */ 00108 bool wait() { return monitor_.wait(WORK, 1000); } 00109 00110 /** Signal the WORK condition. This function should not be called 00111 * by the Thread itself. 00112 * @sa Dv::Thread::Pool::delegate 00113 */ 00114 void signal() { 00115 Dv::Thread::Lock lock(monitor_); 00116 monitor_.signal(WORK); 00117 } 00118 00119 Monitor monitor_; 00120 Pool& pool_; 00121 00122 }; 00123 00124 friend class Factory; 00125 friend class Thread; 00126 00127 /** Delegate some work to an available worker thread. 00128 * If a thread is available, this function will among 00129 * other call its Dv::Thread::Pool::Thread::setup 00130 * function to supply it with input and then signal it. 00131 * @param msecs we are prepared to wait until a thread is 00132 * ready for work (so 0 means immediately return false 00133 * if there is no thread waiting for work). If negative, 00134 * we will wait indefinitely until a worker thread becomes 00135 * available. 00136 * @return true iff the delegation was succesful. 00137 * @return false iff no worker threads are available 00138 * @sa Dv::Thread::Pool::Thread::setup 00139 * @sa Dv::Thread::Pool::Thread::signal 00140 */ 00141 bool delegate(time_t msecs = 0); 00142 00143 /** @return the number of threads that are waiting for work. */ 00144 size_t slack() const { return free_.size(); } 00145 00146 /** Pool status */ 00147 enum Status { 00148 NONE = 0 /** not yet initialized */, 00149 INITIALIZED = 1 /** initialized but not yet finalized */, 00150 FINALIZED = 2 /** finalized */ 00151 }; 00152 00153 /** @return status of pool */ 00154 Status status() const { return status_; } 00155 00156 /** @return the number of threads in the pool. */ 00157 size_t size() const { return all_.size(); } 00158 private: 00159 /** Constructor. 00160 * Note that the actual initialisation of the threads in the 00161 * pool is done by Dv::Thread::Pool::initialize, which 00162 * is not called by this constructor. It should be called 00163 * by the constructor of the user's class derived 00164 * from Dv::Thread::Factory. 00165 * @param max_threads number of threads that will be kept in the pool. 00166 * @param factory object that is capable of creating a new thread 00167 * @param name for the pool's monitor. 00168 * @sa Dv::Thread::Factory::initialize 00169 * @sa Dv::Thread::Factory::finalize 00170 */ 00171 Pool(size_t max_threads, Factory& factory, const std::string& name); 00172 00173 /** Destructor. 00174 * Note that the actual finalisation of the threads in the 00175 * pool is done by Dv::Thread::Pool::finalize, which 00176 * is not called by this destructor. It should have been called 00177 * by the destructor of the user's class derived 00178 * from Dv::Thread::Factory. 00179 * @sa Dv::Thread::Factory::finalize 00180 * @sa Dv::Thread::Factory::initialize 00181 */ 00182 ~Pool(); 00183 00184 /** @return name of this pool (used as name for its monitor 00185 * and also as the basis for the names of the threads' monitor). 00186 */ 00187 const std::string& name() const { return name_; } 00188 00189 private: 00190 /** Initialize a Dv::Thread::Pool. This is separate from 00191 * the constructor because the function creates and 00192 * starts a number of threads using the pool's Dv::Thread::Factory::create 00193 * pure virtual function which only becomes available in 00194 * the constructor of the Dv::Thread::Factory-derived user class. 00195 * @param millisecs timeout after which it is assumed that 00196 * the initialization failed. 00197 * @return true iff the initialization succeeded 00198 * @sa Dv::Thread::Factory::init 00199 */ 00200 bool initialize(time_t millisecs); 00201 /** Finalize a Dv::Thread::Pool by killing all its threads (and joining them). 00202 * This is separate from 00203 * the destructor because the function kills 00204 * a number of threads which involves the pool's Dv::Thread::Factory::finalize 00205 * pure virtual function which only remains available in 00206 * the destructor of the Dv::Thread::Factory-derived user class. 00207 * @param millisecs timeout after which it is assumed that 00208 * the initialization failed. 00209 * @return true iff the finalization succeeded 00210 * @sa Dv::Thread::Factory::finit 00211 */ 00212 bool finalize(time_t millisecs); 00213 00214 /** Called at the beginning and the end of a thread's main function. 00215 * If the total number of live threads becomes 0, the DEAD condition 00216 * of the Pool's monitor will be signaled. Conversely, if 00217 * the total number of live threads becomes equal to the capacity 00218 * of the pool, the LIVE condition will be signaled. 00219 * @param n whether a thread started (+1) or finished (-1) 00220 * @pre -1 <= n <= +1 00221 */ 00222 void add_live(int n); 00223 void add_free(Thread* t); 00224 00225 Pool(const Pool&); 00226 Pool& operator=(const Pool&); 00227 00228 /** Factory to which the pool belongs. */ 00229 Factory& factory_; 00230 /** Vector of all threads in the pool */ 00231 std::vector<Thread*> all_; 00232 /** List of threads waiting for work. */ 00233 std::list<Thread*> free_; 00234 /** Monitor to use for controlling access to free_ */ 00235 Monitor free_monitor_; 00236 /** Name of the pool. */ 00237 std::string name_; 00238 /** Number of threads that have started their main function and 00239 * not yet finished it. 00240 */ 00241 size_t live_threads_; 00242 Status status_; 00243 }; 00244 00245 /** For a user to use a thread pool, she probably needs to derive 00246 * a Thread class from Dv::Thread::Pool::Thread and a Factory class from 00247 * Dv::Thread::Factory, implementing Dv::Thread::Factory::create 00248 * @warning 00249 * The constructor of the Dv::Thread::Factory-derived class should call 00250 * the Dv::Thread::Factory::init() function and its desctructor 00251 * should call Dv::Thread::Factory::finit(). See 00252 * Dv::Thread::Pool::initialize and 00253 * Dv::Thread::Pool::finalize for the motivation. 00254 * @example test-pool.C 00255 */ 00256 class Factory: public Dv::DebugSlave { 00257 public: 00258 friend class Pool; 00259 /** Create a new factory with assciated pool. 00260 * The constructor of the user's class derived from 00261 * this class should call Dv::Thread::Factory::initialize, 00262 * see the comments for Dv::Thread::Pool::initialize. 00263 * @param max_threads number of threads that will be kept in the pool. 00264 * @param name for the pool's monitor. 00265 * @param min_debug_level if a debug_master is connected, logging info 00266 * will only be written if the master's level is at least @a min_debug_level 00267 * @param debug_master from where debug info will be taken 00268 * @sa Dv::DebugSlave 00269 * @sa Dv::Thread::Pool::initialize 00270 */ 00271 Factory(size_t max_threads, const std::string& name, 00272 unsigned int min_debug_level = 0, Debugable* debug_master = 0): 00273 DebugSlave(min_debug_level, debug_master), pool_(max_threads, *this, name) { 00274 } 00275 00276 /* Destructor. 00277 * The destructor of the user's class derived from 00278 * this class should call Dv::Thread::Factory::finalize, 00279 * see the comments for Dv::Thread::Pool::finalize. 00280 * @sa Dv::Thread::Pool::finalize 00281 */ 00282 virtual ~Factory() { 00283 log() << FUNCTION_S << std::endl; 00284 } 00285 00286 Pool& pool() { 00287 return pool_; 00288 } 00289 00290 size_t size() const { return pool_.size(); } 00291 /** @return the number of threads that are currently waiting for work. */ 00292 size_t slack() const { return pool_.slack(); } 00293 protected: 00294 /** Create a new Thread. 00295 * @return a newly create thread, must not return 0 00296 */ 00297 virtual Pool::Thread* create() = 0; 00298 00299 /** Initialize the pool of this factory. 00300 * @sa Dv::Thread::Pool::initialize 00301 */ 00302 bool initialize(time_t millisecs = 5000) { 00303 return pool().initialize(millisecs); 00304 } 00305 00306 /** Finalize the pool of this factory. 00307 * @sa Dv::Thread::Pool::finalize 00308 */ 00309 bool finalize(time_t millisecs = 5000) { 00310 return pool().finalize(millisecs); 00311 } 00312 00313 private: 00314 Pool pool_; 00315 }; 00316 } 00317 } 00318 #endif
dvthread-0.13.4 | [11 December, 2009] |