1 #include <nchan_websocket_publisher.h>
2 #include <nchan_types.h>
3 #include <util/nchan_output.h>
4 #include <nchan_variables.h>
5 #include <store/memory/store.h>
6 #include <store/redis/store.h>
7 #if (NGX_ZLIB)
8 #include <zlib.h>
9 #endif
10
/* Forward declarations of static helpers used by the directive handlers in this file. */
static char *nchan_set_complex_value_array(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, nchan_complex_value_arr_t *chid);
static ngx_int_t set_complex_value_array_size1(ngx_conf_t *cf, nchan_complex_value_arr_t *chid, char *val);

/* Compiled into a location's channel_event_string when neither it nor its parent sets one. */
static ngx_str_t DEFAULT_CHANNEL_EVENT_STRING = ngx_string("$nchan_channel_event $nchan_channel_id");

/* Compiled into a location's subscriber_info_string when neither it nor its parent sets one. */
static ngx_str_t DEFAULT_SUBSCRIBER_INFO_STRING = ngx_string("$nchan_subscriber_type $remote_addr:$remote_port $http_user_agent $server_name $request_uri $pid");

/* Storage engine given to locations that do not select one (directly or by inheritance). */
nchan_store_t *default_storage_engine = &nchan_store_memory;
/* Cleared in preconfig, set to 1 at the end of a successful postconfig; gates module/worker init. */
ngx_flag_t global_nchan_enabled = 0;
/* Set once any location selects the redis storage engine or a redis upstream. */
ngx_flag_t global_redis_enabled = 0;
/* When set (and NGX_ZLIB is available) postconfig initializes the shared deflate state. */
ngx_flag_t global_zstream_needed = 0;
/* Set by the benchmark directive; enables the benchmark init hooks. */
ngx_flag_t global_benchmark_enabled = 0;
/* The ngx_cycle value that last claimed the flags above; used to detect a cycle change. */
void *global_owner_cycle = NULL;
24
/*
 * MERGE_UNSET_CONF(conf, prev, unset, default)
 * If `conf` still equals the `unset` sentinel, assign it `prev`, or `default`
 * when `prev` is unset as well.
 * Note: expands to a bare `if` statement (not wrapped in do/while) and
 * evaluates `conf` and `prev` more than once, so pass side-effect-free lvalues.
 */
#define MERGE_UNSET_CONF(conf, prev, unset, default) \
if (conf == unset) { \
  conf = (prev == unset) ? default : prev; \
}

/*
 * MERGE_CONF(cf, prev_cf, name)
 * Inherit the pointer member `name` from prev_cf when cf's own value is NULL.
 * Also expands to a bare `if` statement.
 */
#define MERGE_CONF(cf, prev_cf, name) if((cf)->name == NULL) { (cf)->name = (prev_cf)->name; }
31
nchan_init_module(ngx_cycle_t * cycle)32 static ngx_int_t nchan_init_module(ngx_cycle_t *cycle) {
33 if(global_owner_cycle && global_owner_cycle != ngx_cycle) {
34 global_nchan_enabled = 0;
35 global_redis_enabled = 0;
36 global_zstream_needed = 0;
37 global_benchmark_enabled = 0;
38 }
39 global_owner_cycle = (void *)ngx_cycle;
40
41 if(!global_nchan_enabled) {
42 return NGX_OK;
43 }
44 ngx_core_conf_t *ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
45 nchan_worker_processes = ccf->worker_processes;
46
47 //initialize storage engines
48 nchan_store_memory.init_module(cycle);
49 if(global_benchmark_enabled) {
50 nchan_benchmark_init_module(cycle);
51 }
52 if(global_redis_enabled) {
53 nchan_store_redis.init_module(cycle);
54 }
55 return NGX_OK;
56 }
57
nchan_init_worker(ngx_cycle_t * cycle)58 static ngx_int_t nchan_init_worker(ngx_cycle_t *cycle) {
59 if(!global_nchan_enabled) {
60 return NGX_OK;
61 }
62 if (ngx_process != NGX_PROCESS_WORKER && ngx_process != NGX_PROCESS_SINGLE) {
63 //not a worker, stop initializing stuff.
64 return NGX_OK;
65 }
66
67 if(nchan_store_memory.init_worker(cycle)!=NGX_OK) {
68 return NGX_ERROR;
69 }
70 if(global_benchmark_enabled) {
71 nchan_benchmark_init_worker(cycle);
72 }
73
74 if(global_redis_enabled && nchan_store_redis.init_worker(cycle)!=NGX_OK) {
75 return NGX_ERROR;
76 }
77
78 nchan_websocket_publisher_llist_init();
79 nchan_output_init();
80
81 return NGX_OK;
82 }
83
nchan_preconfig(ngx_conf_t * cf)84 static ngx_int_t nchan_preconfig(ngx_conf_t *cf) {
85 global_owner_cycle = (void *)ngx_cycle;
86 global_nchan_enabled = 0;
87 return nchan_add_variables(cf);
88 }
89
nchan_postconfig(ngx_conf_t * cf)90 static ngx_int_t nchan_postconfig(ngx_conf_t *cf) {
91 global_owner_cycle = (void *)ngx_cycle;
92 if(nchan_store_memory.init_postconfig(cf)!=NGX_OK) {
93 return NGX_ERROR;
94 }
95 if(global_redis_enabled && nchan_store_redis.init_postconfig(cf)!=NGX_OK) {
96 return NGX_ERROR;
97 }
98
99 #if (NGX_ZLIB)
100 if(global_zstream_needed) {
101 nchan_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_nchan_module);
102 nchan_common_deflate_init(mcf);
103 }
104 #endif
105
106 global_nchan_enabled = 1;
107
108 return NGX_OK;
109 }
110
111 //main config
/*
 * Allocates and initializes the module's main configuration.
 *
 * Sets up the default message temp path, lets both storage engines fill in
 * their part of the main conf and, when zlib is available, seeds the default
 * deflate parameters.
 *
 * Returns the new configuration, or NULL on failure. nginx detects a failed
 * create_main_conf only by a NULL result, so NGX_CONF_ERROR ((void *) -1) must
 * not be returned here: it would be stored and used as a valid pointer.
 */
static void * nchan_create_main_conf(ngx_conf_t *cf) {
  static ngx_path_init_t  nchan_temp_path = { ngx_string(NGX_HTTP_CLIENT_TEMP_PATH), { 0, 0, 0 } };
  nchan_main_conf_t      *mcf = ngx_pcalloc(cf->pool, sizeof(*mcf));

  if(mcf == NULL) {
    return NULL;
  }

  // can fail on allocation or while registering the path; don't continue with a NULL path
  if(ngx_conf_merge_path_value(cf, &mcf->message_temp_path, NULL, &nchan_temp_path) != NGX_CONF_OK) {
    return NULL;
  }

  nchan_store_memory.create_main_conf(cf, mcf);
  nchan_store_redis.create_main_conf(cf, mcf);

#if (NGX_ZLIB)
  mcf->zlib_params.level = Z_DEFAULT_COMPRESSION;
  mcf->zlib_params.windowBits = 10;
  mcf->zlib_params.memLevel = 8;
  mcf->zlib_params.strategy = Z_DEFAULT_STRATEGY;
#endif

  return mcf;
}
133
/*
 * Allocates the module's server-level configuration and marks every
 * mergeable redis setting as "unset" so nchan_merge_srv_conf can tell an
 * explicit value from an inherited one.
 *
 * Returns the new configuration, or NULL on allocation failure. nginx checks
 * the create_srv_conf result against NULL only, so NGX_CONF_ERROR must not be
 * returned here.
 */
static void *nchan_create_srv_conf(ngx_conf_t *cf) {
  nchan_srv_conf_t  *scf = ngx_pcalloc(cf->pool, sizeof(*scf));

  if(scf == NULL) {
    return NULL;
  }

  scf->redis.connect_timeout = NGX_CONF_UNSET_MSEC;
  scf->redis.optimize_target = NCHAN_REDIS_OPTIMIZE_UNSET;
  scf->redis.master_weight = NGX_CONF_UNSET;
  scf->redis.slave_weight = NGX_CONF_UNSET;
  scf->redis.blacklist_count = NGX_CONF_UNSET;
  scf->redis.blacklist = NULL;
  scf->upstream_nchan_loc_conf = NULL;

  return scf;
}
148
/*
 * Merges server-level redis settings: every value the child left unset is
 * taken from the parent, or from the built-in default when the parent has
 * none either. Always returns NGX_CONF_OK.
 */
static char *nchan_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) {
  nchan_srv_conf_t  *inherited = parent;
  nchan_srv_conf_t  *current = child;

  // blacklist is a plain pointer: inherit it when the child has none
  if (current->redis.blacklist == NULL) {
    current->redis.blacklist = inherited->redis.blacklist;
  }

  // optimize_target has its own "unset" sentinel; default is CPU
  if (current->redis.optimize_target == NCHAN_REDIS_OPTIMIZE_UNSET) {
    if (inherited->redis.optimize_target == NCHAN_REDIS_OPTIMIZE_UNSET) {
      current->redis.optimize_target = NCHAN_REDIS_OPTIMIZE_CPU;
    }
    else {
      current->redis.optimize_target = inherited->redis.optimize_target;
    }
  }

  ngx_conf_merge_msec_value(current->redis.connect_timeout, inherited->redis.connect_timeout, NCHAN_DEFAULT_REDIS_NODE_CONNECT_TIMEOUT_MSEC);
  ngx_conf_merge_value(current->redis.master_weight, inherited->redis.master_weight, 1);
  ngx_conf_merge_value(current->redis.slave_weight, inherited->redis.slave_weight, 1);
  ngx_conf_merge_value(current->redis.blacklist_count, inherited->redis.blacklist_count, 0);

  return NGX_CONF_OK;
}
161
162 //location config stuff
/*
 * Allocates the module's location-level configuration and puts every
 * mergeable field into its "unset" state (NGX_CONF_UNSET, a dedicated *_UNSET
 * enum value, or NULL) so that nchan_merge_loc_conf can distinguish explicit
 * values from inherited ones.
 *
 * Returns the new configuration, or NULL on allocation failure. nginx checks
 * the create_loc_conf result against NULL only, so NGX_CONF_ERROR must not be
 * returned here.
 */
