test-pool.C

To use a thread pool, a user will typically need to derive a Thread class from Dv::Thread::Pool::Thread and a Factory class from Dv::Thread::Factory, implementing Dv::Thread::Factory::create.

Warning:
The constructor of the Dv::Thread::Factory-derived class should call the Dv::Thread::Factory::init() function and its destructor should call Dv::Thread::Factory::finit(). See Dv::Thread::Pool::initialize and Dv::Thread::Pool::finalize for the motivation.
// $Id: test-pool.C,v 1.13 2008/11/18 13:30:42 dvermeir Exp $

#include <dvutil/debug.h>
#include <dvthread/pool.h>
#include <dvthread/logstream.h>


class Worker; // Forward declaration.


// Test factory. 'Jobs' are handed to the pool through Factory::delegate,
// and the pool's threads are produced by Factory::create (defined below,
// after Worker is complete).
class Factory: public Dv::Thread::Factory {
  public:
    // Forwards 'n', 'name', 0 and the address of 'dbg' to the
    // Dv::Thread::Factory base, zeroes the counters and then fully
    // initializes the pool object.
    // Throws std::runtime_error when initialize(5000) reports failure.
    Factory(size_t n, const std::string& name, Dv::Debug& dbg)
      : Dv::Thread::Factory(n, name, 0, &dbg),
        count_(0),
        data_(0),
        count_monitor_("count-monitor")
    {
      log() << FUNCTION_S << std::endl;
      if (initialize(5000)) {
        return; // pool is ready, nothing else to do
      }
      throw std::runtime_error(FUNCTION_S + ": exception : cannot initialize pool");
    }

    // Shuts the pool down explicitly. According to the original author
    // this call is not strictly necessary.
    ~Factory() {
      finalize();
    }

    // Submit one job: remember the input value 'i' (a worker picks it up
    // later through Worker::setup_for_work, which reads data()) and ask
    // the pool to delegate to a worker.
    void delegate(int i) {
      const int wait_forever = -1; // -1: wait indefinitely for a free worker
      data_ = i;
      pool().delegate(wait_forever);
    }

    // Input value stored by the most recent delegate() call; read by
    // Worker::setup_for_work.
    int data() const {
      return data_;
    }

    // Current value of the job counter, read while holding count_monitor_.
    size_t count() {
      Dv::Thread::Lock guard(count_monitor_);
      const size_t snapshot = count_;
      return snapshot;
    }

    // Add one to the job counter while holding count_monitor_ and return
    // the new value.
    size_t inc_count() {
      Dv::Thread::Lock guard(count_monitor_);
      count_ += 1;
      return count_;
    }

  protected:
    // Produces a new pool thread; see the out-of-class definition.
    Dv::Thread::Pool::Thread* create();

  private:
    size_t count_;                       // counter bumped by inc_count()
    int data_;                           // input of the most recent delegate()
    Dv::Thread::Monitor count_monitor_;  // guards count_
};

// Pool thread used by this test. When set up for a job it copies the
// factory's current input value; when it works it bumps the factory's job
// counter and logs the new count together with that value.
class Worker: public Dv::Thread::Pool::Thread {
  public:

    // Object returned by init(). It does nothing except log its own
    // construction and destruction through the worker it was created for.
    struct Init: public Dv::Thread::Pool::Thread::Init {
      // 'explicit': a bare Worker pointer must never convert silently
      // into an Init object.
      explicit Init(const Worker* w): w_(w) {
        w->log(1) << FUNCTION_S << std::endl;
      }
      ~Init() {
        w_->log(1) << FUNCTION_S << std::endl;
      }
      // Worker this object logs through; not owned.
      // NOTE(review): assumes the worker outlives this object -- confirm.
      const Worker* w_;
    };

    // Logs the call and returns a freshly allocated Init bound to this
    // worker, wrapped in a Dv::shared_ptr.
    Dv::shared_ptr<Dv::Thread::Pool::Thread::Init>
    init() const {
      log() << FUNCTION_S << std::endl;
      return Dv::shared_ptr<Dv::Thread::Pool::Thread::Init>(new Init(this));
    }

    // Constructs the thread on the factory's pool and keeps a reference to
    // the factory. 'explicit' prevents a Factory from being implicitly
    // converted into a Worker.
    explicit Worker(Factory& factory): Dv::Thread::Pool::Thread(factory.pool()), factory_(factory), data_(0) { 
    }

    // Logs the destruction of this worker, identified by sid().
    ~Worker() {
      log(1) << "WORKER " << sid() << " DESTROYED" << std::endl;
    }

    // Copy the factory's current input value into this worker so that
    // work() can use it.
    void setup_for_work() { 
      data_ = factory_.data();
    }

    // The 'job': increment the factory's counter and log the new count
    // followed by the input value captured in setup_for_work().
    void work() { 
      log() << factory_.inc_count() << ", data = " << data_ << std::endl; 
    }
    
  private:
    Factory& factory_; // factory that created this worker
    int data_;         // input value captured by setup_for_work()
};

// Allocate a new Worker tied to this factory and hand it back as a
// pointer to its pool-thread base class.
Dv::Thread::Pool::Thread*
Factory::create() {
  Worker* const fresh_worker = new Worker(*this);
  return fresh_worker;
}

// Test driver: builds a Factory with NT as its first argument, submits NR
// jobs through Factory::delegate and then polls the job counter, logging
// it and sleeping between polls, until all NR jobs have been counted.
// Returns 0 on success, or 1 after reporting a std::exception on std::cerr.
int
main(int argc, char* argv[]) {
  try {
    static const size_t NT = 10;
    static const size_t NR = 10 * NT;
    // static const size_t NT = 2;
    // static const size_t NR = 2 * NT;
    Dv::Thread::logstream log(std::cerr, "test-stream");
    Dv::Debug debug(&log, 0);

    Factory factory(NT, "Test", debug);
    for (size_t i = 0; i < NR; ++i) {
      // Factory::delegate takes an int; make the size_t -> int narrowing
      // explicit (i stays below NR, so the value always fits).
      factory.delegate(static_cast<int>(i));
    }
    // Poll until every submitted job has been counted by a worker.
    do {
      log << "factory.count() = " << factory.count() << std::endl;
      Dv::Thread::Thread::sleep(1000);
    }
    while (factory.count() != NR);
    // log << "bye" << std::endl;
    return 0;
  }
  catch (const std::exception& e) { // catch by const reference: read-only use
    std::cerr << "fatal: " << e.what() << std::endl;
    return 1;
  }
}

dvthread-0.13.4 [11 December, 2009]