1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef __EVENT_H__
20 #define __EVENT_H__
21 
22 #include <string>
23 #include <set>
24 #include <deque>
25 #include <algorithm>
26 #ifdef GCC4
27 #   include <tr1/memory>
28 using namespace std::tr1;
29 #else
30 #   include <boost/shared_ptr.hpp>
31 using namespace boost;
32 #endif
33 
34 #include "log.h"
35 #include "blockingqueue.h"
36 #include "mutex.h"
37 #include "thread.h"
38 
39 using namespace std;
40 using namespace zk;
41 
42 namespace zkfuse {
43 
44 //forward declaration of EventSource
45 template<typename E>
46 class EventSource;
47 
48 /**
49  * \brief This interface is implemented by an observer
50  * \brief of a particular {@link EventSource}.
51  */
52 template<typename E>
53 class EventListener {
54     public:
55 
56         /**
57          * \brief This method is invoked whenever an event
58          * \brief has been received by the event source being observed.
59          *
60          * @param source the source the triggered the event
61          * @param e      the actual event being triggered
62          */
63         virtual void eventReceived(const EventSource<E> &source, const E &e) = 0;
64 };
65 
66 /**
67  * \brief This class represents a source of events.
68  *
69  * <p>
70  * Each source can have many observers (listeners) attached to it
71  * and in case of an event, this source may propagate the event
72  * using {@link #fireEvent} method.
73  */
74 template<typename E>
75 class EventSource {
76     public:
77 
78         /**
79          * \brief The type corresponding to the list of registered event listeners.
80          */
81         typedef set<EventListener<E> *> EventListeners;
82 
83         /**
84          * \brief Registers a new event listener.
85          *
86          * @param listener the listener to be added to the set of listeners
87          */
addListener(EventListener<E> * listener)88         void addListener(EventListener<E> *listener) {
89             m_listeners.insert( listener );
90         }
91 
92         /**
93          * \brief Removes an already registered listener.
94          *
95          * @param listener the listener to be removed
96          */
removeListener(EventListener<E> * listener)97         void removeListener(EventListener<E> *listener) {
98             m_listeners.erase( listener );
99         }
100 
101         /**
102          * \brief Destructor.
103          */
~EventSource()104         virtual ~EventSource() {}
105 
106     protected:
107 
108         /**
109          * \brief Fires the given event to all registered listeners.
110          *
111          * <p>
112          * This method essentially iterates over all listeners
113          * and invokes {@link fireEvent(EventListener<E> *listener, const E &event)}
114          * for each element. All derived classes are free to
115          * override the method to provide better error handling
116          * than the default implementation.
117          *
118          * @param event the event to be propagated to all listeners
119          */
120         void fireEvent(const E &event);
121 
122         /**
123          * \brief Sends an event to the given listener.
124          *
125          * @param listener the listener to whom pass the event
126          * @param event the event to be handled
127          */
128         virtual void fireEvent(EventListener<E> *listener, const E &event);
129 
130     private:
131 
132         /**
133          * The set of registered event listeners.
134          */
135         EventListeners m_listeners;
136 
137 };
138 
139 /**
140  * \brief The interface of a generic event wrapper.
141  */
142 class AbstractEventWrapper {
143     public:
144 
145         /**
146          * \brief Destructor.
147          */
~AbstractEventWrapper()148         virtual ~AbstractEventWrapper() {}
149 
150         /**
151          * \brief Returns the underlying wrapee's data.
152          */
153         virtual void *getWrapee() = 0;
154 };
155 
156 /**
157  * \brief A template based implementation of {@link AbstractEventWrapper}.
158  */
159 template<typename E>
160 class EventWrapper : public AbstractEventWrapper {
161     public:
EventWrapper(const E & e)162         EventWrapper(const E &e) : m_e(e) {
163         }
getWrapee()164         void *getWrapee() {
165             return &m_e;
166         }
167     private:
168         E m_e;
169 };
170 
171 /**
172  * \brief This class represents a generic event.
173  */
174 class GenericEvent {
175     public:
176 
177         /**
178          * \brief Constructor.
179          */
GenericEvent()180         GenericEvent() : m_type(0) {}
181 
182         /**
183          * \brief Constructor.
184          *
185          * @param type the type of this event
186          * @param eventWarpper the wrapper around event's data
187          */
GenericEvent(int type,AbstractEventWrapper * eventWrapper)188         GenericEvent(int type, AbstractEventWrapper *eventWrapper) :
189             m_type(type), m_eventWrapper(eventWrapper) {
190         }
191 
192         /**
193          * \brief Returns the type of this event.
194          *
195          * @return type of this event
196          */
getType()197         int getType() const { return m_type; }
198 
199         /**
200          * \brief Returns the event's data.
201          *
202          * @return the event's data
203          */
getEvent()204         void *getEvent() const { return m_eventWrapper->getWrapee(); }
205 
206     private:
207 
208         /**
209          * The event type.
210          */
211         int m_type;
212 
213         /**
214          * The event represented as abstract wrapper.
215          */
216         boost::shared_ptr<AbstractEventWrapper> m_eventWrapper;
217 
218 };
219 
220 /**
221  * \brief This class adapts {@link EventListener} to a generic listener.
222  * Essentially this class listens on incoming events and fires them
223  * as {@link GenericEvent}s.
224  */
225 template<typename E, const int type>
226 class EventListenerAdapter : public virtual EventListener<E>,
227                              public virtual EventSource<GenericEvent>
228 {
229     public:
230 
231         /**
232          * \brief Constructor.
233          *
234          * @param eventSource the source on which register this listener
235          */
EventListenerAdapter(EventSource<E> & eventSource)236         EventListenerAdapter(EventSource<E> &eventSource) {
237             eventSource.addListener(this);
238         }
239 
eventReceived(const EventSource<E> & source,const E & e)240         void eventReceived(const EventSource<E> &source, const E &e) {
241             AbstractEventWrapper *wrapper = new EventWrapper<E>(e);
242             GenericEvent event(type, wrapper);
243             fireEvent( event );
244         }
245 
246 };
247 
248 /**
249  * \brief This class provides an adapter between an asynchronous and synchronous
250  * \brief event handling.
251  *
252  * <p>
253  * This class queues up all received events and exposes them through
254  * {@link #getNextEvent()} method.
255  */
256 template<typename E>
257 class SynchronousEventAdapter : public EventListener<E> {
258     public:
259 
eventReceived(const EventSource<E> & source,const E & e)260         void eventReceived(const EventSource<E> &source, const E &e) {
261             m_queue.put( e );
262         }
263 
264         /**
265          * \brief Returns the next available event from the underlying queue,
266          * \brief possibly blocking, if no data is available.
267          *
268          * @return the next available event
269          */
getNextEvent()270         E getNextEvent() {
271             return m_queue.take();
272         }
273 
274         /**
275          * \brief Returns whether there are any events in the queue or not.
276          *
277          * @return true if there is at least one event and
278          *         the next call to {@link #getNextEvent} won't block
279          */
hasEvents()280         bool hasEvents() const {
281             return (m_queue.empty() ? false : true);
282         }
283 
284         /**
285          * \brief Destructor.
286          */
~SynchronousEventAdapter()287         virtual ~SynchronousEventAdapter() {}
288 
289     private:
290 
291         /**
292          * The blocking queue of all events received so far.
293          */
294         BlockingQueue<E> m_queue;
295 
296 };
297 
298 /**
299  * This typedef defines the type of a timer Id.
300  */
301 typedef int32_t TimerId;
302 
303 /**
304  * This class represents a timer event parametrized by the user's data type.
305  */
306 template<typename T>
307 class TimerEvent {
308     public:
309 
310         /**
311          * \brief Constructor.
312          *
313          * @param id the ID of this event
314          * @param alarmTime when this event is to be triggered
315          * @param userData the user data associated with this event
316          */
TimerEvent(TimerId id,int64_t alarmTime,const T & userData)317         TimerEvent(TimerId id, int64_t alarmTime, const T &userData) :
318             m_id(id), m_alarmTime(alarmTime), m_userData(userData)
319         {}
320 
321         /**
322          * \brief Constructor.
323          */
TimerEvent()324         TimerEvent() : m_id(-1), m_alarmTime(-1) {}
325 
326         /**
327          * \brief Returns the ID.
328          *
329          * @return the ID of this event
330          */
getID()331         TimerId getID() const { return m_id; }
332 
333         /**
334          * \brief Returns the alarm time.
335          *
336          * @return the alarm time
337          */
getAlarmTime()338         int64_t getAlarmTime() const { return m_alarmTime; }
339 
340         /**
341          * \brief Returns the user's data.
342          *
343          * @return the user's data
344          */
getUserData()345         T const &getUserData() const { return m_userData; }
346 
347         /**
348          * \brief Returns whether the given alarm time is less than this event's
349          * \brief time.
350          */
351         bool operator<(const int64_t alarmTime) const {
352             return m_alarmTime < alarmTime;
353         }
354 
355     private:
356 
357         /**
358          * The ID of ths event.
359          */
360         TimerId m_id;
361 
362         /**
363          * The time at which this event triggers.
364          */
365         int64_t m_alarmTime;
366 
367         /**
368          * The user specific data associated with this event.
369          */
370         T m_userData;
371 
372 };
373 
374 template<typename T>
375 class Timer : public EventSource<TimerEvent<T> > {
376     public:
377 
378         /**
379          * \brief Constructor.
380          */
Timer()381         Timer() : m_currentEventID(0), m_terminating(false) {
382             m_workerThread.Create( *this, &Timer<T>::sendAlarms );
383         }
384 
385         /**
386          * \brief Destructor.
387          */
~Timer()388         ~Timer() {
389             m_terminating = true;
390             m_lock.notify();
391             m_workerThread.Join();
392         }
393 
394         /**
395          * \brief Schedules the given event <code>timeFromNow</code> milliseconds.
396          *
397          * @param timeFromNow time from now, in milliseconds, when the event
398          *                    should be triggered
399          * @param userData the user data associated with the timer event
400          *
401          * @return the ID of the newly created timer event
402          */
scheduleAfter(int64_t timeFromNow,const T & userData)403         TimerId scheduleAfter(int64_t timeFromNow, const T &userData) {
404             return scheduleAt( getCurrentTimeMillis() + timeFromNow, userData );
405         }
406 
407         /**
408          * \brief Schedules an event at the given time.
409          *
410          * @param absTime absolute time, in milliseconds, at which the event
411          *                should be triggered; the time is measured
412          *                from Jan 1st, 1970
413          * @param userData the user data associated with the timer event
414          *
415          * @return the ID of the newly created timer event
416          */
scheduleAt(int64_t absTime,const T & userData)417         TimerId scheduleAt(int64_t absTime, const T &userData) {
418             m_lock.lock();
419             typename QueueType::iterator pos =
420                     lower_bound( m_queue.begin(), m_queue.end(), absTime );
421             TimerId id = m_currentEventID++;
422             TimerEvent<T> event(id, absTime, userData);
423             m_queue.insert( pos, event );
424             m_lock.notify();
425             m_lock.unlock();
426             return id;
427         }
428 
429         /**
430          * \brief Returns the current time since Jan 1, 1970, in milliseconds.
431          *
432          * @return the current time in milliseconds
433          */
getCurrentTimeMillis()434         static int64_t getCurrentTimeMillis() {
435             struct timeval now;
436             gettimeofday( &now, NULL );
437             return now.tv_sec * 1000LL + now.tv_usec / 1000;
438         }
439 
440         /**
441          * \brief Cancels the given timer event.
442          *
443          *
444          * @param eventID the ID of the event to be canceled
445          *
446          * @return whether the event has been canceled
447          */
cancelAlarm(TimerId eventID)448         bool cancelAlarm(TimerId eventID) {
449             bool canceled = false;
450             m_lock.lock();
451             typename QueueType::iterator i;
452             for (i = m_queue.begin(); i != m_queue.end(); ++i) {
453                 if (eventID == i->getID()) {
454                     m_queue.erase( i );
455                     canceled = true;
456                     break;
457                 }
458             }
459             m_lock.unlock();
460             return canceled;
461         }
462 
463         /**
464          * Executes the main loop of the worker thread.
465          */
sendAlarms()466         void sendAlarms() {
467             //iterate until terminating
468             while (!m_terminating) {
469                 m_lock.lock();
470                 //1 step - wait until there is an event in the queue
471                 if (m_queue.empty()) {
472                     //wait up to 100ms to get next event
473                     m_lock.wait( 100 );
474                 }
475                 bool fire = false;
476                 if (!m_queue.empty()) {
477                     //retrieve the event from the queue and send it
478                     TimerEvent<T> event = m_queue.front();
479                     //check whether we can send it right away
480                     int64_t timeToWait =
481                         event.getAlarmTime() - getCurrentTimeMillis();
482                     if (timeToWait <= 0) {
483                         m_queue.pop_front();
484                         //we fire only if it's still in the queue and alarm
485                         //time has just elapsed (in case the top event
486                         //is canceled)
487                         fire = true;
488                     } else {
489                         m_lock.wait( timeToWait );
490                     }
491                     m_lock.unlock();
492                     if (fire) {
493                         fireEvent( event );
494                     }
495                 } else {
496                     m_lock.unlock();
497                 }
498             }
499         }
500 
501     private:
502 
503         /**
504          * The type of timer events queue.
505          */
506         typedef deque<TimerEvent<T> > QueueType;
507 
508         /**
509          * The current event ID, auto-incremented each time a new event
510          * is created.
511          */
512         TimerId m_currentEventID;
513 
514         /**
515          * The queue of timer events sorted by {@link TimerEvent#alarmTime}.
516          */
517         QueueType m_queue;
518 
519         /**
520          * The lock used to guard {@link #m_queue}.
521          */
522         Lock m_lock;
523 
524         /**
525          * The thread that triggers alarms.
526          */
527         CXXThread<Timer<T> > m_workerThread;
528 
529         /**
530          * Whether {@link #m_workerThread}  is terminating.
531          */
532         volatile bool m_terminating;
533 
534 };
535 
536 template<typename E>
fireEvent(const E & event)537 void EventSource<E>::fireEvent(const E &event) {
538     for (typename EventListeners::iterator i = m_listeners.begin();
539          i != m_listeners.end();
540          ++i)
541     {
542         fireEvent( *i, event );
543     }
544 }
545 
546 template<typename E>
fireEvent(EventListener<E> * listener,const E & event)547 void EventSource<E>::fireEvent(EventListener<E> *listener, const E &event) {
548     listener->eventReceived( *this, event );
549 }
550 
551 }   /* end of 'namespace zkfuse' */
552 
553 #endif /* __EVENT_H__ */
554