static void *nchan_create_loc_conf(ngx_conf_t *cf) {
  nchan_loc_conf_t  *lcf = ngx_pcalloc(cf->pool, sizeof(*lcf));

  if(lcf == NULL) {
    return NULL;
  }

  // publisher types
  lcf->pub.http=0;
  lcf->pub.websocket=0;

  // subscriber types
  lcf->sub.poll=0;
  lcf->sub.longpoll=0;
  lcf->sub.eventsource=0;
  lcf->sub.websocket=0;
  lcf->sub.http_chunked=0;

  // lcf->group is already zeroed
  lcf->group.enable_accounting = NGX_CONF_UNSET;

  lcf->shared_data_index=NGX_CONF_UNSET;

  lcf->authorize_request_url = NULL;
  lcf->publisher_upstream_request_url = NULL;
  lcf->unsubscribe_request_url = NULL;
  lcf->subscribe_request_url = NULL;
  lcf->channel_group = NULL;

  lcf->message_timeout=NGX_CONF_UNSET;
  lcf->max_messages=NGX_CONF_UNSET;

  lcf->complex_message_timeout = NULL;
  lcf->complex_max_messages = NULL;

  lcf->subscriber_first_message=NCHAN_SUBSCRIBER_FIRST_MESSAGE_UNSET;

  lcf->subscriber_info_string=NULL;
  lcf->subscriber_info_location=NGX_CONF_UNSET;

  lcf->subscriber_timeout=NGX_CONF_UNSET;
  lcf->subscribe_only_existing_channel=NGX_CONF_UNSET;
  lcf->redis_idle_channel_cache_timeout=NGX_CONF_UNSET;
  lcf->max_channel_id_length=NGX_CONF_UNSET;
  lcf->max_channel_subscribers=NGX_CONF_UNSET;
  lcf->channel_timeout=NGX_CONF_UNSET;
  lcf->storage_engine=NULL;

  lcf->websocket_ping_interval=NGX_CONF_UNSET;

  lcf->eventsource_ping.interval=NGX_CONF_UNSET;

  lcf->msg_in_etag_only = NGX_CONF_UNSET;

  lcf->allow_origin = NULL;
  lcf->allow_credentials = NGX_CONF_UNSET;

  lcf->channel_events_channel_id = NULL;
  lcf->channel_event_string = NULL;

  lcf->websocket_heartbeat.enabled=NGX_CONF_UNSET;

  lcf->message_compression = NCHAN_MSG_COMPRESSION_INVALID;

  lcf->longpoll_multimsg=NGX_CONF_UNSET;
  lcf->longpoll_multimsg_use_raw_stream_separator=NGX_CONF_UNSET;

  // channel-id and message-id complex value arrays start out empty (n == 0)
  ngx_memzero(&lcf->pub_chid, sizeof(nchan_complex_value_arr_t));
  ngx_memzero(&lcf->sub_chid, sizeof(nchan_complex_value_arr_t));
  ngx_memzero(&lcf->pubsub_chid, sizeof(nchan_complex_value_arr_t));
  ngx_memzero(&lcf->last_message_id, sizeof(nchan_complex_value_arr_t));

  // redis settings
  ngx_memzero(&lcf->redis, sizeof(lcf->redis));
  lcf->redis.url_enabled=NGX_CONF_UNSET;
  lcf->redis.ping_interval = NGX_CONF_UNSET;
  lcf->redis.cluster_check_interval=NGX_CONF_UNSET;
  lcf->redis.upstream_inheritable=NGX_CONF_UNSET;
  lcf->redis.storage_mode = REDIS_MODE_CONF_UNSET;
  lcf->redis.nostore_fastpublish = NGX_CONF_UNSET;
  lcf->redis.privdata = NULL;
  lcf->redis.nodeset = NULL;

  lcf->request_handler = NULL;

  // benchmark settings
  lcf->benchmark.time = NGX_CONF_UNSET;
  lcf->benchmark.msgs_per_minute = NGX_CONF_UNSET;
  lcf->benchmark.msg_padding = NGX_CONF_UNSET;
  lcf->benchmark.channels = NGX_CONF_UNSET;
  lcf->benchmark.subscribers_per_channel = NGX_CONF_UNSET;
  lcf->benchmark.subscriber_distribution = NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET;
  lcf->benchmark.publisher_distribution = NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET;

  return lcf;
}
253
/*
 * Compiles `str` into a freshly pool-allocated complex value.
 *
 * On success stores the compiled value in *dst_cv and returns NGX_CONF_OK.
 * On allocation or compilation failure returns NGX_CONF_ERROR and leaves
 * *dst_cv untouched (an allocation failure is also logged).
 */
static char * create_complex_value_from_ngx_str(ngx_conf_t *cf, ngx_http_complex_value_t **dst_cv, ngx_str_t *str) {
  ngx_http_compile_complex_value_t   compile_ctx;
  ngx_http_complex_value_t          *compiled = ngx_palloc(cf->pool, sizeof(*compiled));

  if (compiled == NULL) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "unable to allocate space for complex value");
    return NGX_CONF_ERROR;
  }

  ngx_memzero(&compile_ctx, sizeof(compile_ctx));
  compile_ctx.complex_value = compiled;
  compile_ctx.value = str;
  compile_ctx.cf = cf;

  if (ngx_http_compile_complex_value(&compile_ctx) == NGX_OK) {
    *dst_cv = compiled;
    return NGX_CONF_OK;
  }

  return NGX_CONF_ERROR;
}
277
/* Returns 1 when the location accepts any publisher type (HTTP or websocket), else 0. */
static int is_pub_location(nchan_loc_conf_t *lcf) {
  if (lcf->pub.http) {
    return 1;
  }
  return lcf->pub.websocket ? 1 : 0;
}
/* Returns 1 when at least one subscriber type is enabled for the location, else 0. */
static int is_sub_location(nchan_loc_conf_t *lcf) {
  const nchan_conf_subscriber_types_t  *types = &lcf->sub;

  if (types->poll || types->http_raw_stream || types->longpoll) {
    return 1;
  }
  if (types->http_chunked || types->http_multipart) {
    return 1;
  }
  return (types->eventsource || types->websocket) ? 1 : 0;
}
/* Returns 1 when the location allows any group request (get, set or delete), else 0. */
static int is_group_location(nchan_loc_conf_t *lcf) {
  if (lcf->group.get) {
    return 1;
  }
  if (lcf->group.set) {
    return 1;
  }
  return lcf->group.delete ? 1 : 0;
}
288
/*
 * Checks that a location does not mix group access with publisher and/or
 * subscriber roles. Logs a configuration error describing the conflict and
 * returns 0 when it does; returns 1 for every acceptable combination.
 */
static int is_valid_location(ngx_conf_t *cf, nchan_loc_conf_t *lcf) {
  int  publishes, subscribes;

  if (!is_group_location(lcf)) {
    // only group locations have restrictions
    return 1;
  }

  publishes = is_pub_location(lcf);
  subscribes = is_sub_location(lcf);

  if (publishes && subscribes) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "Can't have a publisher and subscriber location and also be a group access location (nchan_group + nchan_publisher, nchan_subscriber or nchan_pubsub)");
    return 0;
  }
  if (publishes) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "Can't have a publisher location and also be a group access location (nchan_group + nchan_publisher)");
    return 0;
  }
  if (subscribes) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "Can't have a subscriber location and also be a group access location (nchan_group + nchan_subscriber)");
    return 0;
  }

  return 1;
}
307
/*
 * Attaches the upstream named by `url` to the location's redis settings.
 *
 * Returns "is duplicate" when the location already has a redis upstream,
 * NGX_CONF_ERROR when the upstream cannot be added, and NGX_CONF_OK otherwise.
 * On success redis is flagged as enabled both for the location and globally,
 * and the location is registered with the redis store.
 */
static char *ngx_conf_set_redis_upstream(ngx_conf_t *cf, ngx_str_t *url, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_url_t          target;

  if (loc->redis.upstream != NULL) {
    return "is duplicate";
  }

  ngx_memzero(&target, sizeof(target));
  target.no_resolve = 1;
  target.url = *url;

  loc->redis.upstream = ngx_http_upstream_add(cf, &target, 0);
  if (loc->redis.upstream == NULL) {
    return NGX_CONF_ERROR;
  }

  global_redis_enabled = 1;
  loc->redis.enabled = 1;
  nchan_store_redis_add_active_loc_conf(cf, loc);

  return NGX_CONF_OK;
}
329
/*
 * Installs `handler` as the content handler of the location currently being
 * configured and switches off If-Modified-Since processing for it.
 * Always returns NGX_CONF_OK.
 */
static char *nchan_setup_handler(ngx_conf_t *cf, ngx_int_t (*handler)(ngx_http_request_t *)) {
  ngx_http_core_loc_conf_t  *core_loc = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);

  core_loc->if_modified_since = NGX_HTTP_IMS_OFF;
  core_loc->handler = handler;

  return NGX_CONF_OK;
}
338
/*
 * Looks up the Nchan location conf stored on the redis upstream used by
 * `conf`.
 *
 * Returns NULL when `conf` has no redis upstream, when it shares its upstream
 * with `prev`, or when the upstream's server conf carries no location conf.
 */
static nchan_loc_conf_t *nchan_loc_conf_get_upstream_lcf(nchan_loc_conf_t *conf, nchan_loc_conf_t *prev) {
  nchan_redis_conf_t  *own_redis = &conf->redis;
  nchan_redis_conf_t  *parent_redis = &prev->redis;
  nchan_srv_conf_t    *upstream_scf;

  if (own_redis->upstream == NULL || own_redis->upstream == parent_redis->upstream) {
    // no upstream, or the same one as the parent: nothing extra to inherit
    return NULL;
  }

  assert(own_redis->upstream);
  upstream_scf = ngx_http_conf_upstream_srv_conf(own_redis->upstream, ngx_nchan_module);
  if (upstream_scf == NULL) {
    return NULL;
  }

  // may itself be NULL, which callers treat as "no upstream conf"
  return upstream_scf->upstream_nchan_loc_conf;
}
357
358
/*
 * Merges a location's Nchan configuration with its parent's.
 *
 * For every setting the child (`child`) left unset, the parent's (`parent`)
 * value is used, falling back to the built-in default when the parent has
 * none. A few redis settings additionally prefer the value found on the
 * redis upstream's own location conf (`up`) when the child uses a different
 * upstream than its parent.
 *
 * Besides merging values this function also:
 *   - rejects locations that mix group access with publisher/subscriber roles,
 *   - compiles the default subscriber-info, channel-event and last-message-id
 *     complex values when none were configured,
 *   - registers redis-enabled locations with the redis store,
 *   - installs the request handler on the location.
 *
 * Returns NGX_CONF_OK on success, or an error (NGX_CONF_ERROR or an error
 * string) on failure.
 */
