1 /*
2  * Copyright (C) 2007 Raphael Coeffic
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version. This program is released under
10  * the GPL with the additional exemption that compiling, linking,
11  * and/or using OpenSSL is allowed.
12  *
13  * For a license to use the SEMS software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * SEMS is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27 
28 #include "AmEventDispatcher.h"
29 #include "AmSipEvent.h"
30 #include "AmConfig.h"
31 #include "sip/hash.h"
32 
hash(const string & s1)33 unsigned int AmEventDispatcher::hash(const string& s1)
34 {
35   return hashlittle(s1.c_str(),s1.length(),0)
36     & (EVENT_DISPATCHER_BUCKETS-1);
37 }
38 
hash(const string & s1,const string s2)39 unsigned int AmEventDispatcher::hash(const string& s1, const string s2)
40 {
41     unsigned int h=0;
42 
43     h = hashlittle(s1.c_str(),s1.length(),h);
44     h = hashlittle(s2.c_str(),s2.length(),h);
45 
46     return h & (EVENT_DISPATCHER_BUCKETS-1);
47 }
48 
49 AmEventDispatcher* AmEventDispatcher::_instance=NULL;
50 
instance()51 AmEventDispatcher* AmEventDispatcher::instance()
52 {
53   return _instance ? _instance : ((_instance = new AmEventDispatcher()));
54 }
55 
56 
addEventQueue(const string & local_tag,AmEventQueueInterface * q)57 bool AmEventDispatcher::addEventQueue(const string& local_tag,
58 				      AmEventQueueInterface* q)
59 {
60     unsigned int queue_bucket = hash(local_tag);
61 
62     queues_mut[queue_bucket].lock();
63 
64     if (queues[queue_bucket].find(local_tag) != queues[queue_bucket].end()) {
65       queues_mut[queue_bucket].unlock();
66       return false;
67     }
68 
69     queues[queue_bucket][local_tag] = QueueEntry(q);
70     queues_mut[queue_bucket].unlock();
71 
72     return true;
73 }
74 
75 
76 /** @return false on error */
addEventQueue(const string & local_tag,AmEventQueueInterface * q,const string & callid,const string & remote_tag,const string & via_branch)77 bool AmEventDispatcher::addEventQueue(const string& local_tag,
78 				      AmEventQueueInterface* q,
79 				      const string& callid,
80 				      const string& remote_tag,
81 				      const string& via_branch)
82 {
83     if(local_tag.empty () ||callid.empty() || remote_tag.empty() | via_branch.empty()) {
84       ERROR("local_tag, callid, remote_tag or via_branch is empty");
85       return false;
86     }
87 
88     unsigned int queue_bucket = hash(local_tag);
89 
90     queues_mut[queue_bucket].lock();
91 
92     if (queues[queue_bucket].find(local_tag) != queues[queue_bucket].end()) {
93       queues_mut[queue_bucket].unlock();
94       return false;
95     }
96 
97     // try to find via id_lookup
98     string id = callid+remote_tag;
99     if(AmConfig::AcceptForkedDialogs){
100       id += via_branch;
101     }
102     unsigned int id_bucket = hash(id);
103 
104     id_lookup_mut[id_bucket].lock();
105 
106     if (id_lookup[id_bucket].find(id) !=
107 	id_lookup[id_bucket].end()) {
108       id_lookup_mut[id_bucket].unlock();
109       queues_mut[queue_bucket].unlock();
110       return false;
111     }
112 
113     queues[queue_bucket][local_tag] = QueueEntry(q,id);
114     id_lookup[id_bucket][id] = local_tag;
115 
116     id_lookup_mut[id_bucket].unlock();
117     queues_mut[queue_bucket].unlock();
118 
119     return true;
120 }
121 
delEventQueue(const string & local_tag)122 AmEventQueueInterface* AmEventDispatcher::delEventQueue(const string& local_tag)
123 {
124     AmEventQueueInterface* q = NULL;
125     unsigned int queue_bucket = hash(local_tag);
126 
127     queues_mut[queue_bucket].lock();
128 
129     EvQueueMapIter qi = queues[queue_bucket].find(local_tag);
130     if(qi != queues[queue_bucket].end()) {
131 
132       QueueEntry qe(qi->second);
133       queues[queue_bucket].erase(qi);
134       q = qe.q;
135 
136       if(!qe.id.empty()) {
137 	unsigned int id_bucket = hash(qe.id);
138 
139 	id_lookup_mut[id_bucket].lock();
140 
141 	DictIter di = id_lookup[id_bucket].find(qe.id);
142 	if(di != id_lookup[id_bucket].end()) {
143 	  id_lookup[id_bucket].erase(di);
144 	}
145 
146 	id_lookup_mut[id_bucket].unlock();
147       }
148     }
149     queues_mut[queue_bucket].unlock();
150 
151     return q;
152 }
153 
post(const string & local_tag,AmEvent * ev)154 bool AmEventDispatcher::post(const string& local_tag, AmEvent* ev)
155 {
156     bool posted = false;
157 
158     unsigned int queue_bucket = hash(local_tag);
159 
160     queues_mut[queue_bucket].lock();
161 
162     EvQueueMapIter it = queues[queue_bucket].find(local_tag);
163     if(it != queues[queue_bucket].end()){
164 	it->second.q->postEvent(ev);
165 	posted = true;
166     }
167 
168     queues_mut[queue_bucket].unlock();
169 
170     return posted;
171 }
172 
173 
post(const string & callid,const string & remote_tag,const string & via_branch,AmEvent * ev)174 bool AmEventDispatcher::post(const string& callid,
175 			     const string& remote_tag,
176 			     const string& via_branch,
177 			     AmEvent* ev)
178 {
179     string id = callid+remote_tag;
180     if(AmConfig::AcceptForkedDialogs){
181       id += via_branch;
182     }
183     unsigned int id_bucket = hash(id);
184 
185     id_lookup_mut[id_bucket].lock();
186 
187     DictIter di = id_lookup[id_bucket].find(id);
188     if (di == id_lookup[id_bucket].end()) {
189       id_lookup_mut[id_bucket].unlock();
190       return false;
191     }
192     string local_tag = di->second;
193     id_lookup_mut[id_bucket].unlock();
194 
195     return post(local_tag, ev);
196 }
197 
broadcast(AmEvent * ev)198 bool AmEventDispatcher::broadcast(AmEvent* ev)
199 {
200     if (!ev)
201       return false;
202 
203     bool posted = false;
204     for (size_t i=0;i<EVENT_DISPATCHER_BUCKETS;i++) {
205       queues_mut[i].lock();
206 
207       EvQueueMapIter it = queues[i].begin();
208       while (it != queues[i].end()) {
209 	EvQueueMapIter this_evq = it;
210 	it++;
211 	queues_mut[i].unlock();
212 	this_evq->second.q->postEvent(ev->clone());
213 	queues_mut[i].lock();
214 	posted = true;
215       }
216       queues_mut[i].unlock();
217     }
218 
219     delete ev;
220 
221     return posted;
222 }
223 
empty()224 bool AmEventDispatcher::empty() {
225     bool res = true;
226     for (size_t i=0;i<EVENT_DISPATCHER_BUCKETS;i++) {
227       queues_mut[i].lock();
228       res = res&queues[i].empty();
229       queues_mut[i].unlock();
230       if (!res)
231 	break;
232     }
233     return res;
234 }
235 
dump()236 void AmEventDispatcher::dump()
237 {
238     DBG("*** dumping Event dispatcher buckets ***\n");
239     for (size_t i=0;i<EVENT_DISPATCHER_BUCKETS;i++) {
240       queues_mut[i].lock();
241       if(!queues[i].empty()) {
242 	DBG("queues[%zu].size() = %zu",i,queues[i].size());
243 	for(EvQueueMapIter it = queues[i].begin();
244 	    it != queues[i].end(); it++){
245 	  DBG("\t%s -> %p\n",it->first.c_str(),it->second.q);
246 	}
247       }
248       queues_mut[i].unlock();
249 
250       id_lookup_mut[i].lock();
251       if(!id_lookup[i].empty()) {
252 	DBG("id_lookup[%zu].size() = %zu",i,id_lookup[i].size());
253       }
254       id_lookup_mut[i].unlock();
255     }
256     DBG("*** End of Event dispatcher bucket dump ***\n");
257 }
258 
dispose()259 void AmEventDispatcher::dispose()
260 {
261   if(_instance != NULL) {
262     // todo: add locking here
263     _instance->dump();
264 
265     delete _instance;
266     _instance = NULL;
267   }
268 }
269 
270 /** this function optimizes posting of SIP Requests
271     - if the session does not exist, no event need to be created (req copied) */
postSipRequest(const AmSipRequest & req)272 bool AmEventDispatcher::postSipRequest(const AmSipRequest& req)
273 {
274     // get local tag
275     bool posted = false;
276     string callid = req.callid;
277     string remote_tag = req.from_tag;
278 
279     string id = callid+remote_tag;
280     if(AmConfig::AcceptForkedDialogs){
281       id += req.via_branch;
282     }
283     unsigned int id_bucket = hash(id);
284 
285     id_lookup_mut[id_bucket].lock();
286 
287     DictIter di = id_lookup[id_bucket].find(id);
288     if (di == id_lookup[id_bucket].end()) {
289       id_lookup_mut[id_bucket].unlock();
290       return false;
291     }
292     string local_tag = di->second;
293     id_lookup_mut[id_bucket].unlock();
294 
295     // post(local_tag)
296     unsigned int queue_bucket = hash(local_tag);
297 
298     queues_mut[queue_bucket].lock();
299 
300     EvQueueMapIter it = queues[queue_bucket].find(local_tag);
301     if(it != queues[queue_bucket].end()){
302 	it->second.q->postEvent(new AmSipRequestEvent(req));
303 	posted = true;
304     }
305 
306     queues_mut[queue_bucket].unlock();
307 
308     return posted;
309 }
310