1 /*
2 * Copyright (C) 2007 Raphael Coeffic
3 *
4 * This file is part of SEMS, a free SIP media server.
5 *
6 * SEMS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version. This program is released under
10 * the GPL with the additional exemption that compiling, linking,
11 * and/or using OpenSSL is allowed.
12 *
13 * For a license to use the SEMS software under conditions
14 * other than those described here, or to purchase support for this
15 * software, please contact iptel.org by e-mail at the following addresses:
16 * info@iptel.org
17 *
18 * SEMS is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with this program; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 */
27
28 #include "AmEventDispatcher.h"
29 #include "AmSipEvent.h"
30 #include "AmConfig.h"
31 #include "sip/hash.h"
32
hash(const string & s1)33 unsigned int AmEventDispatcher::hash(const string& s1)
34 {
35 return hashlittle(s1.c_str(),s1.length(),0)
36 & (EVENT_DISPATCHER_BUCKETS-1);
37 }
38
hash(const string & s1,const string s2)39 unsigned int AmEventDispatcher::hash(const string& s1, const string s2)
40 {
41 unsigned int h=0;
42
43 h = hashlittle(s1.c_str(),s1.length(),h);
44 h = hashlittle(s2.c_str(),s2.length(),h);
45
46 return h & (EVENT_DISPATCHER_BUCKETS-1);
47 }
48
49 AmEventDispatcher* AmEventDispatcher::_instance=NULL;
50
instance()51 AmEventDispatcher* AmEventDispatcher::instance()
52 {
53 return _instance ? _instance : ((_instance = new AmEventDispatcher()));
54 }
55
56
addEventQueue(const string & local_tag,AmEventQueueInterface * q)57 bool AmEventDispatcher::addEventQueue(const string& local_tag,
58 AmEventQueueInterface* q)
59 {
60 unsigned int queue_bucket = hash(local_tag);
61
62 queues_mut[queue_bucket].lock();
63
64 if (queues[queue_bucket].find(local_tag) != queues[queue_bucket].end()) {
65 queues_mut[queue_bucket].unlock();
66 return false;
67 }
68
69 queues[queue_bucket][local_tag] = QueueEntry(q);
70 queues_mut[queue_bucket].unlock();
71
72 return true;
73 }
74
75
76 /** @return false on error */
addEventQueue(const string & local_tag,AmEventQueueInterface * q,const string & callid,const string & remote_tag,const string & via_branch)77 bool AmEventDispatcher::addEventQueue(const string& local_tag,
78 AmEventQueueInterface* q,
79 const string& callid,
80 const string& remote_tag,
81 const string& via_branch)
82 {
83 if(local_tag.empty () ||callid.empty() || remote_tag.empty() | via_branch.empty()) {
84 ERROR("local_tag, callid, remote_tag or via_branch is empty");
85 return false;
86 }
87
88 unsigned int queue_bucket = hash(local_tag);
89
90 queues_mut[queue_bucket].lock();
91
92 if (queues[queue_bucket].find(local_tag) != queues[queue_bucket].end()) {
93 queues_mut[queue_bucket].unlock();
94 return false;
95 }
96
97 // try to find via id_lookup
98 string id = callid+remote_tag;
99 if(AmConfig::AcceptForkedDialogs){
100 id += via_branch;
101 }
102 unsigned int id_bucket = hash(id);
103
104 id_lookup_mut[id_bucket].lock();
105
106 if (id_lookup[id_bucket].find(id) !=
107 id_lookup[id_bucket].end()) {
108 id_lookup_mut[id_bucket].unlock();
109 queues_mut[queue_bucket].unlock();
110 return false;
111 }
112
113 queues[queue_bucket][local_tag] = QueueEntry(q,id);
114 id_lookup[id_bucket][id] = local_tag;
115
116 id_lookup_mut[id_bucket].unlock();
117 queues_mut[queue_bucket].unlock();
118
119 return true;
120 }
121
delEventQueue(const string & local_tag)122 AmEventQueueInterface* AmEventDispatcher::delEventQueue(const string& local_tag)
123 {
124 AmEventQueueInterface* q = NULL;
125 unsigned int queue_bucket = hash(local_tag);
126
127 queues_mut[queue_bucket].lock();
128
129 EvQueueMapIter qi = queues[queue_bucket].find(local_tag);
130 if(qi != queues[queue_bucket].end()) {
131
132 QueueEntry qe(qi->second);
133 queues[queue_bucket].erase(qi);
134 q = qe.q;
135
136 if(!qe.id.empty()) {
137 unsigned int id_bucket = hash(qe.id);
138
139 id_lookup_mut[id_bucket].lock();
140
141 DictIter di = id_lookup[id_bucket].find(qe.id);
142 if(di != id_lookup[id_bucket].end()) {
143 id_lookup[id_bucket].erase(di);
144 }
145
146 id_lookup_mut[id_bucket].unlock();
147 }
148 }
149 queues_mut[queue_bucket].unlock();
150
151 return q;
152 }
153
post(const string & local_tag,AmEvent * ev)154 bool AmEventDispatcher::post(const string& local_tag, AmEvent* ev)
155 {
156 bool posted = false;
157
158 unsigned int queue_bucket = hash(local_tag);
159
160 queues_mut[queue_bucket].lock();
161
162 EvQueueMapIter it = queues[queue_bucket].find(local_tag);
163 if(it != queues[queue_bucket].end()){
164 it->second.q->postEvent(ev);
165 posted = true;
166 }
167
168 queues_mut[queue_bucket].unlock();
169
170 return posted;
171 }
172
173
post(const string & callid,const string & remote_tag,const string & via_branch,AmEvent * ev)174 bool AmEventDispatcher::post(const string& callid,
175 const string& remote_tag,
176 const string& via_branch,
177 AmEvent* ev)
178 {
179 string id = callid+remote_tag;
180 if(AmConfig::AcceptForkedDialogs){
181 id += via_branch;
182 }
183 unsigned int id_bucket = hash(id);
184
185 id_lookup_mut[id_bucket].lock();
186
187 DictIter di = id_lookup[id_bucket].find(id);
188 if (di == id_lookup[id_bucket].end()) {
189 id_lookup_mut[id_bucket].unlock();
190 return false;
191 }
192 string local_tag = di->second;
193 id_lookup_mut[id_bucket].unlock();
194
195 return post(local_tag, ev);
196 }
197
/**
 * Post a copy of an event to every registered queue.
 *
 * Each queue receives its own ev->clone(); the event passed in is deleted
 * before returning (unless it is NULL, in which case nothing is done).
 *
 * @param ev event to broadcast
 * @return true if the event was posted to at least one queue, false if ev
 *         is NULL or no queue is registered
 */