static char * nchan_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) {
  nchan_loc_conf_t       *prev = parent, *conf = child;
  nchan_loc_conf_t       *up = nchan_loc_conf_get_upstream_lcf(conf, prev);
  char                   *rc;

  //publisher types
  ngx_conf_merge_bitmask_value(conf->pub.http, prev->pub.http, 0);
  ngx_conf_merge_bitmask_value(conf->pub.websocket, prev->pub.websocket, 0);

  //subscriber types
  ngx_conf_merge_bitmask_value(conf->sub.poll, prev->sub.poll, 0);
  ngx_conf_merge_bitmask_value(conf->sub.longpoll, prev->sub.longpoll, 0);
  ngx_conf_merge_bitmask_value(conf->sub.eventsource, prev->sub.eventsource, 0);
  ngx_conf_merge_bitmask_value(conf->sub.http_chunked, prev->sub.http_chunked, 0);
  ngx_conf_merge_bitmask_value(conf->sub.websocket, prev->sub.websocket, 0);

  //group request types
  ngx_conf_merge_bitmask_value(conf->group.get, prev->group.get, 0);
  ngx_conf_merge_bitmask_value(conf->group.set, prev->group.set, 0);
  ngx_conf_merge_bitmask_value(conf->group.delete, prev->group.delete, 0);

  ngx_conf_merge_value(conf->group.enable_accounting, prev->group.enable_accounting, 0);

  //validate location
  if(!is_valid_location(cf, conf)) {
    return NGX_CONF_ERROR;
  }

  MERGE_UNSET_CONF(conf->message_compression, prev->message_compression, NCHAN_MSG_COMPRESSION_INVALID, NCHAN_MSG_NO_COMPRESSION);

  ngx_conf_merge_sec_value(conf->message_timeout, prev->message_timeout, NCHAN_DEFAULT_MESSAGE_TIMEOUT);
  ngx_conf_merge_value(conf->max_messages, prev->max_messages, NCHAN_DEFAULT_MAX_MESSAGES);

  MERGE_CONF(conf, prev, complex_message_timeout);
  MERGE_CONF(conf, prev, complex_max_messages);

  if (conf->subscriber_first_message == NCHAN_SUBSCRIBER_FIRST_MESSAGE_UNSET) {
    conf->subscriber_first_message = (prev->subscriber_first_message == NCHAN_SUBSCRIBER_FIRST_MESSAGE_UNSET) ? NCHAN_SUBSCRIBER_DEFAULT_FIRST_MESSAGE : prev->subscriber_first_message;
  }

  MERGE_CONF(conf, prev, subscriber_info_string);
  if(conf->subscriber_info_string == NULL) { //still null? use the default string
    if(create_complex_value_from_ngx_str(cf, &conf->subscriber_info_string, &DEFAULT_SUBSCRIBER_INFO_STRING) == NGX_CONF_ERROR) {
      return NGX_CONF_ERROR;
    }
  }

  ngx_conf_merge_value(conf->subscriber_info_location, prev->subscriber_info_location, 0);

  ngx_conf_merge_sec_value(conf->websocket_ping_interval, prev->websocket_ping_interval, NCHAN_DEFAULT_SUBSCRIBER_PING_INTERVAL);

  ngx_conf_merge_sec_value(conf->eventsource_ping.interval, prev->eventsource_ping.interval, NCHAN_DEFAULT_SUBSCRIBER_PING_INTERVAL);
  // each eventsource ping string merges with its own counterpart: comment with
  // comment, event with event, data with data
  ngx_conf_merge_str_value(conf->eventsource_ping.comment, prev->eventsource_ping.comment, "");
  ngx_conf_merge_str_value(conf->eventsource_ping.event, prev->eventsource_ping.event, "ping");
  ngx_conf_merge_str_value(conf->eventsource_ping.data, prev->eventsource_ping.data, "");

  ngx_conf_merge_sec_value(conf->subscriber_timeout, prev->subscriber_timeout, NCHAN_DEFAULT_SUBSCRIBER_TIMEOUT);
  ngx_conf_merge_sec_value(conf->redis_idle_channel_cache_timeout, prev->redis_idle_channel_cache_timeout, NCHAN_DEFAULT_REDIS_IDLE_CHANNEL_CACHE_TIMEOUT);

  ngx_conf_merge_value(conf->subscribe_only_existing_channel, prev->subscribe_only_existing_channel, 0);
  ngx_conf_merge_value(conf->max_channel_id_length, prev->max_channel_id_length, NCHAN_MAX_CHANNEL_ID_LENGTH);
  ngx_conf_merge_value(conf->max_channel_subscribers, prev->max_channel_subscribers, 0);
  ngx_conf_merge_value(conf->channel_timeout, prev->channel_timeout, NCHAN_DEFAULT_CHANNEL_TIMEOUT);

  ngx_conf_merge_str_value(conf->subscriber_http_raw_stream_separator, prev->subscriber_http_raw_stream_separator, "\n");

  ngx_conf_merge_str_value(conf->channel_id_split_delimiter, prev->channel_id_split_delimiter, "");
  MERGE_CONF(conf, prev, allow_origin);
  ngx_conf_merge_value(conf->allow_credentials, prev->allow_credentials, 1);
  ngx_conf_merge_str_value(conf->eventsource_event, prev->eventsource_event, "");
  ngx_conf_merge_str_value(conf->custom_msgtag_header, prev->custom_msgtag_header, "");
  ngx_conf_merge_value(conf->msg_in_etag_only, prev->msg_in_etag_only, 0);
  ngx_conf_merge_value(conf->longpoll_multimsg, prev->longpoll_multimsg, 0);
  ngx_conf_merge_value(conf->longpoll_multimsg_use_raw_stream_separator, prev->longpoll_multimsg_use_raw_stream_separator, 0);

  ngx_conf_merge_value(conf->websocket_heartbeat.enabled, prev->websocket_heartbeat.enabled, 0);
  MERGE_CONF(conf, prev, websocket_heartbeat.in);
  MERGE_CONF(conf, prev, websocket_heartbeat.out);

  MERGE_CONF(conf, prev, channel_events_channel_id);
  MERGE_CONF(conf, prev, channel_event_string);

  if(conf->channel_event_string == NULL) { //still null? use the default string
    if(create_complex_value_from_ngx_str(cf, &conf->channel_event_string, &DEFAULT_CHANNEL_EVENT_STRING) == NGX_CONF_ERROR) {
      return NGX_CONF_ERROR;
    }
  }

  if(conf->storage_engine == NULL) {
    conf->storage_engine = prev->storage_engine ? prev->storage_engine : default_storage_engine;
  }

  MERGE_CONF(conf, prev, authorize_request_url);
  MERGE_CONF(conf, prev, publisher_upstream_request_url);
  MERGE_CONF(conf, prev, unsubscribe_request_url);
  MERGE_CONF(conf, prev, subscribe_request_url);
  MERGE_CONF(conf, prev, channel_group);

  MERGE_CONF(conf, prev, group.max_channels);
  MERGE_CONF(conf, prev, group.max_subscribers);
  MERGE_CONF(conf, prev, group.max_messages);
  MERGE_CONF(conf, prev, group.max_messages_shm_bytes);
  MERGE_CONF(conf, prev, group.max_messages_file_bytes);

  //channel id arrays are inherited wholesale when the child defines none
  if(conf->pub_chid.n == 0) {
    conf->pub_chid = prev->pub_chid;
  }
  if(conf->sub_chid.n == 0) {
    conf->sub_chid = prev->sub_chid;
  }
  if(conf->pubsub_chid.n == 0) {
    conf->pubsub_chid = prev->pubsub_chid;
  }
  if(conf->last_message_id.n == 0) {
    conf->last_message_id = prev->last_message_id;
  }
  if(conf->last_message_id.n == 0) { //if it's still null
    ngx_str_t      first_choice_msgid = ngx_string("$http_last_event_id");
    ngx_str_t      second_choice_msgid = ngx_string("$arg_last_event_id");

    if(create_complex_value_from_ngx_str(cf, &conf->last_message_id.cv[0], &first_choice_msgid) == NGX_CONF_ERROR) {
      return NGX_CONF_ERROR;
    }
    if(create_complex_value_from_ngx_str(cf, &conf->last_message_id.cv[1], &second_choice_msgid) == NGX_CONF_ERROR) {
      return NGX_CONF_ERROR;
    }
    conf->last_message_id.n = 2;
  }

  ngx_conf_merge_value(conf->redis.url_enabled, prev->redis.url_enabled, 0);

  ngx_conf_merge_value(conf->redis.upstream_inheritable, prev->redis.upstream_inheritable, 0);
  ngx_conf_merge_str_value(conf->redis.url, prev->redis.url, NCHAN_REDIS_DEFAULT_URL);

  if(up && up->redis.namespace.len > 0) { //upstream has a namespace set
    ngx_conf_merge_str_value(conf->redis.namespace, up->redis.namespace, "");
  }
  else {
    ngx_conf_merge_str_value(conf->redis.namespace, prev->redis.namespace, "");
  }

  //for the settings below the upstream's value (if any) wins over the parent's;
  //merging against `up` with an UNSET default leaves conf unset when up has no value
  if(up) {
    ngx_conf_merge_value(conf->redis.ping_interval, up->redis.ping_interval, NGX_CONF_UNSET);
  }
  ngx_conf_merge_value(conf->redis.ping_interval, prev->redis.ping_interval, NCHAN_REDIS_DEFAULT_PING_INTERVAL_TIME);

  if(up) {
    ngx_conf_merge_value(conf->redis.cluster_check_interval, up->redis.cluster_check_interval, NGX_CONF_UNSET);
  }
  ngx_conf_merge_value(conf->redis.cluster_check_interval, prev->redis.cluster_check_interval, NCHAN_REDIS_DEFAULT_CLUSTER_CHECK_INTERVAL_TIME);

  if(up) {
    ngx_conf_merge_value(conf->redis.nostore_fastpublish, up->redis.nostore_fastpublish, NGX_CONF_UNSET);
  }
  ngx_conf_merge_value(conf->redis.nostore_fastpublish, prev->redis.nostore_fastpublish, 0);

  if(conf->redis.url_enabled) {
    conf->redis.enabled = 1;
    nchan_store_redis_add_active_loc_conf(cf, conf);
  }
  if(conf->redis.upstream_inheritable && !conf->redis.upstream && prev->redis.upstream && prev->redis.upstream_url.len > 0) {
    conf->redis.upstream_url = prev->redis.upstream_url;
    //don't silently continue without an upstream if attaching it fails
    rc = ngx_conf_set_redis_upstream(cf, &conf->redis.upstream_url, conf);
    if(rc != NGX_CONF_OK) {
      return rc;
    }
  }

  if(up) {
    MERGE_UNSET_CONF(conf->redis.storage_mode, up->redis.storage_mode, REDIS_MODE_CONF_UNSET, REDIS_MODE_CONF_UNSET);
  }
  MERGE_UNSET_CONF(conf->redis.storage_mode, prev->redis.storage_mode, REDIS_MODE_CONF_UNSET, REDIS_MODE_DISTRIBUTED);

  if(prev->request_handler != NULL && conf->request_handler == NULL) {
    conf->request_handler = prev->request_handler;
  }
  if(conf->request_handler != NULL) {
    nchan_setup_handler(cf, conf->request_handler);
  }

  ngx_conf_merge_value(conf->benchmark.time, prev->benchmark.time, 10);
  ngx_conf_merge_value(conf->benchmark.msgs_per_minute, prev->benchmark.msgs_per_minute, 120);
  ngx_conf_merge_value(conf->benchmark.msg_padding, prev->benchmark.msg_padding, 0);
  ngx_conf_merge_value(conf->benchmark.channels, prev->benchmark.channels, 1000);
  ngx_conf_merge_value(conf->benchmark.subscribers_per_channel, prev->benchmark.subscribers_per_channel, 100);
  MERGE_UNSET_CONF(conf->benchmark.subscriber_distribution, prev->benchmark.subscriber_distribution, NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET, NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_RANDOM);
  MERGE_UNSET_CONF(conf->benchmark.publisher_distribution, prev->benchmark.publisher_distribution, NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET, NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_RANDOM);

  return NGX_CONF_OK;
}
540
/*
 * Directive handler selecting the location's storage engine.
 *
 * Accepts "memory" or "redis" as the first argument; choosing redis also
 * flags redis as globally enabled. Any other value is a fatal configuration
 * error: it is logged at error level (like every other invalid-value path in
 * this file) and NGX_CONF_ERROR is returned.
 */
