00001
00002 #ifndef DV_THREAD_ACTORPOOL_H
00003 #define DV_THREAD_ACTORPOOL_H
00004
00005 #include <functional>
00006 #include <vector>
00007 #include <dvutil/shared_ptr.h>
00008 #include <dvthread/mailbox.h>
00009
00010 namespace Dv {
00011 namespace Thread {
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 template<typename HandlerFactory>
00026 class ActorPool: public Dv::Thread::Thread {
00027 public:
00028 typedef typename HandlerFactory::Handler Handler;
00029 typedef typename Handler::argument_type Message;
00030 typedef typename Handler::result_type Reply;
00031 typedef MailBox<Reply> ReplyBox;
00032 typedef std::pair<Message,ReplyBox*> MessageReply;
00033
00034 class Actor: public Dv::Thread::Thread {
00035 public:
00036 Actor(ActorPool<HandlerFactory>& pool, Dv::shared_ptr<Handler> handler):
00037 pool_(pool), handler_(handler) {
00038 if (!handler_)
00039 throw std::runtime_error(FUNCTION_S + ": null handler");
00040 start();
00041 }
00042
00043 Handler& handler() { return *handler_; }
00044
00045 int main() {
00046 try {
00047 while (! killed()) {
00048 try {
00049 MessageReply mr = pool_.mbox_.get();
00050 if (mr.second)
00051 mr.second->put((*handler_)(mr.first));
00052 else
00053 (*handler_)(mr.first);
00054 }
00055 catch (std::runtime_error& e) {
00056 }
00057 }
00058 std::cerr << " actor killed()" << std::endl;
00059 return 0;
00060 }
00061 catch (std::exception& e) {
00062 return -1;
00063 }
00064 catch (int e) {
00065 return e;
00066 }
00067 }
00068 private:
00069 ActorPool<HandlerFactory>& pool_;
00070 Dv::shared_ptr<Handler> handler_;
00071 };
00072
00073 ActorPool(const std::string& name, HandlerFactory& factory,
00074 size_t n_threads, size_t max_capacity = 0,
00075 unsigned int min_debug_level = 0, Debugable* debug_master=0):
00076 name_(name), factory_(factory), n_threads_(n_threads),
00077 mbox_(name + "_mbox", max_capacity, min_debug_level, debug_master) {
00078 start();
00079 }
00080
00081 ~ActorPool() {
00082 std::cerr << FUNCTION_S << std::endl;
00083 kill();
00084 wait();
00085 }
00086
00087 int main() {
00088 try {
00089 detach();
00090 for (size_t i=0; (i<n_threads_); ++i)
00091 actors_.push_back(new Actor(*this, factory_.create_handler()));
00092
00093 while (! killed())
00094 try {
00095 factory_.work();
00096 }
00097 catch (std::runtime_error& e) {
00098 }
00099
00100 std::cerr << name() << "pool killed()" << std::endl;
00101
00102 for (typename std::vector<Actor*>::const_iterator a = actors_.begin();
00103 a != actors_.end(); ++a) {
00104 (*a)->kill();
00105 }
00106 for (typename std::vector<Actor*>::iterator a = actors_.begin();
00107 a != actors_.end(); ++a) {
00108 (*a)->join();
00109 factory_.cleanup_handler((*a)->handler());
00110 delete (*a);
00111 *a = 0;
00112 }
00113
00114 std::cerr << name() << "pool exit()" << std::endl;
00115
00116 return 0;
00117 }
00118 catch (std::exception& e) {
00119 return -1;
00120 }
00121 catch (int e) {
00122 return e;
00123 }
00124 }
00125
00126 void request(const Message& m, ReplyBox* r = 0) {
00127 mbox_.put(std::make_pair(m,r));
00128 }
00129
00130 const std::string& name() const { return name_; }
00131
00132 private:
00133 const std::string name_;
00134 HandlerFactory& factory_;
00135 size_t n_threads_;
00136 std::vector<Actor*> actors_;
00137 MailBox<MessageReply> mbox_;
00138 };
00139 }
00140 }
00141 #endif