1 #include <switch.h>
2 #include <zmq.hpp>
3 #include <exception>
4 #include <stdexcept>
5 #include <memory>
6
7 #include "mod_event_zmq.h"
8
9 namespace mod_event_zmq {
10
11 // Handles publishing events out to clients
12 class ZmqEventPublisher {
13 public:
ZmqEventPublisher(zmq::context_t & context)14 ZmqEventPublisher(zmq::context_t &context) :
15 _publisher(context, ZMQ_PUB)
16 {
17 _publisher.bind("tcp://*:5556");
18
19 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening for clients\n");
20 }
21
PublishEvent(const switch_event_t * event)22 void PublishEvent(const switch_event_t *event) {
23 // Serialize the event into a JSON string
24 char* pjson;
25 switch_event_serialize_json(const_cast<switch_event_t*>(event), &pjson);
26
27 // Use the JSON string as the message body
28 zmq::message_t msg(pjson, strlen(pjson), free_message_data, NULL);
29
30 // Send the message
31 _publisher.send(msg);
32 }
33
34 private:
free_message_data(void * data,void * hint)35 static void free_message_data(void *data, void *hint) {
36 free (data);
37 }
38
39 zmq::socket_t _publisher;
40 };
41
42 class char_msg : public zmq::message_t {
43 public:
char_msg()44 char_msg() : zmq::message_t(sizeof(char)) { }
char_msg(char data)45 char_msg(char data) : zmq::message_t(sizeof(char)) {
46 *char_data() = data;
47 }
48
char_data()49 char* char_data() {
50 return static_cast<char*>(this->data());
51 }
52 };
53
54 // Handles global inititalization and teardown of the module
55 class ZmqModule {
56 public:
ZmqModule(switch_loadable_module_interface_t ** module_interface,switch_memory_pool_t * pool)57 ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) :
58 _context(1), _term_rep(_context, ZMQ_REP), _term_req(_context, ZMQ_REQ), _publisher(_context) {
59
60 // Set up the term messaging connection
61 _term_rep.bind(TERM_URI);
62 _term_req.connect(TERM_URI);
63
64 // Subscribe to all switch events of any subclass
65 // Store a pointer to ourself in the user data
66 if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, static_cast<void*>(&_publisher), &_node)
67 != SWITCH_STATUS_SUCCESS) {
68 throw std::runtime_error("Couldn't bind to switch events.");
69 }
70 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Subscribed to events\n");
71
72 // Create our module interface registration
73 *module_interface = switch_loadable_module_create_module_interface(pool, modname);
74
75 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module loaded\n");
76 }
77
Listen()78 void Listen() {
79 // All we do is sit here and block the run loop thread so it doesn't return
80 // it seems that if you want to keep your module running you can't return from the run loop
81
82 char_msg msg;
83 while(true) {
84 // Listen for term message
85 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Entered run loop, waiting for term message\n");
86 _term_rep.recv(&msg);
87 if(*msg.char_data() == MODULE_TERM_REQ_MESSAGE) {
88 // Ack term message
89 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term message, sending ack and leaving run loop\n");
90
91 *msg.char_data() = MODULE_TERM_ACK_MESSAGE;
92 _term_rep.send(msg);
93
94 break;
95 }
96 }
97 }
98
Shutdown()99 void Shutdown() {
100 // Send term message
101 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutdown requested, sending term message to runloop\n");
102 char_msg msg(MODULE_TERM_REQ_MESSAGE);
103 _term_req.send(msg);
104
105 while(true) {
106 // Wait for the term ack message
107 _term_req.recv(&msg);
108 if(*msg.char_data() == MODULE_TERM_ACK_MESSAGE) {
109 // Continue shutdown
110 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term ack message, continuing shutdown\n");
111 break;
112 }
113 }
114 }
115
~ZmqModule()116 ~ZmqModule() {
117 // Unsubscribe from the switch events
118 switch_event_unbind(&_node);
119 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n");
120 }
121
122 private:
123 // Dispatches events to the publisher
event_handler(switch_event_t * event)124 static void event_handler(switch_event_t *event) {
125 try {
126 ZmqEventPublisher *publisher = static_cast<ZmqEventPublisher*>(event->bind_user_data);
127 publisher->PublishEvent(event);
128 } catch(std::exception ex) {
129 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what());
130 } catch(...) { // Exceptions must not propogate to C caller
131 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unknown error publishing event via 0MQ\n");
132 }
133 }
134
135 switch_event_node_t *_node;
136
137 zmq::context_t _context;
138 zmq::socket_t _term_rep;
139 zmq::socket_t _term_req;
140
141 ZmqEventPublisher _publisher;
142 };
143
144 //*****************************//
145 // GLOBALS //
146 //*****************************//
147 std::auto_ptr<ZmqModule> module;
148
149
150 //*****************************//
// Module interface functions  //
152 //*****************************//
// Module load entry point: constructs the global ZmqModule.
//
// Returns SWITCH_STATUS_SUCCESS when the module object was created and
// SWITCH_STATUS_GENERR when its construction threw.
SWITCH_MODULE_LOAD_FUNCTION(load) {
    try {
        module.reset(new ZmqModule(module_interface, pool));
        return SWITCH_STATUS_SUCCESS;
    } catch(const std::exception &ex) {
        // Log the reason, matching what the runtime and shutdown entry points do
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading 0MQ module: %s\n", ex.what());
    } catch(...) { // Exceptions must not propagate to C caller
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error loading 0MQ module\n");
    }

    return SWITCH_STATUS_GENERR;
}
163
// Module runtime entry point: runs the module's Listen() call and, once it
// returns or throws, reports SWITCH_STATUS_TERM.
SWITCH_MODULE_RUNTIME_FUNCTION(runtime) {
    // Whatever happens below, the switch is told to stop calling this runtime loop
    const switch_status_t result = SWITCH_STATUS_TERM;

    try {
        // Begin listening for clients
        ZmqModule &instance = *module;
        instance.Listen();
    } catch(const std::exception &err) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error listening for clients: %s\n", err.what());
    } catch(...) {
        // Nothing may escape into the C caller
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error listening for clients\n");
    }

    return result;
}
177
// Module shutdown entry point: asks the module to shut down, then destroys
// it. Always reports SWITCH_STATUS_SUCCESS; failures are only logged.
SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) {
    const switch_status_t result = SWITCH_STATUS_SUCCESS;

    try {
        // Ask the module to shut down first...
        module.get()->Shutdown();

        // ...and only then release the module object
        module.reset();
    } catch(const std::exception &err) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module: %s\n", err.what());
    } catch(...) {
        // Nothing may escape into the C caller
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error shutting down module\n");
    }

    return result;
}
192
193 }
194