static char *nchan_set_storage_engine(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t     *lcf = conf;
  ngx_str_t            *val = &((ngx_str_t *) cf->args->elts)[1];

  if(nchan_strmatch(val, 1, "memory")) {
    lcf->storage_engine = &nchan_store_memory;
  }
  else if(nchan_strmatch(val, 1, "redis")) {
    lcf->storage_engine = &nchan_store_redis;
    global_redis_enabled = 1;
  }
  else {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V", &cmd->name, val);
    return NGX_CONF_ERROR;
  }

  return NGX_CONF_OK;
}
559
/*
 * Keyword lists accepted by the publisher/subscriber directives.
 * Each *_STRINGS macro expands to a comma-separated list of string literals
 * meant to be passed as the variadic tail of nchan_strmatch(); the matching
 * *_STRINGS_N macro is the number of literals in that list and must be kept
 * in sync with it by hand.
 */
#define WEBSOCKET_STRINGS "websocket", "ws", "websockets"
#define WEBSOCKET_STRINGS_N 3

#define EVENTSOURCE_STRINGS "eventsource", "event-source", "es", "sse"
#define EVENTSOURCE_STRINGS_N 4

#define HTTP_CHUNKED_STRINGS "chunked", "http-chunked"
#define HTTP_CHUNKED_STRINGS_N 2

#define HTTP_MULTIPART_STRINGS "multipart", "multipart/mixed", "http-multipart", "multipart-mixed"
#define HTTP_MULTIPART_STRINGS_N 4

#define LONGPOLL_STRINGS "longpoll", "long-poll"
#define LONGPOLL_STRINGS_N 2

/* "http" selects interval polling for subscribers (and the HTTP publisher in publisher context). */
#define INTERVALPOLL_STRINGS "poll", "interval-poll", "intervalpoll", "http"
#define INTERVALPOLL_STRINGS_N 4

#define HTTP_RAW_STREAM_STRINGS "http-raw-stream"
#define HTTP_RAW_STREAM_STRINGS_N 1

/* Keywords that switch subscriber types off. */
#define DISABLED_STRINGS "none", "off", "disabled"
#define DISABLED_STRINGS_N 3
583
/*
 * Parses the arguments of a publisher directive into lcf->pub.
 *
 * With no arguments both HTTP and websocket publishing are enabled. Otherwise
 * each argument enables the matching publisher type; parsing stops at the
 * first unrecognized argument and NGX_CONF_ERROR is returned (logging the
 * offending value only when `fail` is non-zero). Types enabled before the
 * bad argument stay enabled.
 *
 * On success the location is validated and nchan_pubsub_handler becomes its
 * request handler.
 */
static char *nchan_publisher_directive_parse(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, ngx_int_t fail) {
  nchan_loc_conf_t              *loc = conf;
  nchan_conf_publisher_types_t  *types = &loc->pub;
  ngx_str_t                     *args = cf->args->elts;
  ngx_str_t                     *arg;
  ngx_uint_t                     idx;

  if (cf->args->nelts == 1) {
    // bare directive: enable everything
    types->http = 1;
    types->websocket = 1;
  }

  // args[0] is the directive name itself; this loop is a no-op for a bare directive
  for (idx = 1; idx < cf->args->nelts; idx++) {
    arg = &args[idx];

    if (nchan_strmatch(arg, 1, "http")) {
      types->http = 1;
      continue;
    }
    if (nchan_strmatch(arg, WEBSOCKET_STRINGS_N, WEBSOCKET_STRINGS)) {
      types->websocket = 1;
      continue;
    }

    if (fail) {
      ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V", &cmd->name, arg);
    }
    return NGX_CONF_ERROR;
  }

  if (!is_valid_location(cf, loc)) {
    return NGX_CONF_ERROR;
  }

  loc->request_handler = &nchan_pubsub_handler;
  return NGX_CONF_OK;
}
620
/* Publisher directive handler: strict parse, unrecognized arguments are logged as errors. */
static char *nchan_publisher_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  char  *result = nchan_publisher_directive_parse(cf, cmd, conf, 1);
  return result;
}
624
/*
 * Parses the arguments of a subscriber directive into lcf->sub.
 *
 * With no arguments the default set is enabled: longpoll, websocket,
 * eventsource, http-chunked and http-multipart (interval polling and
 * http-raw-stream stay off). Otherwise each argument enables the matching
 * subscriber type, and "none"/"off"/"disabled" clears ALL subscriber types,
 * http-raw-stream included, so that the location no longer counts as a
 * subscriber location afterwards.
 *
 * Parsing stops at the first unrecognized argument and NGX_CONF_ERROR is
 * returned (logging the offending value only when `fail` is non-zero).
 * On success the location is validated and nchan_pubsub_handler becomes its
 * request handler.
 */
static char *nchan_subscriber_directive_parse(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, ngx_int_t fail) {
  nchan_loc_conf_t               *lcf = conf;
  nchan_conf_subscriber_types_t  *subt = &lcf->sub;
  ngx_str_t                      *val;
  ngx_uint_t                      i;

  if(cf->args->nelts == 1){ //no arguments
    subt->poll=0;
    subt->http_raw_stream = 0;
    subt->longpoll=1;
    subt->websocket=1;
    subt->eventsource=1;
    subt->http_chunked=1;
    subt->http_multipart=1;
  }
  else {
    for(i=1; i < cf->args->nelts; i++) {
      val = &((ngx_str_t *) cf->args->elts)[i];
      if(nchan_strmatch(val, LONGPOLL_STRINGS_N, LONGPOLL_STRINGS)) {
        subt->longpoll=1;
      }
      else if(nchan_strmatch(val, INTERVALPOLL_STRINGS_N, INTERVALPOLL_STRINGS)) {
        subt->poll=1;
      }
      else if(nchan_strmatch(val, HTTP_RAW_STREAM_STRINGS_N, HTTP_RAW_STREAM_STRINGS)) {
        subt->http_raw_stream=1;
      }
      else if(nchan_strmatch(val, HTTP_CHUNKED_STRINGS_N, HTTP_CHUNKED_STRINGS)) {
        subt->http_chunked=1;
      }
      else if(nchan_strmatch(val, HTTP_MULTIPART_STRINGS_N, HTTP_MULTIPART_STRINGS)) {
        subt->http_multipart=1;
      }
      else if(nchan_strmatch(val, WEBSOCKET_STRINGS_N, WEBSOCKET_STRINGS)) {
        subt->websocket=1;
      }
      else if(nchan_strmatch(val, EVENTSOURCE_STRINGS_N, EVENTSOURCE_STRINGS)) {
        subt->eventsource=1;
      }
      else if(nchan_strmatch(val, DISABLED_STRINGS_N, DISABLED_STRINGS)) {
        //turn off every type is_sub_location() looks at
        subt->poll=0;
        subt->http_raw_stream=0;
        subt->longpoll=0;
        subt->websocket=0;
        subt->eventsource=0;
        subt->http_chunked=0;
        subt->http_multipart=0;
      }
      else {
        if(fail) {
          ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V", &cmd->name, val);
        }
        return NGX_CONF_ERROR;
      }
    }
  }

  if(!is_valid_location(cf, lcf)) {
    return NGX_CONF_ERROR;
  }
  lcf->request_handler = &nchan_pubsub_handler;
  return NGX_CONF_OK;
}
688
/* Subscriber directive handler: strict parse, unrecognized arguments are logged as errors. */
static char *nchan_subscriber_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  char  *result = nchan_subscriber_directive_parse(cf, cmd, conf, 1);
  return result;
}
692
/*
 * Combined publisher + subscriber directive handler.
 *
 * Runs both the publisher and the subscriber parser in lenient mode (their
 * results are deliberately ignored, since a keyword valid for one role is
 * usually unknown to the other), then checks every argument against the union
 * of all accepted keywords, logging and failing on the first unknown one.
 * Finally the resulting location is validated.
 */
static char *nchan_pubsub_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_str_t         *arg;
  ngx_uint_t         idx;

  (void) nchan_publisher_directive_parse(cf, cmd, conf, 0);
  (void) nchan_subscriber_directive_parse(cf, cmd, conf, 0);

  for (idx = 1; idx < cf->args->nelts; idx++) {
    arg = &((ngx_str_t *) cf->args->elts)[idx];

    if (nchan_strmatch(arg,
          WEBSOCKET_STRINGS_N + EVENTSOURCE_STRINGS_N + HTTP_CHUNKED_STRINGS_N + HTTP_MULTIPART_STRINGS_N + LONGPOLL_STRINGS_N + INTERVALPOLL_STRINGS_N + HTTP_RAW_STREAM_STRINGS_N + DISABLED_STRINGS_N,
          WEBSOCKET_STRINGS, EVENTSOURCE_STRINGS, HTTP_CHUNKED_STRINGS, HTTP_MULTIPART_STRINGS, LONGPOLL_STRINGS, INTERVALPOLL_STRINGS, HTTP_RAW_STREAM_STRINGS, DISABLED_STRINGS)) {
      continue;
    }

    // not a keyword of either role
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V", &cmd->name, arg);
    return NGX_CONF_ERROR;
  }

  if (is_valid_location(cf, loc)) {
    return NGX_CONF_OK;
  }
  return NGX_CONF_ERROR;
}
715
/*
 * Directive handler turning a location into a subscriber-info location.
 *
 * Enables the streaming subscriber types (http-raw-stream, websocket,
 * eventsource, chunked, multipart) and disables both polling variants,
 * fixes the message timeout at 10 with no complex override, and installs
 * nchan_subscriber_info_handler. Always returns NGX_CONF_OK.
 */
