1 /*
2  * $Id: AmSessionProcessor.cpp 1585 2009-10-28 22:31:08Z sayer $
3  *
4  * Copyright (C) 2010 Stefan Sayer
5  *
6  * This file is part of SEMS, a free SIP media server.
7  *
8  * SEMS is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version. This program is released under
12  * the GPL with the additional exemption that compiling, linking,
13  * and/or using OpenSSL is allowed.
14  *
15  * For a license to use the SEMS software under conditions
16  * other than those described here, or to purchase support for this
17  * software, please contact iptel.org by e-mail at the following addresses:
18  *    info@iptel.org
19  *
20  * SEMS is distributed in the hope that it will be useful,
21  * but WITHOUT ANY WARRANTY; without even the implied warranty of
22  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  * GNU General Public License for more details.
24  *
25  * You should have received a copy of the GNU General Public License
26  * along with this program; if not, write to the Free Software
27  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
28  */
29 
30 #ifdef SESSION_THREADPOOL
31 
32 #include "AmSessionProcessor.h"
33 #include "AmSession.h"
34 
35 #include <vector>
36 #include <list>
37 
38 vector<AmSessionProcessorThread*> AmSessionProcessor::threads;
39 AmMutex AmSessionProcessor::threads_mut;
40 
41 vector<AmSessionProcessorThread*>::iterator
42 AmSessionProcessor::threads_it = AmSessionProcessor::threads.begin();
43 
getProcessorThread()44 AmSessionProcessorThread* AmSessionProcessor::getProcessorThread() {
45   threads_mut.lock();
46   if (!threads.size()) {
47     ERROR("requesting Session processing thread but none available\n");
48     threads_mut.unlock();
49     return NULL;
50   }
51 
52   // round robin
53   if (threads_it == threads.end())
54     threads_it = threads.begin();
55 
56   AmSessionProcessorThread* res = *threads_it;
57   threads_it++;
58   threads_mut.unlock();
59   return res;
60 }
61 
addThreads(unsigned int num_threads)62 void AmSessionProcessor::addThreads(unsigned int num_threads) {
63   DBG("starting %u session processor threads\n", num_threads);
64   threads_mut.lock();
65   for (unsigned int i=0; i < num_threads;i++) {
66     threads.push_back(new AmSessionProcessorThread());
67     threads.back()->start();
68   }
69   threads_it = threads.begin();
70   DBG("now %zd session processor threads running\n",  threads.size());
71   threads_mut.unlock();
72 }
73 
74 
AmSessionProcessorThread()75 AmSessionProcessorThread::AmSessionProcessorThread()
76   : events(this), runcond(false)
77 {
78 }
79 
~AmSessionProcessorThread()80 AmSessionProcessorThread::~AmSessionProcessorThread() {
81 }
82 
notify(AmEventQueue * sender)83 void AmSessionProcessorThread::notify(AmEventQueue* sender) {
84   process_sessions_mut.lock();
85   runcond.set(true);
86   process_sessions.insert(sender);
87   process_sessions_mut.unlock();
88 }
89 
run()90 void AmSessionProcessorThread::run() {
91 
92   stop_requested = false;
93   while(!stop_requested.get()){
94 
95     runcond.wait_for();
96 
97     DBG("running processing loop\n");
98 
99     process_sessions_mut.lock();
100     runcond.set(false);
101     // get the list of session s that need processing
102     std::set<AmEventQueue*> pending_process_sessions
103       = process_sessions;
104     process_sessions.clear();
105     process_sessions_mut.unlock();
106 
107     // process control events (AmSessionProcessorThreadAddEvent)
108     events.processEvents();
109 
110     // startup all new sessions
111     if (!startup_sessions.empty()) {
112       DBG("starting up %zd sessions\n", startup_sessions.size());
113 
114       for (std::vector<AmSession*>::iterator it=
115 	     startup_sessions.begin();
116 	   it != startup_sessions.end(); it++) {
117 
118 	DBG("starting up [%s|%s]: [%p]\n",
119 	    (*it)->getCallID().c_str(), (*it)->getLocalTag().c_str(),*it);
120 	if ((*it)->startup()) {
121 	  sessions.push_back(*it); // startup successful
122 	  // make sure this session is being processed for startup events
123 	  pending_process_sessions.insert(*it);
124 	}
125       }
126 
127       startup_sessions.clear();
128     }
129 
130     std::vector<AmSession*> fin_sessions;
131 
132     DBG("processing events for  up to %zd sessions\n",
133 	pending_process_sessions.size());
134 
135     std::list<AmSession*>::iterator it=sessions.begin();
136     while (it != sessions.end()) {
137       if ((pending_process_sessions.find(*it)!=
138 	      pending_process_sessions.end()) &&
139 	  (!(*it)->processingCycle())) {
140 	fin_sessions.push_back(*it);
141 	std::list<AmSession*>::iterator d_it = it;
142 	it++;
143 	sessions.erase(d_it);
144       } else {
145 	it++;
146       }
147     }
148 
149     if (fin_sessions.size()) {
150       DBG("finalizing %zd sessions\n", fin_sessions.size());
151       for (std::vector<AmSession*>::iterator it=fin_sessions.begin();
152 	   it != fin_sessions.end(); it++) {
153 	DBG("finalizing session [%p/%s/%s]\n",
154 	    *it, (*it)->getCallID().c_str(), (*it)->getLocalTag().c_str());
155 	(*it)->finalize();
156       }
157     }
158   }
159 }
160 
on_stop()161 void AmSessionProcessorThread::on_stop() {
162   INFO("requesting session to stop.\n");
163   stop_requested.set(true);
164 }
165 
166 // AmEventHandler interface
process(AmEvent * e)167 void AmSessionProcessorThread::process(AmEvent* e) {
168   AmSessionProcessorThreadAddEvent* add_ev =
169     dynamic_cast<AmSessionProcessorThreadAddEvent*>(e);
170 
171   if (NULL==add_ev) {
172     ERROR("received wrong event in AmSessionProcessorThread\n");
173     return;
174   }
175 
176   startup_sessions.push_back(add_ev->s);
177 }
178 
startSession(AmSession * s)179 void AmSessionProcessorThread::startSession(AmSession* s) {
180   // register us to be notified if some event comes to the session
181   s->setEventNotificationSink(this);
182 
183   // add this to be scheduled
184   events.postEvent(new AmSessionProcessorThreadAddEvent(s));
185 
186   // trigger processing of events already in queue at startup
187   notify(s);
188 
189   // wakeup the thread
190   runcond.set(true);
191 }
192 
193 #endif
194