bool AmEventDispatcher::broadcast(AmEvent* ev)
{
  if (!ev)
    return false;

  bool posted = false;
  for (size_t i=0;i<EVENT_DISPATCHER_BUCKETS;i++) {
    queues_mut[i].lock();

    EvQueueMapIter it = queues[i].begin();
    while (it != queues[i].end()) {
      // advance before posting, so that 'it' no longer refers to the
      // entry that is being posted to
      EvQueueMapIter this_evq = it;
      it++;
      // the bucket lock is released while postEvent() runs and is
      // re-acquired afterwards
      // NOTE(review): this_evq is dereferenced, and 'it' is used again,
      // after the lock was dropped; if another thread can erase entries
      // of this bucket in between, these iterators may be stale - confirm
      // whether that can happen before changing the locking here
      queues_mut[i].unlock();
      this_evq->second.q->postEvent(ev->clone());
      queues_mut[i].lock();
      posted = true;
    }
    queues_mut[i].unlock();
  }

  // only clones were posted; the original is released here
  delete ev;

  return posted;
}
223
empty()224 bool AmEventDispatcher::empty() {
225 bool res = true;
226 for (size_t i=0;i<EVENT_DISPATCHER_BUCKETS;i++) {
227 queues_mut[i].lock();
228 res = res&queues[i].empty();
229 queues_mut[i].unlock();
230 if (!res)
231 break;
232 }
233 return res;
234 }
235
dump()236 void AmEventDispatcher::dump()
237 {
238 DBG("*** dumping Event dispatcher buckets ***\n");
239 for (size_t i=0;i<EVENT_DISPATCHER_BUCKETS;i++) {
240 queues_mut[i].lock();
241 if(!queues[i].empty()) {
242 DBG("queues[%zu].size() = %zu",i,queues[i].size());
243 for(EvQueueMapIter it = queues[i].begin();
244 it != queues[i].end(); it++){
245 DBG("\t%s -> %p\n",it->first.c_str(),it->second.q);
246 }
247 }
248 queues_mut[i].unlock();
249
250 id_lookup_mut[i].lock();
251 if(!id_lookup[i].empty()) {
252 DBG("id_lookup[%zu].size() = %zu",i,id_lookup[i].size());
253 }
254 id_lookup_mut[i].unlock();
255 }
256 DBG("*** End of Event dispatcher bucket dump ***\n");
257 }
258
dispose()259 void AmEventDispatcher::dispose()
260 {
261 if(_instance != NULL) {
262 // todo: add locking here
263 _instance->dump();
264
265 delete _instance;
266 _instance = NULL;
267 }
268 }
269
270 /** this function optimizes posting of SIP Requests
271 - if the session does not exist, no event need to be created (req copied) */
postSipRequest(const AmSipRequest & req)272 bool AmEventDispatcher::postSipRequest(const AmSipRequest& req)
273 {
274 // get local tag
275 bool posted = false;
276 string callid = req.callid;
277 string remote_tag = req.from_tag;
278
279 string id = callid+remote_tag;
280 if(AmConfig::AcceptForkedDialogs){
281 id += req.via_branch;
282 }
283 unsigned int id_bucket = hash(id);
284
285 id_lookup_mut[id_bucket].lock();
286
287 DictIter di = id_lookup[id_bucket].find(id);
288 if (di == id_lookup[id_bucket].end()) {
289 id_lookup_mut[id_bucket].unlock();
290 return false;
291 }
292 string local_tag = di->second;
293 id_lookup_mut[id_bucket].unlock();
294
295 // post(local_tag)
296 unsigned int queue_bucket = hash(local_tag);
297
298 queues_mut[queue_bucket].lock();
299
300 EvQueueMapIter it = queues[queue_bucket].find(local_tag);
301 if(it != queues[queue_bucket].end()){
302 it->second.q->postEvent(new AmSipRequestEvent(req));
303 posted = true;
304 }
305
306 queues_mut[queue_bucket].unlock();
307
308 return posted;
309 }
310