1 /*
2  * Copyright (C) 2012 Frafos GmbH
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version. This program is released under
10  * the GPL with the additional exemption that compiling, linking,
11  * and/or using OpenSSL is allowed.
12  *
13  * For a license to use the SEMS software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * SEMS is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27 
28 #ifndef _AmEventQueueProcessor_h_
29 #define _AmEventQueueProcessor_h_
30 
31 #include "AmThread.h"
32 #include "AmEventQueue.h"
33 
34 #include <deque>
35 #include <vector>
36 using std::deque;
37 using std::vector;
38 
39 class EventQueueWorker
40 : public AmThread,
41   public AmEventNotificationSink
42 {
43   AmSharedVar<bool> stop_requested;
44 
45   AmCondition<bool> runcond;
46   std::deque<AmEventQueue*> process_queues;
47   AmMutex process_queues_mut;
48 
49  public:
50   EventQueueWorker();
51   ~EventQueueWorker();
52 
53   // AmThread interface
54   void run();
55   void on_stop();
56 
57   // AmEventNotificationSink interface
58   void notify(AmEventQueue* sender);
59 
60   void startEventQueue(AmEventQueue* q);
61 };
62 
63 /**
64  * The event queue processor processes event queues
65  * using a pool of workers.
66  *
67  * Note: the queue's ref-count should be increased
68  * before binding the queue to the processor. Also,
69  * the ref-count should be decreased at some place
70  * during or after finalize(), so that the queue
71  * gets disposed correctly.
72  */
73 class AmEventQueueProcessor {
74 
75   typedef vector<EventQueueWorker*> Workers;
76 
77   Workers threads;
78   AmMutex threads_mut;
79   Workers::iterator threads_it;
80 
81  public:
82   AmEventQueueProcessor();
83   ~AmEventQueueProcessor();
84 
85   void addThreads(unsigned int num_threads);
86 
87   EventQueueWorker* getWorker();
88   int startEventQueue(AmEventQueue* q);
89 };
90 
91 #endif // _AmEventQueueProcessor_h_
92