static char *nchan_subscriber_info_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t               *loc = conf;
  nchan_conf_subscriber_types_t  *types = &loc->sub;

  // long-polling makes little sense as the default HTTP subscriber here:
  // info locations are likely to be curl'd by developers
  types->longpoll = 0;
  types->poll = 0;

  types->http_raw_stream = 1;
  types->http_chunked = 1;
  types->http_multipart = 1;
  types->eventsource = 1;
  types->websocket = 1;

  loc->subscriber_info_location = 1;

  loc->complex_message_timeout = NULL;
  loc->message_timeout = 10;

  loc->request_handler = &nchan_subscriber_info_handler;
  return NGX_CONF_OK;
}
737
/*
 * Directive handler for group-access locations.
 *
 * With no arguments get, set and delete are all enabled. Otherwise "get",
 * "set" and "delete" enable the respective request type and "off" clears all
 * three; any other argument is logged and rejected with NGX_CONF_ERROR.
 * On success the location is validated and nchan_group_handler is installed.
 */
static char *nchan_group_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t    *loc = conf;
  nchan_conf_group_t  *grp = &loc->group;
  ngx_str_t           *args = cf->args->elts;
  ngx_str_t           *arg;
  ngx_uint_t           idx;

  if (cf->args->nelts == 1) {
    // bare directive: allow everything
    grp->get = 1;
    grp->set = 1;
    grp->delete = 1;
  }

  // no-op for a bare directive
  for (idx = 1; idx < cf->args->nelts; idx++) {
    arg = &args[idx];

    if (nchan_strmatch(arg, 1, "get")) {
      grp->get = 1;
      continue;
    }
    if (nchan_strmatch(arg, 1, "set")) {
      grp->set = 1;
      continue;
    }
    if (nchan_strmatch(arg, 1, "delete")) {
      grp->delete = 1;
      continue;
    }
    if (nchan_strmatch(arg, 1, "off")) {
      grp->get = 0;
      grp->set = 0;
      grp->delete = 0;
      continue;
    }

    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V", &cmd->name, arg);
    return NGX_CONF_ERROR;
  }

  if (!is_valid_location(cf, loc)) {
    return NGX_CONF_ERROR;
  }

  loc->request_handler = &nchan_group_handler;
  return NGX_CONF_OK;
}
779
set_complex_value(ngx_conf_t * cf,ngx_http_complex_value_t ** cv,char * val)780 static ngx_int_t set_complex_value(ngx_conf_t *cf, ngx_http_complex_value_t **cv, char *val) {
781 ngx_http_compile_complex_value_t ccv;
782 ngx_str_t *value = ngx_palloc(cf->pool, sizeof(ngx_str_t));;
783 if(value == NULL) {
784 return NGX_ERROR;
785 }
786 value->data = (u_char *)val;
787 value->len = strlen(val);
788 *cv = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
789 if (*cv == NULL) {
790 return NGX_ERROR;
791 }
792 ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
793 ccv.cf = cf;
794 ccv.value = value;
795 ccv.complex_value = *cv;
796 if (ngx_http_compile_complex_value(&ccv) != NGX_OK) {
797 return NGX_ERROR;
798 }
799
800 return NGX_OK;
801 }
802
set_complex_value_array_size1(ngx_conf_t * cf,nchan_complex_value_arr_t * chid,char * val)803 static ngx_int_t set_complex_value_array_size1(ngx_conf_t *cf, nchan_complex_value_arr_t *chid, char *val) {
804 chid->n = 1;
805 return set_complex_value(cf, &chid->cv[0], val);
806 }
807
/*
 * Directive handler turning a location into the benchmark endpoint.
 *
 * Flags the benchmark as globally enabled, installs nchan_benchmark_handler,
 * pins the channel group to "benchmark", the publisher channel id to
 * "control" and the subscriber channel id to "data", and finally enables
 * websocket publishing and subscribing.
 *
 * Returns NGX_CONF_OK, or a static error string when one of the fixed values
 * cannot be compiled (the flag and handler stay set in that case).
 */
static char *nchan_benchmark_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_int_t          rc;

  global_benchmark_enabled = 1;
  loc->request_handler = &nchan_benchmark_handler;

  // fixed channel group
  rc = set_complex_value(cf, &loc->channel_group, "benchmark");
  if (rc != NGX_OK) {
    return "error setting benchmark channel group";
  }

  // fixed publisher ("control") and subscriber ("data") channel ids
  rc = set_complex_value_array_size1(cf, &loc->pub_chid, "control");
  if (rc != NGX_OK) {
    return "error setting benchmark control channel";
  }
  rc = set_complex_value_array_size1(cf, &loc->sub_chid, "data");
  if (rc != NGX_OK) {
    return "error setting benchmark data channel";
  }

  loc->sub.websocket = 1;
  loc->pub.websocket = 1;

  return NGX_CONF_OK;
}
831
/*
 * Directive handler selecting how benchmark subscribers are distributed.
 * Accepts "random", or "optimal" (alias "best"); anything else is rejected
 * with an error string.
 */
static char *nchan_benchmark_subscriber_distribution_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *choice = &args[1];

  if(nchan_strmatch(choice, 1, "random")) {
    loc->benchmark.subscriber_distribution = NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_RANDOM;
    return NGX_CONF_OK;
  }

  if(nchan_strmatch(choice, 2, "optimal", "best")) {
    loc->benchmark.subscriber_distribution = NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_OPTIMAL;
    return NGX_CONF_OK;
  }

  return "invalid value, must be \"random\" or \"optimal\"";
}
/*
 * Directive handler selecting how benchmark publishers are distributed.
 * Accepts "random", or "optimal" (alias "best"); anything else is rejected
 * with an error string.
 */
static char *nchan_benchmark_publisher_distribution_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *choice = &args[1];

  if(nchan_strmatch(choice, 1, "random")) {
    loc->benchmark.publisher_distribution = NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_RANDOM;
    return NGX_CONF_OK;
  }

  if(nchan_strmatch(choice, 2, "optimal", "best")) {
    loc->benchmark.publisher_distribution = NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_OPTIMAL;
    return NGX_CONF_OK;
  }

  return "invalid value, must be \"random\" or \"optimal\"";
}
860
/*
 * Directive handler for the subscriber's first message setting.
 *
 * "oldest" stores 1, "newest" stores 0. Any other value must be a decimal
 * number with an optional leading '-'; its magnitude may not exceed 32 and the
 * signed value is stored as-is.
 *
 * Logs and returns NGX_CONF_ERROR on an unparsable or too large number.
 */
static char *nchan_subscriber_first_message_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = (nchan_loc_conf_t *)conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *val = &args[1];
  ngx_str_t          digits;
  ngx_int_t          magnitude;
  int                negative = 0;

  if(nchan_strmatch(val, 1, "oldest")) {
    loc->subscriber_first_message = 1;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(val, 1, "newest")) {
    loc->subscriber_first_message = 0;
    return NGX_CONF_OK;
  }

  //not a keyword, so it has to be a (possibly negative) number
  digits = *val;
  if(digits.len > 0 && digits.data[0] == '-') {
    negative = 1;
    digits.data++;
    digits.len--;
  }

  magnitude = ngx_atoi(digits.data, digits.len);
  if(magnitude == NGX_ERROR) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V, must be 'oldest', 'newest', or a number", &cmd->name, val);
    return NGX_CONF_ERROR;
  }
  if(magnitude > 32) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V, must be 'oldest', 'newest', or a number between -32 and 32", &cmd->name, val);
    return NGX_CONF_ERROR;
  }

  loc->subscriber_first_message = negative ? -magnitude : magnitude;
  return NGX_CONF_OK;
}
893
/*
 * Directive handler for the websocket heartbeat.
 *
 * Takes two arguments: the incoming heartbeat string and the outgoing one.
 * Each is copied into a single pool allocation laid out as an ngx_str_t header
 * immediately followed by the string bytes, and the copies are stored in
 * lcf->websocket_heartbeat.in / .out. The heartbeat is marked enabled only
 * once both copies exist.
 *
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR if a pool allocation fails.
 */
static char *nchan_websocket_heartbeat_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *lcf = (nchan_loc_conf_t *)conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *src[2];
  ngx_str_t         *copy[2];
  int                i;

  src[0] = &args[1]; //heartbeat in
  src[1] = &args[2]; //heartbeat out

  for(i = 0; i < 2; i++) {
    copy[i] = ngx_pcalloc(cf->pool, sizeof(ngx_str_t) + src[i]->len);
    if(copy[i] == NULL) {
      //previously unchecked: a failed allocation was dereferenced right away
      return NGX_CONF_ERROR;
    }
    //string bytes live directly after the ngx_str_t header
    copy[i]->data = (u_char *)&copy[i][1];
    copy[i]->len = src[i]->len;
    ngx_memcpy(copy[i]->data, src[i]->data, src[i]->len);
  }

  lcf->websocket_heartbeat.enabled = 1;
  lcf->websocket_heartbeat.in = copy[0];
  lcf->websocket_heartbeat.out = copy[1];

  return NGX_CONF_OK;
}
915
/*
 * Directive handler for the deflate compression level (0 through 9).
 * When nginx is built without zlib the directive is accepted and ignored.
 */
static char *nchan_conf_deflate_compression_level_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_main_conf_t  *main_conf = (nchan_main_conf_t *)conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_int_t           level = ngx_atoi(args[1].data, args[1].len);

  if(level == NGX_ERROR) {
    return "invalid number";
  }
  if(!(level >= 0 && level <= 9)) {
    return "must be between 0 and 9";
  }

  main_conf->zlib_params.level = level;
#endif
  return NGX_CONF_OK;
}
933
/*
 * Directive handler for the deflate compression strategy.
 *
 * Accepted values: "default", "filtered", "huffman-only", "rle", "fixed",
 * mapped to the corresponding zlib Z_* strategy constant. Anything else is
 * rejected with an error string. When nginx is built without zlib the
 * directive is accepted and ignored.
 */
static char *nchan_conf_deflate_compression_strategy_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_main_conf_t  *mcf = (nchan_main_conf_t *)conf;
  //args[0] is the directive name; the strategy is the first argument, args[1].
  //(Matching against args[0] rejected every valid strategy.)
  ngx_str_t          *val = &((ngx_str_t *) cf->args->elts)[1];

  if(nchan_strmatch(val, 1, "default")) {
    mcf->zlib_params.strategy = Z_DEFAULT_STRATEGY;
  }
  else if(nchan_strmatch(val, 1, "filtered")) {
    mcf->zlib_params.strategy = Z_FILTERED;
  }
  else if(nchan_strmatch(val, 1, "huffman-only")) {
    mcf->zlib_params.strategy = Z_HUFFMAN_ONLY;
  }
  else if(nchan_strmatch(val, 1, "rle")) {
    mcf->zlib_params.strategy = Z_RLE;
  }
  else if(nchan_strmatch(val, 1, "fixed")) {
    mcf->zlib_params.strategy = Z_FIXED;
  }
  else {
    return "invalid compression strategy";
  }
#endif
  return NGX_CONF_OK;
}
959
/*
 * Directive handler for the deflate window size in bits (9 through 15).
 * When nginx is built without zlib the directive is accepted and ignored.
 */
