1 /*
2  * Copyright (C) 2012 Frafos GmbH
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version. This program is released under
10  * the GPL with the additional exemption that compiling, linking,
11  * and/or using OpenSSL is allowed.
12  *
13  * For a license to use the SEMS software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * SEMS is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27 
28 #include "AmEventQueueProcessor.h"
29 #include "AmEventQueue.h"
30 
31 #include <deque>
32 using std::deque;
33 
AmEventQueueProcessor()34 AmEventQueueProcessor::AmEventQueueProcessor()
35 {
36   threads_it = threads.begin();
37 }
38 
~AmEventQueueProcessor()39 AmEventQueueProcessor::~AmEventQueueProcessor()
40 {
41   threads_mut.lock();
42   threads_it = threads.begin();
43   while(threads_it != threads.end()) {
44     (*threads_it)->stop();
45     (*threads_it)->join();
46     delete (*threads_it);
47     threads_it++;
48   }
49   threads_mut.unlock();
50 }
51 
getWorker()52 EventQueueWorker* AmEventQueueProcessor::getWorker()
53 {
54   threads_mut.lock();
55   if (!threads.size()) {
56     ERROR("requesting EventQueue processing thread but none available\n");
57     threads_mut.unlock();
58     return NULL;
59   }
60 
61   // round robin
62   if (threads_it == threads.end())
63     threads_it = threads.begin();
64 
65   EventQueueWorker* res = *threads_it;
66   threads_it++;
67   threads_mut.unlock();
68 
69   return res;
70 }
71 
startEventQueue(AmEventQueue * q)72 int AmEventQueueProcessor::startEventQueue(AmEventQueue* q)
73 {
74   EventQueueWorker* worker = getWorker();
75   if(!worker) return -1;
76 
77   worker->startEventQueue(q);
78   return 0;
79 }
80 
addThreads(unsigned int num_threads)81 void AmEventQueueProcessor::addThreads(unsigned int num_threads)
82 {
83   DBG("starting %u session processor threads\n", num_threads);
84   threads_mut.lock();
85   for (unsigned int i=0; i < num_threads;i++) {
86     threads.push_back(new EventQueueWorker());
87     threads.back()->start();
88   }
89   threads_it = threads.begin();
90   DBG("now %zd session processor threads running\n",  threads.size());
91   threads_mut.unlock();
92 }
93 
94 
EventQueueWorker()95 EventQueueWorker::EventQueueWorker()
96   : runcond(false)
97 {
98 }
99 
~EventQueueWorker()100 EventQueueWorker::~EventQueueWorker() {
101 }
102 
notify(AmEventQueue * sender)103 void EventQueueWorker::notify(AmEventQueue* sender)
104 {
105   process_queues_mut.lock();
106   process_queues.push_back(sender);
107   inc_ref(sender);
108   runcond.set(true);
109   process_queues_mut.unlock();
110 }
111 
run()112 void EventQueueWorker::run()
113 {
114   stop_requested = false;
115   while(!stop_requested.get()){
116 
117     runcond.wait_for();
118 
119     DBG("running processing loop\n");
120     process_queues_mut.lock();
121     while(!process_queues.empty()) {
122 
123       AmEventQueue* ev_q = process_queues.front();
124       process_queues.pop_front();
125       process_queues_mut.unlock();
126 
127       if(!ev_q->processingCycle()) {
128 	ev_q->setEventNotificationSink(NULL);
129 	if(!ev_q->is_finalized())
130 	  ev_q->finalize();
131       }
132       dec_ref(ev_q);
133 
134       process_queues_mut.lock();
135     }
136 
137     runcond.set(false);
138     process_queues_mut.unlock();
139   }
140 }
141 
on_stop()142 void EventQueueWorker::on_stop()
143 {
144   INFO("requesting worker to stop.\n");
145   stop_requested.set(true);
146   runcond.set(true);
147 }
148 
startEventQueue(AmEventQueue * q)149 void EventQueueWorker::startEventQueue(AmEventQueue* q)
150 {
151   if(q->startup())
152     // register us to be notified if some event comes to the session
153     q->setEventNotificationSink(this);
154 }
155