1 #include <switch.h>
2 #include <zmq.hpp>
3 #include <exception>
4 #include <stdexcept>
5 #include <memory>
6 
7 #include "mod_event_zmq.h"
8 
9 namespace mod_event_zmq {
10 
11 // Handles publishing events out to clients
12 class ZmqEventPublisher {
13 public:
ZmqEventPublisher(zmq::context_t & context)14 	ZmqEventPublisher(zmq::context_t &context) :
15 		_publisher(context, ZMQ_PUB)
16 	{
17 		_publisher.bind("tcp://*:5556");
18 
19 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening for clients\n");
20 	}
21 
PublishEvent(const switch_event_t * event)22 	void PublishEvent(const switch_event_t *event) {
23 		// Serialize the event into a JSON string
24 		char* pjson;
25 		switch_event_serialize_json(const_cast<switch_event_t*>(event), &pjson);
26 
27 		// Use the JSON string as the message body
28 		zmq::message_t msg(pjson, strlen(pjson), free_message_data, NULL);
29 
30 		// Send the message
31 		_publisher.send(msg);
32 	}
33 
34 private:
free_message_data(void * data,void * hint)35 	static void free_message_data(void *data, void *hint) {
36 		free (data);
37 	}
38 
39 	zmq::socket_t _publisher;
40 };
41 
42 class char_msg : public zmq::message_t {
43 public:
char_msg()44 	char_msg() : zmq::message_t(sizeof(char)) { }
char_msg(char data)45 	char_msg(char data) : zmq::message_t(sizeof(char)) {
46 		*char_data() = data;
47 	}
48 
char_data()49 	char* char_data() {
50 		return static_cast<char*>(this->data());
51 	}
52 };
53 
54 // Handles global inititalization and teardown of the module
55 class ZmqModule {
56 public:
ZmqModule(switch_loadable_module_interface_t ** module_interface,switch_memory_pool_t * pool)57 	ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) :
58 		_context(1), _term_rep(_context, ZMQ_REP), _term_req(_context, ZMQ_REQ), _publisher(_context) {
59 
60 		// Set up the term messaging connection
61 		_term_rep.bind(TERM_URI);
62 		_term_req.connect(TERM_URI);
63 
64 		// Subscribe to all switch events of any subclass
65 		// Store a pointer to ourself in the user data
66 		if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, static_cast<void*>(&_publisher), &_node)
67 				!= SWITCH_STATUS_SUCCESS) {
68 			throw std::runtime_error("Couldn't bind to switch events.");
69 		}
70 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Subscribed to events\n");
71 
72 		// Create our module interface registration
73 		*module_interface = switch_loadable_module_create_module_interface(pool, modname);
74 
75 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module loaded\n");
76 	}
77 
Listen()78 	void Listen() {
79 		// All we do is sit here and block the run loop thread so it doesn't return
80 		// it seems that if you want to keep your module running you can't return from the run loop
81 
82 		char_msg msg;
83 		while(true) {
84 			// Listen for term message
85 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Entered run loop, waiting for term message\n");
86 			_term_rep.recv(&msg);
87 			if(*msg.char_data() == MODULE_TERM_REQ_MESSAGE) {
88 				// Ack term message
89 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term message, sending ack and leaving run loop\n");
90 
91 				*msg.char_data() = MODULE_TERM_ACK_MESSAGE;
92 				_term_rep.send(msg);
93 
94 				break;
95 			}
96 		}
97 	}
98 
Shutdown()99 	void Shutdown() {
100 		// Send term message
101 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutdown requested, sending term message to runloop\n");
102 		char_msg msg(MODULE_TERM_REQ_MESSAGE);
103 		_term_req.send(msg);
104 
105 		while(true) {
106 			// Wait for the term ack message
107 			_term_req.recv(&msg);
108 			if(*msg.char_data() == MODULE_TERM_ACK_MESSAGE) {
109 				// Continue shutdown
110 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term ack message, continuing shutdown\n");
111 				break;
112 			}
113 		}
114 	}
115 
~ZmqModule()116 	~ZmqModule() {
117 		// Unsubscribe from the switch events
118 		switch_event_unbind(&_node);
119 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n");
120 	}
121 
122 private:
123 	// Dispatches events to the publisher
event_handler(switch_event_t * event)124 	static void event_handler(switch_event_t *event) {
125 		try {
126 			ZmqEventPublisher *publisher = static_cast<ZmqEventPublisher*>(event->bind_user_data);
127 			publisher->PublishEvent(event);
128 		} catch(std::exception ex) {
129 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what());
130 		} catch(...) { // Exceptions must not propogate to C caller
131 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unknown error publishing event via 0MQ\n");
132 		}
133 	}
134 
135 	switch_event_node_t *_node;
136 
137 	zmq::context_t _context;
138 	zmq::socket_t _term_rep;
139 	zmq::socket_t _term_req;
140 
141 	ZmqEventPublisher _publisher;
142 };
143 
144 //*****************************//
145 //           GLOBALS           //
146 //*****************************//
147 std::auto_ptr<ZmqModule> module;
148 
149 
//*****************************//
// Module interface functions  //
//*****************************//
// Module load entry point: constructs the global ZmqModule.
//
// Returns SWITCH_STATUS_SUCCESS once the module object exists, or
// SWITCH_STATUS_GENERR if its construction threw. `module_interface` and
// `pool` are not declared here; presumably they are parameters supplied by
// the SWITCH_MODULE_LOAD_FUNCTION macro.
SWITCH_MODULE_LOAD_FUNCTION(load) {
	try {
		module.reset(new ZmqModule(module_interface, pool));
		return SWITCH_STATUS_SUCCESS;
	} catch(const std::exception &ex) {
		// Log the reason, as the runtime and shutdown entry points do.
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading 0MQ module: %s\n", ex.what());
		return SWITCH_STATUS_GENERR;
	} catch(...) { // Exceptions must not propagate to C caller
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading 0MQ module\n");
		return SWITCH_STATUS_GENERR;
	}
}
163 
// Module runtime entry point: parks the calling thread inside
// ZmqModule::Listen() until the module is told to stop.
//
// Always returns SWITCH_STATUS_TERM, whether Listen() returned normally or
// threw.
SWITCH_MODULE_RUNTIME_FUNCTION(runtime) {
	try {
		// Blocks here until the term message has been acknowledged
		ZmqModule *const instance = module.get();
		instance->Listen();
	} catch(const std::exception &err) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error listening for clients: %s\n", err.what());
	} catch(...) {
		// Nothing may escape into the C caller
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error listening for clients\n");
	}

	// SWITCH_STATUS_TERM asks the switch not to call this function again
	return SWITCH_STATUS_TERM;
}
177 
// Module shutdown entry point: stops the run loop, then destroys the
// global ZmqModule.
//
// Always returns SWITCH_STATUS_SUCCESS; failures are only logged. If
// Shutdown() throws, the module object is left allocated because the
// reset below is skipped.
SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) {
	try {
		// Ask the run loop to exit and wait for its acknowledgement
		ZmqModule &instance = *module;
		instance.Shutdown();

		// Destroy the module object
		module.reset();
	} catch(const std::exception &err) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module: %s\n", err.what());
	} catch(...) {
		// Nothing may escape into the C caller
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error shutting down module\n");
	}

	return SWITCH_STATUS_SUCCESS;
}
192 
193 }
194