static char *nchan_conf_deflate_compression_window_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_main_conf_t  *main_conf = (nchan_main_conf_t *)conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_int_t           window_bits = ngx_atoi(args[1].data, args[1].len);

  if(window_bits == NGX_ERROR) {
    return "invalid number";
  }
  if(!(window_bits >= 9 && window_bits <= 15)) {
    return "must be between 9 and 15";
  }

  main_conf->zlib_params.windowBits = window_bits;
#endif
  return NGX_CONF_OK;
}
977
/*
 * Directive handler for the deflate memory level (1 through 9).
 * When nginx is built without zlib the directive is accepted and ignored.
 */
static char *nchan_conf_deflate_compression_memlevel_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_main_conf_t  *main_conf = (nchan_main_conf_t *)conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_int_t           mem_level = ngx_atoi(args[1].data, args[1].len);

  if(mem_level == NGX_ERROR) {
    return "invalid number";
  }
  if(!(mem_level >= 1 && mem_level <= 9)) {
    return "must be between 1 and 9";
  }

  main_conf->zlib_params.memLevel = mem_level;
#endif
  return NGX_CONF_OK;
}
995
996
/*
 * Module "exit process" hook (registered in ngx_nchan_module).
 *
 * Does nothing unless nchan is enabled. Otherwise tears things down in this
 * order: Redis store pre-exit notification (if Redis is enabled), memory
 * store, Redis store (if enabled), output layer, and finally the shared
 * deflate state when zlib support is compiled in and a zstream was needed.
 */
static void nchan_exit_worker(ngx_cycle_t *cycle) {
  if(!global_nchan_enabled) {
    return;
  }
  // called before any store's exit_worker runs
  if(global_redis_enabled) {
    redis_store_prepare_to_exit_worker();
  }
  nchan_store_memory.exit_worker(cycle);
  if(global_redis_enabled) {
    nchan_store_redis.exit_worker(cycle);
  }
  nchan_output_shutdown();
#if (NGX_ZLIB)
  if(global_zstream_needed) {
    nchan_common_deflate_shutdown();
  }
#endif
#if NCHAN_SUBSCRIBER_LEAK_DEBUG
  // debug builds only: runs last, after every shutdown step above
  subscriber_debug_assert_isempty();
#endif
}
1018
/*
 * Module "exit master" hook (registered in ngx_nchan_module).
 *
 * Does nothing unless nchan is enabled. Otherwise runs the master-exit
 * routines in this order: benchmark (if enabled), memory store, Redis store
 * (if enabled), and the shared deflate state when zlib support is compiled in
 * and a zstream was needed.
 */
static void nchan_exit_master(ngx_cycle_t *cycle) {
  if(!global_nchan_enabled) {
    return;
  }
  if(global_benchmark_enabled) {
    nchan_benchmark_exit_master(cycle);
  }
  nchan_store_memory.exit_master(cycle);
  if(global_redis_enabled) {
    nchan_store_redis.exit_master(cycle);
  }
#if (NGX_ZLIB)
  if(global_zstream_needed) {
    nchan_common_deflate_shutdown();
  }
#endif
}
1036
/*
 * Compile every argument of the current directive into `chid`.
 *
 * Argument i (1-based, args[0] being the directive name) is compiled into
 * chid->cv[i-1], and chid->n is set to the number of arguments.
 *
 * At most NCHAN_COMPLEX_VALUE_ARRAY_MAX arguments are accepted. Previously
 * chid->n was set to the full argument count while only the first
 * NCHAN_COMPLEX_VALUE_ARRAY_MAX entries were compiled, so n could describe
 * entries that were never populated. Such a directive is now rejected.
 *
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR on too many arguments, allocation
 * failure or compilation failure.
 */
static char *nchan_set_complex_value_array(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, nchan_complex_value_arr_t *chid) {
  ngx_uint_t                          i;
  ngx_uint_t                          count = cf->args->nelts - 1;
  ngx_str_t                          *args = cf->args->elts;
  ngx_http_complex_value_t          **cv;
  ngx_http_compile_complex_value_t    ccv;

  if(count > NCHAN_COMPLEX_VALUE_ARRAY_MAX) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "%V accepts at most %ui values", &cmd->name, (ngx_uint_t )NCHAN_COMPLEX_VALUE_ARRAY_MAX);
    return NGX_CONF_ERROR;
  }

  chid->n = count;
  for(i = 0; i < count; i++) {
    cv = &chid->cv[i];
    *cv = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
    if(*cv == NULL) {
      return NGX_CONF_ERROR;
    }

    ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
    ccv.cf = cf;
    ccv.value = &args[i + 1]; //skip the directive name
    ccv.complex_value = *cv;

    if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
      return NGX_CONF_ERROR;
    }
  }

  return NGX_CONF_OK;
}
1065
/* Directive handler: compile the directive's arguments into the location's publisher channel ids. */
static char *nchan_set_pub_channel_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;

  return nchan_set_complex_value_array(cf, cmd, conf, &loc->pub_chid);
}
1069
/* Directive handler: compile the directive's arguments into the location's subscriber channel ids. */
static char *nchan_set_sub_channel_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  return nchan_set_complex_value_array(cf, cmd, conf, &((nchan_loc_conf_t *)conf)->sub_chid);
}
1074
/* Directive handler: compile the directive's arguments into the location's combined pub/sub channel ids. */
static char *nchan_set_pubsub_channel_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  return nchan_set_complex_value_array(cf, cmd, conf, &((nchan_loc_conf_t *)conf)->pubsub_chid);
}
1079
/* Directive handler: compile the directive's arguments into the location's last_message_id values. */
static char *nchan_subscriber_last_message_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;

  return nchan_set_complex_value_array(cf, cmd, conf, &loc->last_message_id);
}
1083
/*
 * Directive handler: compile the directive's single argument into a newly
 * allocated complex value stored in the location's channel_events_channel_id.
 *
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR on allocation or compilation failure.
 */
static char *nchan_set_channel_events_channel_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_http_compile_complex_value_t   compile;
  ngx_http_complex_value_t          *compiled;
  nchan_loc_conf_t                  *loc = conf;
  ngx_str_t                         *args = cf->args->elts;

  compiled = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
  loc->channel_events_channel_id = compiled;
  if(compiled == NULL) {
    return NGX_CONF_ERROR;
  }

  ngx_memzero(&compile, sizeof(compile));
  compile.cf = cf;
  compile.value = &args[1];
  compile.complex_value = compiled;

  return ngx_http_compile_complex_value(&compile) == NGX_OK ? NGX_CONF_OK : NGX_CONF_ERROR;
}
1105
/*
 * Directive handler for the store-messages switch.
 *
 * When the argument is "off" (case-insensitive) the location's max_messages is
 * forced to 0. Every other argument is accepted without changing anything.
 */
static char *nchan_store_messages_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t  *args = cf->args->elts;
  ngx_int_t  *max_messages;

  if(ngx_strcasecmp(args[1].data, (u_char *) "off") != 0) {
    return NGX_CONF_OK;
  }

  max_messages = (ngx_int_t *) ((char *)conf + offsetof(nchan_loc_conf_t, max_messages));
  *max_messages = 0;
  return NGX_CONF_OK;
}
1118
/*
 * Directive handler for the message buffer length.
 *
 * If the argument contains a '$' it is treated as a complex value: the plain
 * max_messages is reset to NGX_CONF_UNSET, the argument is compiled into
 * complex_max_messages, and memstore_reserve_conf_shared_data() is called for
 * the location. Otherwise complex_max_messages is cleared and the argument is
 * parsed as a number into max_messages.
 *
 * cmd->offset is rewritten to point at whichever field is being set before
 * delegating to the stock nginx slot setter.
 *
 * Returns NGX_CONF_OK, or the slot setter's error (e.g. an invalid number or a
 * duplicate setting). These errors used to be discarded, which let an
 * unparsable value through silently.
 */
static char *nchan_set_message_buffer_length(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *lcf = conf;
  ngx_str_t         *val = cf->args->elts;
  ngx_str_t         *arg = &val[1];
  char              *rc;

  if(memchr(arg->data, '$', arg->len)) {
    //complex
    lcf->max_messages = NGX_CONF_UNSET;
    cmd->offset = offsetof(nchan_loc_conf_t, complex_max_messages);
    rc = ngx_http_set_complex_value_slot(cf, cmd, conf);
    if(rc != NGX_CONF_OK) {
      return rc;
    }
    //only reserve shared data once the value is known to be valid
    memstore_reserve_conf_shared_data(lcf);
  }
  else {
    //simple
    lcf->complex_max_messages = NULL;
    cmd->offset = offsetof(nchan_loc_conf_t, max_messages);
    rc = ngx_conf_set_num_slot(cf, cmd, conf);
    if(rc != NGX_CONF_OK) {
      return rc;
    }
  }
  return NGX_CONF_OK;
}
1139
/*
 * Directive handler for the unsubscribe request URL.
 *
 * On nginx 1.3.15 and later the argument is stored through the stock complex
 * value slot setter. On older versions the directive is rejected with a logged
 * error.
 */
static char *ngx_http_set_unsubscribe_request_url(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if nginx_version < 1003015
  ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "%V not available on nginx version: %s, must be at least 1.3.15", &cmd->name, NGINX_VERSION);
  return NGX_CONF_ERROR;
#else
  return ngx_http_set_complex_value_slot(cf, cmd, conf);
#endif
}
1148
/*
 * Directive handler for the message timeout.
 *
 * If the argument contains a '$' it is treated as a complex value: the plain
 * message_timeout is reset to NGX_CONF_UNSET, the argument is compiled into
 * complex_message_timeout, and memstore_reserve_conf_shared_data() is called
 * for the location. Otherwise complex_message_timeout is cleared and the
 * argument is parsed as a time in seconds into message_timeout.
 *
 * cmd->offset is rewritten to point at whichever field is being set before
 * delegating to the stock nginx slot setter.
 *
 * Returns NGX_CONF_OK, or the slot setter's error (e.g. an invalid value or a
 * duplicate setting). These errors used to be discarded, which let an
 * unparsable value through silently.
 */
static char *nchan_set_message_timeout(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *lcf = conf;
  ngx_str_t         *val = cf->args->elts;
  ngx_str_t         *arg = &val[1];
  char              *rc;

  if(memchr(arg->data, '$', arg->len)) {
    //complex
    lcf->message_timeout = NGX_CONF_UNSET;
    cmd->offset = offsetof(nchan_loc_conf_t, complex_message_timeout);
    rc = ngx_http_set_complex_value_slot(cf, cmd, conf);
    if(rc != NGX_CONF_OK) {
      return rc;
    }
    //only reserve shared data once the value is known to be valid
    memstore_reserve_conf_shared_data(lcf);
  }
  else {
    //simple
    lcf->complex_message_timeout = NULL;
    cmd->offset = offsetof(nchan_loc_conf_t, message_timeout);
    rc = ngx_conf_set_sec_slot(cf, cmd, conf);
    if(rc != NGX_CONF_OK) {
      return rc;
    }
  }
  return NGX_CONF_OK;
}
1169
/*
 * Directive handler for settings that no longer do anything: logs a warning
 * naming the directive and accepts it.
 */
