To use a thread pool, a user will typically need to derive a worker class from Dv::Thread::Pool::Thread and a factory class from Dv::Thread::Factory, implementing Dv::Thread::Factory::create, as in the example below.
// $Id: test-pool.C,v 1.13 2008/11/18 13:30:42 dvermeir Exp $ #include <dvutil/debug.h> #include <dvthread/pool.h> #include <dvthread/logstream.h> class Worker; // Forward declaration. // The interface for submitting 'jobs' to the pool is through // Factory::delegate. class Factory: public Dv::Thread::Factory { public: Factory(size_t n, const std::string& name, Dv::Debug& dbg): Dv::Thread::Factory(n, name, 0, &dbg), count_(0), data_(0), count_monitor_("count-monitor") { // Fully initialize the Dv::Thread::Pool object. log() << FUNCTION_S << std::endl; if (! initialize(5000) ) throw std::runtime_error(FUNCTION_S + ": exception : cannot initialize pool"); } ~Factory() { // The follwing call is not strictly necessary. finalize(); // Kill all threads which will call Factory::finalize } // Submit a job to the 'pool': store input data 'i' ( it will later // be retrieved by the worker using Worker::setup_for_work) and // ask Dv::Thread::Pool to delegate to a worker. void delegate(int i) { data_ = i; pool().delegate(-1); // -1: wait indefinitely for a free worker } // Used by a Worker::setup_for_work int data() const { return data_; } size_t count() { Dv::Thread::Lock lock(count_monitor_); return count_; } size_t inc_count() { Dv::Thread::Lock lock(count_monitor_); return ++count_; } protected: Dv::Thread::Pool::Thread* create(); private: size_t count_; int data_; Dv::Thread::Monitor count_monitor_; }; class Worker: public Dv::Thread::Pool::Thread { public: struct Init: public Dv::Thread::Pool::Thread::Init { Init(const Worker* w): w_(w) { w->log(1) << FUNCTION_S << std::endl; } ~Init() { w_->log(1) << FUNCTION_S << std::endl; } const Worker* w_; }; Dv::shared_ptr<Dv::Thread::Pool::Thread::Init> init() const { log() << FUNCTION_S << std::endl; return Dv::shared_ptr<Dv::Thread::Pool::Thread::Init>(new Init(this)); } Worker(Factory& factory): Dv::Thread::Pool::Thread(factory.pool()), factory_(factory), data_(0) { } ~Worker() { log(1) << "WORKER " << sid() << " DESTROYED" << 
std::endl; } void setup_for_work() { data_ = factory_.data(); } void work() { log() << factory_.inc_count() << ", data = " << data_ << std::endl; } private: Factory& factory_; int data_; }; Dv::Thread::Pool::Thread* Factory::create() { return new Worker(*this); } int main(int argc, char* argv[]) { try { static const size_t NT = 10; static const size_t NR = 10 * NT; // static const size_t NT = 2; // static const size_t NR = 2 * NT; Dv::Thread::logstream log(std::cerr, "test-stream"); Dv::Debug debug(&log, 0); Factory factory(NT, "Test", debug); for (size_t i = 0; (i<NR); ++i) { factory.delegate(i); } do { log << "factory.count() = " << factory.count() << std::endl; Dv::Thread::Thread::sleep(1000); } while (factory.count() != NR); // log << "bye" << std::endl; return 0; } catch (std::exception& e) { std::cerr << "fatal: " << e.what() << std::endl; return 1; } }
dvthread-0.13.4 | [11 December, 2009] |