1 /*
2 * $Id: AmSessionProcessor.cpp 1585 2009-10-28 22:31:08Z sayer $
3 *
4 * Copyright (C) 2010 Stefan Sayer
5 *
6 * This file is part of SEMS, a free SIP media server.
7 *
8 * SEMS is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version. This program is released under
12 * the GPL with the additional exemption that compiling, linking,
13 * and/or using OpenSSL is allowed.
14 *
15 * For a license to use the SEMS software under conditions
16 * other than those described here, or to purchase support for this
17 * software, please contact iptel.org by e-mail at the following addresses:
18 * info@iptel.org
19 *
20 * SEMS is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with this program; if not, write to the Free Software
27 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 */
29
30 #ifdef SESSION_THREADPOOL
31
32 #include "AmSessionProcessor.h"
33 #include "AmSession.h"
34
35 #include <vector>
36 #include <list>
37
38 vector<AmSessionProcessorThread*> AmSessionProcessor::threads;
39 AmMutex AmSessionProcessor::threads_mut;
40
41 vector<AmSessionProcessorThread*>::iterator
42 AmSessionProcessor::threads_it = AmSessionProcessor::threads.begin();
43
getProcessorThread()44 AmSessionProcessorThread* AmSessionProcessor::getProcessorThread() {
45 threads_mut.lock();
46 if (!threads.size()) {
47 ERROR("requesting Session processing thread but none available\n");
48 threads_mut.unlock();
49 return NULL;
50 }
51
52 // round robin
53 if (threads_it == threads.end())
54 threads_it = threads.begin();
55
56 AmSessionProcessorThread* res = *threads_it;
57 threads_it++;
58 threads_mut.unlock();
59 return res;
60 }
61
addThreads(unsigned int num_threads)62 void AmSessionProcessor::addThreads(unsigned int num_threads) {
63 DBG("starting %u session processor threads\n", num_threads);
64 threads_mut.lock();
65 for (unsigned int i=0; i < num_threads;i++) {
66 threads.push_back(new AmSessionProcessorThread());
67 threads.back()->start();
68 }
69 threads_it = threads.begin();
70 DBG("now %zd session processor threads running\n", threads.size());
71 threads_mut.unlock();
72 }
73
74
AmSessionProcessorThread()75 AmSessionProcessorThread::AmSessionProcessorThread()
76 : events(this), runcond(false)
77 {
78 }
79
~AmSessionProcessorThread()80 AmSessionProcessorThread::~AmSessionProcessorThread() {
81 }
82
notify(AmEventQueue * sender)83 void AmSessionProcessorThread::notify(AmEventQueue* sender) {
84 process_sessions_mut.lock();
85 runcond.set(true);
86 process_sessions.insert(sender);
87 process_sessions_mut.unlock();
88 }
89
run()90 void AmSessionProcessorThread::run() {
91
92 stop_requested = false;
93 while(!stop_requested.get()){
94
95 runcond.wait_for();
96
97 DBG("running processing loop\n");
98
99 process_sessions_mut.lock();
100 runcond.set(false);
101 // get the list of session s that need processing
102 std::set<AmEventQueue*> pending_process_sessions
103 = process_sessions;
104 process_sessions.clear();
105 process_sessions_mut.unlock();
106
107 // process control events (AmSessionProcessorThreadAddEvent)
108 events.processEvents();
109
110 // startup all new sessions
111 if (!startup_sessions.empty()) {
112 DBG("starting up %zd sessions\n", startup_sessions.size());
113
114 for (std::vector<AmSession*>::iterator it=
115 startup_sessions.begin();
116 it != startup_sessions.end(); it++) {
117
118 DBG("starting up [%s|%s]: [%p]\n",
119 (*it)->getCallID().c_str(), (*it)->getLocalTag().c_str(),*it);
120 if ((*it)->startup()) {
121 sessions.push_back(*it); // startup successful
122 // make sure this session is being processed for startup events
123 pending_process_sessions.insert(*it);
124 }
125 }
126
127 startup_sessions.clear();
128 }
129
130 std::vector<AmSession*> fin_sessions;
131
132 DBG("processing events for up to %zd sessions\n",
133 pending_process_sessions.size());
134
135 std::list<AmSession*>::iterator it=sessions.begin();
136 while (it != sessions.end()) {
137 if ((pending_process_sessions.find(*it)!=
138 pending_process_sessions.end()) &&
139 (!(*it)->processingCycle())) {
140 fin_sessions.push_back(*it);
141 std::list<AmSession*>::iterator d_it = it;
142 it++;
143 sessions.erase(d_it);
144 } else {
145 it++;
146 }
147 }
148
149 if (fin_sessions.size()) {
150 DBG("finalizing %zd sessions\n", fin_sessions.size());
151 for (std::vector<AmSession*>::iterator it=fin_sessions.begin();
152 it != fin_sessions.end(); it++) {
153 DBG("finalizing session [%p/%s/%s]\n",
154 *it, (*it)->getCallID().c_str(), (*it)->getLocalTag().c_str());
155 (*it)->finalize();
156 }
157 }
158 }
159 }
160
on_stop()161 void AmSessionProcessorThread::on_stop() {
162 INFO("requesting session to stop.\n");
163 stop_requested.set(true);
164 }
165
166 // AmEventHandler interface
process(AmEvent * e)167 void AmSessionProcessorThread::process(AmEvent* e) {
168 AmSessionProcessorThreadAddEvent* add_ev =
169 dynamic_cast<AmSessionProcessorThreadAddEvent*>(e);
170
171 if (NULL==add_ev) {
172 ERROR("received wrong event in AmSessionProcessorThread\n");
173 return;
174 }
175
176 startup_sessions.push_back(add_ev->s);
177 }
178
startSession(AmSession * s)179 void AmSessionProcessorThread::startSession(AmSession* s) {
180 // register us to be notified if some event comes to the session
181 s->setEventNotificationSink(this);
182
183 // add this to be scheduled
184 events.postEvent(new AmSessionProcessorThreadAddEvent(s));
185
186 // trigger processing of events already in queue at startup
187 notify(s);
188
189 // wakeup the thread
190 runcond.set(true);
191 }
192
193 #endif
194