static char *nchan_ignore_obsolete_setting(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t  *directive = &cmd->name;

  (void) conf; //nothing is stored

  ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "ignoring obsolete nchan config directive '%V'.", directive);
  return NGX_CONF_OK;
}
1174
/*
 * Directive handler for the obsolete subscriber concurrency setting.
 * "broadcast" is accepted silently; any other value is accepted with a
 * warning. Nothing is stored either way.
 */
static char *nchan_ignore_subscriber_concurrency(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t  *args = cf->args->elts;
  ngx_str_t  *mode = &args[1];

  if(nchan_strmatch(mode, 1, "broadcast")) {
    return NGX_CONF_OK;
  }

  ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "ignoring obsolete nchan config directive '%V %V;'. Only 'broadcast' is currently supported.", &cmd->name, mode);
  return NGX_CONF_OK;
}
1182
/*
 * Directive handler for the raw-stream subscriber message separator.
 *
 * An empty separator, or one already ending in '\n', is stored as given.
 * Otherwise a copy with a trailing '\n' appended is allocated from the
 * configuration pool and stored instead.
 *
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR if the copy can't be allocated.
 */
static char *nchan_set_raw_subscriber_separator(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *given = &args[1];
  ngx_str_t         *stored = &loc->subscriber_http_raw_stream_separator;
  u_char            *terminated;

  if(given->len == 0 || given->data[given->len - 1] == '\n') {
    //nothing to append
    *stored = *given;
    return NGX_CONF_OK;
  }

  terminated = ngx_palloc(cf->pool, given->len + 1);
  if(terminated == NULL) {
    return NGX_CONF_ERROR;
  }
  ngx_memcpy(terminated, given->data, given->len);
  terminated[given->len] = '\n';

  stored->data = terminated;
  stored->len = given->len + 1;
  return NGX_CONF_OK;
}
1203
/*
 * Directive handler for message compression ("on" or "off").
 *
 * "on" also sets global_zstream_needed. When nginx is built without zlib the
 * directive is always rejected.
 */
static char *nchan_set_message_compression_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_loc_conf_t  *loc = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *setting = &args[1];

  if(nchan_strmatch(setting, 1, "on")) {
    loc->message_compression = 1;
    global_zstream_needed = 1;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(setting, 1, "off")) {
    loc->message_compression = 0;
    return NGX_CONF_OK;
  }
  return "invalid value: must be 'on' or 'off'";
#else
  return "cannot use compression, Nginx was built without zlib";
#endif
}
1223
/*
 * Directive handler for multi-message long-poll responses.
 *
 * "on" and "off" set longpoll_multimsg to 1 and 0. "raw" sets it to 1 and
 * additionally sets longpoll_multimsg_use_raw_stream_separator. Any other
 * value is logged and rejected.
 */
static char *nchan_set_longpoll_multipart(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *mode = &args[1];

  if(nchan_strmatch(mode, 1, "on")) {
    loc->longpoll_multimsg = 1;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(mode, 1, "off")) {
    loc->longpoll_multimsg = 0;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(mode, 1, "raw")) {
    loc->longpoll_multimsg = 1;
    loc->longpoll_multimsg_use_raw_stream_separator = 1;
    return NGX_CONF_OK;
  }

  ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid value for %V: %V;'. Must be 'on', 'off', or 'raw'", &cmd->name, mode);
  return NGX_CONF_ERROR;
}
1243
/*
 * Directive handler for Redis subscribe weights.
 *
 * Scans every argument for a "master=" or "slave=" prefix and parses the rest
 * as a number. Arguments with neither prefix are skipped. A weight is written
 * to the server configuration only if it was given.
 *
 * Returns NGX_CONF_OK, or an error string if a weight is not a valid number.
 */
static char *ngx_conf_set_redis_subscribe_weights(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_srv_conf_t  *srv = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *cur;
  ngx_int_t          master_weight = NGX_CONF_UNSET;
  ngx_int_t          slave_weight = NGX_CONF_UNSET;
  unsigned           idx;

  for(idx = 1; idx < cf->args->nelts; idx++) {
    cur = &args[idx];

    if(nchan_str_after(&cur, "master=")) {
      master_weight = ngx_atoi(cur->data, cur->len);
      if(master_weight == NGX_ERROR) {
        return "has invalid weight for master";
      }
      continue;
    }

    if(nchan_str_after(&cur, "slave=")) {
      slave_weight = ngx_atoi(cur->data, cur->len);
      if(slave_weight == NGX_ERROR) {
        return "has invalid weight for slave";
      }
    }
  }

  if(master_weight != NGX_CONF_UNSET) {
    srv->redis.master_weight = master_weight;
  }
  if(slave_weight != NGX_CONF_UNSET) {
    srv->redis.slave_weight = slave_weight;
  }

  return NGX_CONF_OK;
}
1274
/*
 * Directive handler for the Redis optimization target.
 * Accepts "bandwidth" (alias "bw") or "cpu" (alias "CPU"); anything else is
 * rejected with an error string.
 */
static char *ngx_conf_set_redis_optimize_target(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_srv_conf_t  *srv = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *target = &args[1];

  if(nchan_strmatch(target, 2, "bandwidth", "bw")) {
    srv->redis.optimize_target = NCHAN_REDIS_OPTIMIZE_BANDWIDTH;
    return NGX_CONF_OK;
  }

  if(nchan_strmatch(target, 2, "cpu", "CPU")) {
    srv->redis.optimize_target = NCHAN_REDIS_OPTIMIZE_CPU;
    return NGX_CONF_OK;
  }

  return "invalid value, must be \"bandwidth\" or \"cpu\"";
}
1289
/*
 * Directive handler for the discouraged on/off Redis switch.
 *
 * Always logs a warning recommending nchan_redis_pass, then stores the flag
 * through the stock flag slot setter. If the stored flag is on, the location
 * is registered as an active Redis location (unless it is already marked
 * enabled) and Redis is flagged as globally enabled. If it is off, the
 * location is removed from the active Redis locations.
 *
 * Returns the flag slot setter's result.
 */
static char *ngx_conf_enable_redis(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_flag_t         turned_on;
  char              *result;

  ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "Use of %V is discouraged in favor of nchan_redis_pass.", &cmd->name);

  result = ngx_conf_set_flag_slot(cf, cmd, conf);
  if(result == NGX_CONF_ERROR) {
    return result;
  }

  //read back the flag the slot setter just wrote
  turned_on = *(ngx_flag_t *) ((char *)conf + cmd->offset);

  if(!turned_on) {
    nchan_store_redis_remove_active_loc_conf(cf, loc);
    return result;
  }

  if(!loc->redis.enabled) {
    loc->redis.enabled = 1;
    nchan_store_redis_add_active_loc_conf(cf, loc);
  }
  global_redis_enabled = 1;

  return result;
}
1317
/*
 * Directive handler that turns a location into the stub status endpoint:
 * installs the stub status request handler and sets nchan_stub_status_enabled.
 */
static char *nchan_stub_status_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;

  loc->request_handler = &nchan_stub_status_handler;
  nchan_stub_status_enabled = 1;

  return NGX_CONF_OK;
}
1324
1325
nchan_upstream_dummy_roundrobin_init(ngx_conf_t * cf,ngx_http_upstream_srv_conf_t * us)1326 static ngx_int_t nchan_upstream_dummy_roundrobin_init(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us) {
1327 return NGX_OK;
1328 }
1329
/*
 * Directive handler that adds a Redis server URL to the enclosing upstream
 * block.
 *
 * Remembers `conf` as the upstream's nchan location configuration (asserting
 * it is the same one on repeated use), validates the URL, appends a zeroed
 * server entry whose name and single address name are the URL, and installs
 * the no-op upstream initializer.
 *
 * Changes from the earlier version: the URL is validated before anything is
 * appended, and the results of ngx_array_create() and ngx_pcalloc() are
 * checked instead of being dereferenced blindly.
 *
 * Returns NGX_CONF_OK, "url is invalid", or NGX_CONF_ERROR on allocation
 * failure.
 */
static char *ngx_conf_upstream_redis_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_http_upstream_srv_conf_t  *uscf;
  ngx_http_upstream_server_t    *usrv;
  nchan_srv_conf_t              *scf;
  nchan_loc_conf_t              *lcf = conf;
  ngx_str_t                     *value = cf->args->elts;

  uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
  scf = ngx_http_conf_upstream_srv_conf(uscf, ngx_nchan_module);
  if(scf->upstream_nchan_loc_conf) {
    assert(scf->upstream_nchan_loc_conf == lcf);
  }
  else {
    //is this even a safe technique? it might break in the future...
    scf->upstream_nchan_loc_conf = lcf;
  }

  //reject a bad URL before touching the upstream's server list
  if(!nchan_store_redis_validate_url(&value[1])) {
    return "url is invalid";
  }

  if(uscf->servers == NULL) {
    uscf->servers = ngx_array_create(cf->pool, 4, sizeof(ngx_http_upstream_server_t));
    if(uscf->servers == NULL) {
      return NGX_CONF_ERROR;
    }
  }
  if((usrv = ngx_array_push(uscf->servers)) == NULL) {
    return NGX_CONF_ERROR;
  }

  ngx_memzero(usrv, sizeof(*usrv));
#if nginx_version >= 1007002
  usrv->name = value[1];
#endif
  usrv->addrs = ngx_pcalloc(cf->pool, sizeof(ngx_addr_t));
  if(usrv->addrs == NULL) {
    return NGX_CONF_ERROR;
  }
  usrv->addrs->name = value[1];

  uscf->peer.init_upstream = nchan_upstream_dummy_roundrobin_init;
  return NGX_CONF_OK;
}
1369
/*
 * Build the IPv6 netmask for a CIDR prefix length.
 *
 * The first `prefix_size` bits of *mask are set and the rest cleared. A
 * prefix of 0 or less yields an all-zero mask; a prefix above 128 is treated
 * as 128 (previously it wrote past the end of the 16-byte address).
 */
static void ipv6_prefix_size_to_mask(int prefix_size, struct in6_addr *mask) {
  size_t  i;

  memset(mask, 0, sizeof(*mask));
  for(i = 0; i < sizeof(mask->s6_addr) && prefix_size > 0; i++, prefix_size -= 8) {
    //whole bytes get 0xFF, the final partial byte gets its top bits only
    mask->s6_addr[i] = (prefix_size >= 8) ? 0xFF : (uint8_t)(0xFFU << (8 - prefix_size));
  }
}
1377
/*
 * Build the IPv4 netmask, in network byte order, for a CIDR prefix length.
 *
 * A prefix of 0 or less yields 0. A prefix of 32 or more yields all ones.
 * The arithmetic is done on an unsigned 32-bit value: the earlier signed
 * `1 << (32 - prefix_size)` was undefined for a prefix of 1 (shift into the
 * sign bit) and for prefixes above 32 (negative shift count).
 */
static void ipv4_prefix_size_to_mask(int prefix_size, struct in_addr *mask) {
  uint32_t  host_order;

  if(prefix_size <= 0) {
    host_order = 0;
  }
  else if(prefix_size >= 32) {
    host_order = UINT32_C(0xFFFFFFFF);
  }
  else {
    host_order = ~((UINT32_C(1) << (32 - prefix_size)) - 1);
  }

  mask->s_addr = (in_addr_t )htonl(host_order);
}
1381
/*
 * Directive handler for the Redis server IP blacklist.
 *
 * Every argument is an IPv4 or IPv6 address with an optional "/prefix" CIDR
 * suffix. One nchan_redis_ip_range_t is filled in per argument: the original
 * text, the address family, the address, the netmask for the prefix, and the
 * address with the netmask applied. Without a suffix the prefix defaults to
 * 32 (IPv4) or 128 (IPv6), i.e. a single address. A prefix of 0 is rejected.
 *
 * The array is allocated from the configuration pool and stored, with its
 * length, in the server configuration before the entries are parsed.
 *
 * Fixes over the earlier version:
 *  - the address part is length-checked before being copied into the
 *    fixed-size lookup buffer (it used to be copied unbounded);
 *  - the getaddrinfo() result is released on every path, not only on success;
 *  - the IPv6 prefix error message no longer says "IPv4";
 *  - success is reported as NGX_CONF_OK rather than NGX_OK.
 *
 * Returns NGX_CONF_OK or an error string.
 */
static char *ngx_conf_set_redis_ip_blacklist(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_srv_conf_t        *scf = conf;
  ngx_str_t               *val = cf->args->elts;
  ngx_uint_t               count = cf->args->nelts - 1;
  ngx_uint_t               i;
  nchan_redis_ip_range_t  *blacklist;

  blacklist = ngx_palloc(cf->pool, sizeof(*blacklist) * count);
  if(blacklist == NULL) {
    return "couldn't allocate Redis server blacklist";
  }

  scf->redis.blacklist = blacklist;
  scf->redis.blacklist_count = count;

  for(i = 1; i <= count; i++) {
    ngx_str_t               *cur = &val[i];
    nchan_redis_ip_range_t  *entry = &blacklist[i-1];
    ngx_int_t                prefix_size;
    size_t                   addr_len;
    char                     buf[64];
    u_char                  *slash;
    struct addrinfo         *res = NULL;

    entry->str = *cur;

    slash = memchr(cur->data, '/', cur->len);
    if(slash) {
      prefix_size = ngx_atoi(slash+1, cur->len - (slash + 1 - cur->data));
      if(prefix_size == NGX_ERROR) {
        return "invalid CIDR range prefix size";
      }
    }
    else {
      prefix_size = -1;
      slash = &cur->data[cur->len];
    }

    //NUL-terminated copy of the address part for getaddrinfo()
    addr_len = (size_t )(slash - cur->data);
    if(addr_len >= sizeof(buf)) {
      //too long to be an IP address, and it would not fit the buffer
      return "unable to parse IP address";
    }
    ngx_memzero(buf, sizeof(buf));
    memcpy(buf, cur->data, addr_len);

    struct addrinfo hints = {0};
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    if(getaddrinfo(buf, NULL, &hints, &res) != 0) {
      return "unable to parse IP address";
    }

    entry->family = res->ai_family;
    if(entry->family == AF_INET) {
      struct sockaddr_in *sa = (struct sockaddr_in *)res->ai_addr;
      entry->addr.ipv4 = sa->sin_addr;
      entry->addr_block.ipv4 = sa->sin_addr;
    }
#ifdef AF_INET6
    else if(entry->family == AF_INET6) {
      struct sockaddr_in6 *sa = (struct sockaddr_in6 *)res->ai_addr;
      entry->addr.ipv6 = sa->sin6_addr;
      entry->addr_block.ipv6 = sa->sin6_addr;
    }
#endif
    else {
      freeaddrinfo(res);
      return "invalid address family";
    }
    //everything needed has been copied into the entry; release the lookup
    //result now so none of the error returns below can leak it
    freeaddrinfo(res);

    if(prefix_size == 0) {
      return "netmask size of 0 would block everything";
    }

    if(prefix_size == -1) {
      //no prefix size given, assume we're blacklisting single ip.
      //prefix size depends on ipv4 or ipv6
      prefix_size = entry->family == AF_INET ? 32 : 128;
    }

    if(entry->family == AF_INET) {
      if(prefix_size > 32) {
        return "netmask size cannot exceed 32 for IPv4";
      }
      entry->prefix_size = prefix_size;
      ipv4_prefix_size_to_mask((int )prefix_size, &entry->mask.ipv4);
      entry->addr_block.ipv4.s_addr &= entry->mask.ipv4.s_addr;
    }
#ifdef AF_INET6
    else {
      //family was validated above, so this is AF_INET6
      unsigned  j;
      uint8_t  *addr;
      uint8_t  *mask;

      if(prefix_size > 128) {
        return "netmask size cannot exceed 128 for IPv6";
      }
      entry->prefix_size = prefix_size;
      ipv6_prefix_size_to_mask((int )prefix_size, &entry->mask.ipv6);
      addr = entry->addr_block.ipv6.s6_addr;
      mask = entry->mask.ipv6.s6_addr;
      for(j=0; j<sizeof(entry->addr_block.ipv6.s6_addr); j++) {
        addr[j] &= mask[j];
      }
    }
#endif
  }
  return NGX_CONF_OK;
}
1482
/*
 * String slot setter that refuses values containing a newline; everything
 * else is handed to the stock string slot setter.
 */
static char *ngx_conf_set_str_slot_no_newlines(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t  *args = cf->args->elts;

  return nchan_ngx_str_substr(&args[1], "\n")
       ? "can't contain any newline characters"
       : ngx_conf_set_str_slot(cf, cmd, conf);
}
1493
/*
 * Directive handler for the discouraged standalone Redis URL setting.
 *
 * Always logs a warning recommending an upstream block. Rejects the directive
 * if the location already has a Redis upstream or if the URL fails
 * validation; otherwise stores it through the stock string slot setter.
 */
static char *ngx_conf_set_redis_url(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *url = &args[1];

  ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "Use of %V is discouraged in favor of an upstream { } block with nchan_redis_server %V;", &cmd->name, url);

  if(loc->redis.upstream) {
    return "can't be set here: already using nchan_redis_pass";
  }

  if(!nchan_store_redis_validate_url(url)) {
    return "url is invalid";
  }

  return ngx_conf_set_str_slot(cf, cmd, conf);
}
1510
/*
 * Post-processing for the Redis namespace string in `fld`.
 *
 * Rejects namespaces containing '{' or '}'. A non-empty namespace that does
 * not already end in ':' is replaced by a pool-allocated copy with ':'
 * appended (plus a NUL byte that is not counted in the length).
 *
 * Returns NGX_CONF_OK or an error string.
 */
static char *ngx_conf_process_redis_namespace_slot(ngx_conf_t *cf, void *post, void *fld) {
  ngx_str_t  *ns = fld;
  u_char     *with_colon;

  if(memchr(ns->data, '{', ns->len)) {
    return "can't contain character '{'";
  }
  if(memchr(ns->data, '}', ns->len)) {
    return "can't contain character '}'";
  }

  if(ns->len == 0 || ns->data[ns->len-1] == ':') {
    //empty, or already terminated
    return NGX_CONF_OK;
  }

  with_colon = ngx_palloc(cf->pool, ns->len + 2);
  if(with_colon == NULL) {
    return "couldn't allocate redis namespace data";
  }
  ngx_memcpy(with_colon, ns->data, ns->len);
  with_colon[ns->len] = ':';
  with_colon[ns->len+1] = '\0';

  ns->data = with_colon;
  ns->len++;

  return NGX_CONF_OK;
}
1536
/*
 * Slot setter for the Redis storage mode field at cmd->offset.
 *
 * Refuses to overwrite a field that is no longer REDIS_MODE_CONF_UNSET.
 * Accepts "backup", "distributed", and "nostore" (alias
 * "distributed-nostore"); anything else is rejected with an error string.
 */
static char *ngx_conf_set_redis_storage_mode_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t                   *args = cf->args->elts;
  ngx_str_t                   *mode_name = &args[1];
  nchan_redis_storage_mode_t  *mode;

  mode = (nchan_redis_storage_mode_t *) ((char *)conf + cmd->offset);
  if(*mode != REDIS_MODE_CONF_UNSET) {
    return "is duplicate";
  }

  if(nchan_strmatch(mode_name, 1, "backup")) {
    *mode = REDIS_MODE_BACKUP;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(mode_name, 1, "distributed")) {
    *mode = REDIS_MODE_DISTRIBUTED;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(mode_name, 1, "nostore") || nchan_strmatch(mode_name, 1, "distributed-nostore")) {
    *mode = REDIS_MODE_DISTRIBUTED_NOSTORE;
    return NGX_CONF_OK;
  }

  return "is invalid, must be one of 'distributed', 'backup' or 'nostore'";
}
1565
/*
 * Directive handler for the Redis upstream pass setting: records the argument
 * as the location's Redis upstream URL, then hands it to
 * ngx_conf_set_redis_upstream() and returns that function's result.
 */
static char *ngx_conf_set_redis_upstream_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *upstream_name = &args[1];

  loc->redis.upstream_url = *upstream_name;
  return ngx_conf_set_redis_upstream(cf, upstream_name, conf);
}
1573
1574
1575 #include "nchan_config_commands.c" //hideous but hey, it works
1576
1577 static ngx_http_module_t nchan_module_ctx = {
1578 nchan_preconfig, /* preconfiguration */
1579 nchan_postconfig, /* postconfiguration */
1580 nchan_create_main_conf, /* create main configuration */
1581 NULL, /* init main configuration */
1582 nchan_create_srv_conf, /* create server configuration */
1583 nchan_merge_srv_conf, /* merge server configuration */
1584 nchan_create_loc_conf, /* create location configuration */
1585 nchan_merge_loc_conf, /* merge location configuration */
1586 };
1587
/*
 * The nchan module definition: ties together the HTTP module context, the
 * directive table and the process lifecycle hooks defined in this file.
 * Slots left NULL are hooks nchan does not use.
 */
ngx_module_t  ngx_nchan_module = {
  NGX_MODULE_V1,
  &nchan_module_ctx,             /* module context */
  nchan_commands,                /* module directives */
  NGX_HTTP_MODULE,               /* module type */
  NULL,                          /* init master */
  nchan_init_module,             /* init module */
  nchan_init_worker,             /* init process */
  NULL,                          /* init thread */
  NULL,                          /* exit thread */
  nchan_exit_worker,             /* exit process */
  nchan_exit_master,             /* exit master */
  NGX_MODULE_V1_PADDING
};
1602