1 #include <nchan_websocket_publisher.h>
2 #include <nchan_types.h>
3 #include <util/nchan_output.h>
4 #include <nchan_variables.h>
5 #include <store/memory/store.h>
6 #include <store/redis/store.h>
7 #if (NGX_ZLIB)
8 #include <zlib.h>
9 #endif
10 
// Forward declarations of the complex-value-array helpers.
// set_complex_value_array_size1 is defined further down in this file;
// nchan_set_complex_value_array is presumably defined later as well (not visible here).
static char *nchan_set_complex_value_array(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, nchan_complex_value_arr_t *chid);
static ngx_int_t set_complex_value_array_size1(ngx_conf_t *cf, nchan_complex_value_arr_t *chid, char *val);

// Template compiled into channel_event_string by nchan_merge_loc_conf when
// neither the location nor its parent configured one.
static ngx_str_t      DEFAULT_CHANNEL_EVENT_STRING = ngx_string("$nchan_channel_event $nchan_channel_id");

// Template compiled into subscriber_info_string by nchan_merge_loc_conf when
// neither the location nor its parent configured one.
static ngx_str_t      DEFAULT_SUBSCRIBER_INFO_STRING = ngx_string("$nchan_subscriber_type $remote_addr:$remote_port $http_user_agent $server_name $request_uri $pid");

// Storage engine used by nchan_merge_loc_conf when no location sets one.
nchan_store_t   *default_storage_engine = &nchan_store_memory;
// Cleared in nchan_preconfig, set at the end of nchan_postconfig;
// gates nchan_init_module and nchan_init_worker.
ngx_flag_t       global_nchan_enabled = 0;
// Set when a redis upstream or the "redis" storage engine is configured;
// gates the redis store's postconfig/module/worker initialisation.
ngx_flag_t       global_redis_enabled = 0;
// When set, nchan_postconfig initialises the shared deflate state (NGX_ZLIB builds only).
// Not assigned anywhere in the visible part of this file.
ngx_flag_t       global_zstream_needed = 0;
// Gates the benchmark init hooks; not assigned anywhere in the visible part of this file.
ngx_flag_t       global_benchmark_enabled = 0;
// The ngx_cycle these flags were last recorded for; nchan_init_module resets
// the flags when it sees a different cycle.
void            *global_owner_cycle = NULL;
24 
// MERGE_UNSET_CONF(conf, prev, unset, dflt)
//   If `conf` still holds the sentinel `unset`, inherit `prev`; if `prev` is
//   also unset, fall back to `dflt`. `conf` must be an assignable lvalue.
//   Wrapped in do { } while (0) so the macro behaves as a single statement
//   under an unbraced if/else; every use must therefore end with ';'.
#define MERGE_UNSET_CONF(conf, prev, unset, dflt)            \
do {                                                         \
  if ((conf) == (unset)) {                                   \
    (conf) = ((prev) == (unset)) ? (dflt) : (prev);          \
  }                                                          \
} while (0)

// MERGE_CONF(cf, prev_cf, name)
//   Inherit the pointer member `name` (may be a nested path such as a.b)
//   from prev_cf when it is still NULL in cf.
#define MERGE_CONF(cf, prev_cf, name)                        \
do {                                                         \
  if ((cf)->name == NULL) {                                  \
    (cf)->name = (prev_cf)->name;                            \
  }                                                          \
} while (0)
31 
nchan_init_module(ngx_cycle_t * cycle)32 static ngx_int_t nchan_init_module(ngx_cycle_t *cycle) {
33   if(global_owner_cycle && global_owner_cycle != ngx_cycle) {
34     global_nchan_enabled = 0;
35     global_redis_enabled = 0;
36     global_zstream_needed = 0;
37     global_benchmark_enabled = 0;
38   }
39   global_owner_cycle = (void *)ngx_cycle;
40 
41   if(!global_nchan_enabled) {
42     return NGX_OK;
43   }
44   ngx_core_conf_t         *ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
45   nchan_worker_processes = ccf->worker_processes;
46 
47   //initialize storage engines
48   nchan_store_memory.init_module(cycle);
49   if(global_benchmark_enabled) {
50     nchan_benchmark_init_module(cycle);
51   }
52   if(global_redis_enabled) {
53     nchan_store_redis.init_module(cycle);
54   }
55   return NGX_OK;
56 }
57 
nchan_init_worker(ngx_cycle_t * cycle)58 static ngx_int_t nchan_init_worker(ngx_cycle_t *cycle) {
59   if(!global_nchan_enabled) {
60     return NGX_OK;
61   }
62   if (ngx_process != NGX_PROCESS_WORKER && ngx_process != NGX_PROCESS_SINGLE) {
63     //not a worker, stop initializing stuff.
64     return NGX_OK;
65   }
66 
67   if(nchan_store_memory.init_worker(cycle)!=NGX_OK) {
68     return NGX_ERROR;
69   }
70   if(global_benchmark_enabled) {
71     nchan_benchmark_init_worker(cycle);
72   }
73 
74   if(global_redis_enabled && nchan_store_redis.init_worker(cycle)!=NGX_OK) {
75     return NGX_ERROR;
76   }
77 
78   nchan_websocket_publisher_llist_init();
79   nchan_output_init();
80 
81   return NGX_OK;
82 }
83 
nchan_preconfig(ngx_conf_t * cf)84 static ngx_int_t nchan_preconfig(ngx_conf_t *cf) {
85   global_owner_cycle = (void *)ngx_cycle;
86   global_nchan_enabled = 0;
87   return nchan_add_variables(cf);
88 }
89 
nchan_postconfig(ngx_conf_t * cf)90 static ngx_int_t nchan_postconfig(ngx_conf_t *cf) {
91   global_owner_cycle = (void *)ngx_cycle;
92   if(nchan_store_memory.init_postconfig(cf)!=NGX_OK) {
93     return NGX_ERROR;
94   }
95   if(global_redis_enabled && nchan_store_redis.init_postconfig(cf)!=NGX_OK) {
96     return NGX_ERROR;
97   }
98 
99 #if (NGX_ZLIB)
100   if(global_zstream_needed) {
101     nchan_main_conf_t  *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_nchan_module);
102     nchan_common_deflate_init(mcf);
103   }
104 #endif
105 
106   global_nchan_enabled = 1;
107 
108   return NGX_OK;
109 }
110 
111 //main config
// Allocates and initialises the module's main configuration.
//
// Sets the default message temp path, lets the memory and redis stores
// initialise their parts of the main conf, and (NGX_ZLIB builds) fills in the
// default zlib parameters.
//
// Returns the new nchan_main_conf_t, or NULL on failure. nginx tests the
// result of create-conf callbacks against NULL, so NGX_CONF_ERROR
// ((void *) -1) must not be returned here: it would be stored as if it were
// a valid configuration pointer.
static void * nchan_create_main_conf(ngx_conf_t *cf) {
  static ngx_path_init_t  nchan_temp_path = { ngx_string(NGX_HTTP_CLIENT_TEMP_PATH), { 0, 0, 0 } };
  nchan_main_conf_t      *mcf = ngx_pcalloc(cf->pool, sizeof(*mcf));

  if(mcf == NULL) {
    return NULL;
  }

  // with prev == NULL this installs the default temp path
  if(ngx_conf_merge_path_value(cf, &mcf->message_temp_path, NULL, &nchan_temp_path) != NGX_CONF_OK) {
    return NULL;
  }

  nchan_store_memory.create_main_conf(cf, mcf);
  nchan_store_redis.create_main_conf(cf, mcf);

#if (NGX_ZLIB)
  mcf->zlib_params.level = Z_DEFAULT_COMPRESSION;
  mcf->zlib_params.windowBits = 10;
  mcf->zlib_params.memLevel = 8;
  mcf->zlib_params.strategy = Z_DEFAULT_STRATEGY;
#endif

  return mcf;
}
133 
// Allocates the per-server configuration and marks every redis setting as
// "unset" so nchan_merge_srv_conf can tell explicit values from defaults.
//
// Returns the new nchan_srv_conf_t, or NULL on allocation failure. nginx
// tests the result of create-conf callbacks against NULL, so NGX_CONF_ERROR
// must not be returned here.
static void *nchan_create_srv_conf(ngx_conf_t *cf) {
  nchan_srv_conf_t       *scf = ngx_pcalloc(cf->pool, sizeof(*scf));
  if(scf == NULL) {
    return NULL;
  }

  scf->redis.connect_timeout = NGX_CONF_UNSET_MSEC;
  scf->redis.optimize_target = NCHAN_REDIS_OPTIMIZE_UNSET;
  scf->redis.master_weight = NGX_CONF_UNSET;
  scf->redis.slave_weight = NGX_CONF_UNSET;
  scf->redis.blacklist_count = NGX_CONF_UNSET;
  scf->redis.blacklist = NULL;
  scf->upstream_nchan_loc_conf = NULL;

  return scf;
}
148 
// Merges the parent server configuration into the child: every redis setting
// still unset in the child inherits the parent's value, or the built-in
// default when the parent is unset too. Always returns NGX_CONF_OK.
static char *nchan_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) {
  nchan_srv_conf_t       *parent_scf = parent;
  nchan_srv_conf_t       *scf = child;

  ngx_conf_merge_msec_value(scf->redis.connect_timeout, parent_scf->redis.connect_timeout, NCHAN_DEFAULT_REDIS_NODE_CONNECT_TIMEOUT_MSEC);

  if(scf->redis.optimize_target == NCHAN_REDIS_OPTIMIZE_UNSET) {
    if(parent_scf->redis.optimize_target == NCHAN_REDIS_OPTIMIZE_UNSET) {
      scf->redis.optimize_target = NCHAN_REDIS_OPTIMIZE_CPU;
    }
    else {
      scf->redis.optimize_target = parent_scf->redis.optimize_target;
    }
  }

  ngx_conf_merge_value(scf->redis.master_weight, parent_scf->redis.master_weight, 1);
  ngx_conf_merge_value(scf->redis.slave_weight, parent_scf->redis.slave_weight, 1);
  ngx_conf_merge_value(scf->redis.blacklist_count, parent_scf->redis.blacklist_count, 0);

  // the blacklist is a pointer: inherit it only when the child has none
  if(scf->redis.blacklist == NULL) {
    scf->redis.blacklist = parent_scf->redis.blacklist;
  }

  return NGX_CONF_OK;
}
161 
162 //location config stuff
// Allocates the per-location configuration and puts every field into its
// "not configured" state (0, NULL, NGX_CONF_UNSET or the field's own *_UNSET
// sentinel) so that nchan_merge_loc_conf can tell explicit settings from
// inherited ones.
//
// Returns the new nchan_loc_conf_t, or NULL on allocation failure. nginx
// tests the result of create-conf callbacks against NULL, so NGX_CONF_ERROR
// must not be returned here.
static void *nchan_create_loc_conf(ngx_conf_t *cf) {
  nchan_loc_conf_t       *lcf = ngx_pcalloc(cf->pool, sizeof(*lcf));
  if(lcf == NULL) {
    return NULL;
  }

  // publisher types
  lcf->pub.http=0;
  lcf->pub.websocket=0;

  // subscriber types
  lcf->sub.poll=0;
  lcf->sub.longpoll=0;
  lcf->sub.eventsource=0;
  lcf->sub.websocket=0;
  lcf->sub.http_chunked=0;

  // lcf->group is already zeroed
  lcf->group.enable_accounting = NGX_CONF_UNSET;

  lcf->shared_data_index=NGX_CONF_UNSET;

  // complex-value pointers, inherited in the merge step when still NULL
  lcf->authorize_request_url = NULL;
  lcf->publisher_upstream_request_url = NULL;
  lcf->unsubscribe_request_url = NULL;
  lcf->subscribe_request_url = NULL;
  lcf->channel_group = NULL;

  lcf->message_timeout=NGX_CONF_UNSET;
  lcf->max_messages=NGX_CONF_UNSET;

  lcf->complex_message_timeout = NULL;
  lcf->complex_max_messages = NULL;

  lcf->subscriber_first_message=NCHAN_SUBSCRIBER_FIRST_MESSAGE_UNSET;

  lcf->subscriber_info_string=NULL;
  lcf->subscriber_info_location=NGX_CONF_UNSET;

  lcf->subscriber_timeout=NGX_CONF_UNSET;
  lcf->subscribe_only_existing_channel=NGX_CONF_UNSET;
  lcf->redis_idle_channel_cache_timeout=NGX_CONF_UNSET;
  lcf->max_channel_id_length=NGX_CONF_UNSET;
  lcf->max_channel_subscribers=NGX_CONF_UNSET;
  lcf->channel_timeout=NGX_CONF_UNSET;
  lcf->storage_engine=NULL;

  lcf->websocket_ping_interval=NGX_CONF_UNSET;

  lcf->eventsource_ping.interval=NGX_CONF_UNSET;

  lcf->msg_in_etag_only = NGX_CONF_UNSET;

  lcf->allow_origin = NULL;
  lcf->allow_credentials = NGX_CONF_UNSET;

  lcf->channel_events_channel_id = NULL;
  lcf->channel_event_string = NULL;

  lcf->websocket_heartbeat.enabled=NGX_CONF_UNSET;

  lcf->message_compression = NCHAN_MSG_COMPRESSION_INVALID;

  lcf->longpoll_multimsg=NGX_CONF_UNSET;
  lcf->longpoll_multimsg_use_raw_stream_separator=NGX_CONF_UNSET;

  // channel id / message id arrays start empty (n == 0)
  ngx_memzero(&lcf->pub_chid, sizeof(nchan_complex_value_arr_t));
  ngx_memzero(&lcf->sub_chid, sizeof(nchan_complex_value_arr_t));
  ngx_memzero(&lcf->pubsub_chid, sizeof(nchan_complex_value_arr_t));
  ngx_memzero(&lcf->last_message_id, sizeof(nchan_complex_value_arr_t));

  // redis settings
  ngx_memzero(&lcf->redis, sizeof(lcf->redis));
  lcf->redis.url_enabled=NGX_CONF_UNSET;
  lcf->redis.ping_interval = NGX_CONF_UNSET;
  lcf->redis.cluster_check_interval=NGX_CONF_UNSET;
  lcf->redis.upstream_inheritable=NGX_CONF_UNSET;
  lcf->redis.storage_mode = REDIS_MODE_CONF_UNSET;
  lcf->redis.nostore_fastpublish = NGX_CONF_UNSET;
  lcf->redis.privdata = NULL;
  lcf->redis.nodeset = NULL;

  lcf->request_handler = NULL;

  // benchmark settings
  lcf->benchmark.time = NGX_CONF_UNSET;
  lcf->benchmark.msgs_per_minute = NGX_CONF_UNSET;
  lcf->benchmark.msg_padding = NGX_CONF_UNSET;
  lcf->benchmark.channels = NGX_CONF_UNSET;
  lcf->benchmark.subscribers_per_channel = NGX_CONF_UNSET;
  lcf->benchmark.subscriber_distribution = NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET;
  lcf->benchmark.publisher_distribution = NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET;
  return lcf;
}
253 
// Compiles the template `str` into a freshly pool-allocated complex value.
//   cf      configuration context (provides the pool)
//   dst_cv  receives the compiled value; written only on success
//   str     template text to compile
// Returns NGX_CONF_OK on success, NGX_CONF_ERROR when allocation or
// compilation fails (an error is logged for the allocation failure).
static char * create_complex_value_from_ngx_str(ngx_conf_t *cf, ngx_http_complex_value_t **dst_cv, ngx_str_t *str) {
  ngx_http_compile_complex_value_t    compiler;
  ngx_http_complex_value_t           *compiled = ngx_palloc(cf->pool, sizeof(*compiled));

  if (compiled == NULL) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "unable to allocate space for complex value");
    return NGX_CONF_ERROR;
  }

  ngx_memzero(&compiler, sizeof(compiler));
  compiler.complex_value = compiled;
  compiler.value = str;
  compiler.cf = cf;

  if (ngx_http_compile_complex_value(&compiler) == NGX_OK) {
    *dst_cv = compiled;
    return NGX_CONF_OK;
  }

  return NGX_CONF_ERROR;
}
277 
// Nonzero when any publisher type (http or websocket) is enabled for the location.
static int is_pub_location(nchan_loc_conf_t *lcf) {
  if(lcf->pub.http) {
    return 1;
  }
  return lcf->pub.websocket ? 1 : 0;
}
// Nonzero when any subscriber type is enabled for the location.
static int is_sub_location(nchan_loc_conf_t *lcf) {
  const nchan_conf_subscriber_types_t *types = &lcf->sub;

  return types->poll
      || types->http_raw_stream
      || types->longpoll
      || types->http_chunked
      || types->http_multipart
      || types->eventsource
      || types->websocket;
}
// Nonzero when any group request type (get, set or delete) is enabled for the location.
static int is_group_location(nchan_loc_conf_t *lcf) {
  if(lcf->group.get) {
    return 1;
  }
  if(lcf->group.set) {
    return 1;
  }
  return lcf->group.delete ? 1 : 0;
}
288 
// Checks that a location does not combine group access with publisher and/or
// subscriber roles. Logs a configuration error describing the conflict and
// returns 0 when it does; returns 1 for a valid location.
static int is_valid_location(ngx_conf_t *cf, nchan_loc_conf_t *lcf) {
  int  publishes, subscribes;

  if(!is_group_location(lcf)) {
    // nothing to conflict with
    return 1;
  }

  publishes = is_pub_location(lcf);
  subscribes = is_sub_location(lcf);

  if(publishes && subscribes) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "Can't have a publisher and subscriber location and also be a group access location (nchan_group + nchan_publisher, nchan_subscriber or nchan_pubsub)");
    return 0;
  }
  if(publishes) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "Can't have a publisher location and also be a group access location (nchan_group + nchan_publisher)");
    return 0;
  }
  if(subscribes) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "Can't have a subscriber location and also be a group access location (nchan_group + nchan_subscriber)");
    return 0;
  }

  return 1;
}
307 
// Attaches the redis upstream named by `url` to the location conf `conf`.
// On success the location is flagged redis-enabled, redis is enabled globally
// and the location is registered with the redis store.
// Returns NGX_CONF_OK on success, "is duplicate" when the location already has
// an upstream, and NGX_CONF_ERROR when the upstream cannot be added.
static char *ngx_conf_set_redis_upstream(ngx_conf_t *cf, ngx_str_t *url, void *conf) {
  nchan_loc_conf_t     *loc = conf;
  ngx_url_t             parsed;

  if (loc->redis.upstream != NULL) {
    return "is duplicate";
  }

  ngx_memzero(&parsed, sizeof(parsed));
  parsed.no_resolve = 1;
  parsed.url = *url;

  loc->redis.upstream = ngx_http_upstream_add(cf, &parsed, 0);
  if (loc->redis.upstream == NULL) {
    return NGX_CONF_ERROR;
  }

  loc->redis.enabled = 1;
  global_redis_enabled = 1;
  nchan_store_redis_add_active_loc_conf(cf, loc);

  return NGX_CONF_OK;
}
329 
// Installs `handler` as the content handler of the location currently being
// configured and switches off its If-Modified-Since processing.
// Always returns NGX_CONF_OK.
static char *nchan_setup_handler(ngx_conf_t *cf, ngx_int_t (*handler)(ngx_http_request_t *)) {
  ngx_http_core_loc_conf_t       *core_loc_conf;

  core_loc_conf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
  core_loc_conf->if_modified_since = NGX_HTTP_IMS_OFF;
  core_loc_conf->handler = handler;

  return NGX_CONF_OK;
}
338 
// Looks up the location conf stored on the server conf of conf's redis
// upstream. Returns NULL when conf has no upstream, when it shares its
// upstream with prev, or when the upstream's server conf carries no
// location conf pointer.
static nchan_loc_conf_t *nchan_loc_conf_get_upstream_lcf(nchan_loc_conf_t *conf, nchan_loc_conf_t *prev) {
  nchan_redis_conf_t    *redis = &conf->redis;
  nchan_redis_conf_t    *parent_redis = &prev->redis;
  nchan_srv_conf_t      *upstream_scf;

  if(redis->upstream == NULL || redis->upstream == parent_redis->upstream) {
    //same or no upstream, so don't bother
    return NULL;
  }

  assert(redis->upstream);
  upstream_scf = ngx_http_conf_upstream_srv_conf(redis->upstream, ngx_nchan_module);
  if(upstream_scf == NULL) {
    return NULL;
  }

  // may itself be NULL, in which case there is nothing to inherit from
  return upstream_scf->upstream_nchan_loc_conf ? upstream_scf->upstream_nchan_loc_conf : NULL;
}
357 
358 
// Merges the parent location configuration (`parent`) into the child
// (`child`).
//
// Every setting left unset in the child inherits the parent's value, or a
// built-in default when the parent is unset too. Some redis settings are taken
// from the redis upstream's own location conf first, when one is available.
// The merged location is validated (group access cannot be combined with
// publisher/subscriber roles) and, when a request handler is set, that handler
// is installed on the location.
//
// Returns NGX_CONF_OK on success, NGX_CONF_ERROR when the location is invalid,
// a default complex value cannot be compiled, or an inherited redis upstream
// cannot be attached.
static char * nchan_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) {
  nchan_loc_conf_t       *prev = parent, *conf = child;
  nchan_loc_conf_t       *up = nchan_loc_conf_get_upstream_lcf(conf, prev);
  //publisher types
  ngx_conf_merge_bitmask_value(conf->pub.http, prev->pub.http, 0);
  ngx_conf_merge_bitmask_value(conf->pub.websocket, prev->pub.websocket, 0);

  //subscriber types
  // NOTE(review): sub.http_raw_stream and sub.http_multipart are not merged
  // here although is_sub_location() reads them - confirm whether intentional.
  ngx_conf_merge_bitmask_value(conf->sub.poll, prev->sub.poll, 0);
  ngx_conf_merge_bitmask_value(conf->sub.longpoll, prev->sub.longpoll, 0);
  ngx_conf_merge_bitmask_value(conf->sub.eventsource, prev->sub.eventsource, 0);
  ngx_conf_merge_bitmask_value(conf->sub.http_chunked, prev->sub.http_chunked, 0);
  ngx_conf_merge_bitmask_value(conf->sub.websocket, prev->sub.websocket, 0);

  //group request types
  ngx_conf_merge_bitmask_value(conf->group.get, prev->group.get, 0);
  ngx_conf_merge_bitmask_value(conf->group.set, prev->group.set, 0);
  ngx_conf_merge_bitmask_value(conf->group.delete, prev->group.delete, 0);

  ngx_conf_merge_value(conf->group.enable_accounting, prev->group.enable_accounting, 0);

  //validate location
  if(!is_valid_location(cf, conf)) {
    return NGX_CONF_ERROR;
  }

  MERGE_UNSET_CONF(conf->message_compression, prev->message_compression, NCHAN_MSG_COMPRESSION_INVALID, NCHAN_MSG_NO_COMPRESSION);

  ngx_conf_merge_sec_value(conf->message_timeout, prev->message_timeout, NCHAN_DEFAULT_MESSAGE_TIMEOUT);
  ngx_conf_merge_value(conf->max_messages, prev->max_messages, NCHAN_DEFAULT_MAX_MESSAGES);

  MERGE_CONF(conf, prev, complex_message_timeout);
  MERGE_CONF(conf, prev, complex_max_messages);

  if (conf->subscriber_first_message == NCHAN_SUBSCRIBER_FIRST_MESSAGE_UNSET) {
    conf->subscriber_first_message = (prev->subscriber_first_message == NCHAN_SUBSCRIBER_FIRST_MESSAGE_UNSET) ? NCHAN_SUBSCRIBER_DEFAULT_FIRST_MESSAGE : prev->subscriber_first_message;
  }

  MERGE_CONF(conf, prev, subscriber_info_string);
  if(conf->subscriber_info_string == NULL) { //still null? use the default string
    if(create_complex_value_from_ngx_str(cf, &conf->subscriber_info_string, &DEFAULT_SUBSCRIBER_INFO_STRING) == NGX_CONF_ERROR) {
      return NGX_CONF_ERROR;
    }
  }

  ngx_conf_merge_value(conf->subscriber_info_location, prev->subscriber_info_location, 0);

  ngx_conf_merge_sec_value(conf->websocket_ping_interval, prev->websocket_ping_interval, NCHAN_DEFAULT_SUBSCRIBER_PING_INTERVAL);

  ngx_conf_merge_sec_value(conf->eventsource_ping.interval, prev->eventsource_ping.interval, NCHAN_DEFAULT_SUBSCRIBER_PING_INTERVAL);
  // each ping string is merged into its own field: comment -> comment,
  // event -> event, data -> data
  ngx_conf_merge_str_value(conf->eventsource_ping.comment, prev->eventsource_ping.comment, "");
  ngx_conf_merge_str_value(conf->eventsource_ping.event, prev->eventsource_ping.event, "ping");
  ngx_conf_merge_str_value(conf->eventsource_ping.data, prev->eventsource_ping.data, "");

  ngx_conf_merge_sec_value(conf->subscriber_timeout, prev->subscriber_timeout, NCHAN_DEFAULT_SUBSCRIBER_TIMEOUT);
  ngx_conf_merge_sec_value(conf->redis_idle_channel_cache_timeout, prev->redis_idle_channel_cache_timeout, NCHAN_DEFAULT_REDIS_IDLE_CHANNEL_CACHE_TIMEOUT);

  ngx_conf_merge_value(conf->subscribe_only_existing_channel, prev->subscribe_only_existing_channel, 0);
  ngx_conf_merge_value(conf->max_channel_id_length, prev->max_channel_id_length, NCHAN_MAX_CHANNEL_ID_LENGTH);
  ngx_conf_merge_value(conf->max_channel_subscribers, prev->max_channel_subscribers, 0);
  ngx_conf_merge_value(conf->channel_timeout, prev->channel_timeout, NCHAN_DEFAULT_CHANNEL_TIMEOUT);

  ngx_conf_merge_str_value(conf->subscriber_http_raw_stream_separator, prev->subscriber_http_raw_stream_separator, "\n");

  ngx_conf_merge_str_value(conf->channel_id_split_delimiter, prev->channel_id_split_delimiter, "");
  MERGE_CONF(conf, prev, allow_origin);
  ngx_conf_merge_value(conf->allow_credentials, prev->allow_credentials, 1);
  ngx_conf_merge_str_value(conf->eventsource_event, prev->eventsource_event, "");
  ngx_conf_merge_str_value(conf->custom_msgtag_header, prev->custom_msgtag_header, "");
  ngx_conf_merge_value(conf->msg_in_etag_only, prev->msg_in_etag_only, 0);
  ngx_conf_merge_value(conf->longpoll_multimsg, prev->longpoll_multimsg, 0);
  ngx_conf_merge_value(conf->longpoll_multimsg_use_raw_stream_separator, prev->longpoll_multimsg_use_raw_stream_separator, 0);

  ngx_conf_merge_value(conf->websocket_heartbeat.enabled, prev->websocket_heartbeat.enabled, 0);
  MERGE_CONF(conf, prev, websocket_heartbeat.in);
  MERGE_CONF(conf, prev, websocket_heartbeat.out);

  MERGE_CONF(conf, prev, channel_events_channel_id);
  MERGE_CONF(conf, prev, channel_event_string);

  if(conf->channel_event_string == NULL) { //still null? use the default string
    if(create_complex_value_from_ngx_str(cf, &conf->channel_event_string, &DEFAULT_CHANNEL_EVENT_STRING) == NGX_CONF_ERROR) {
      return NGX_CONF_ERROR;
    }
  }

  if(conf->storage_engine == NULL) {
    conf->storage_engine = prev->storage_engine ? prev->storage_engine : default_storage_engine;
  }

  MERGE_CONF(conf, prev, authorize_request_url);
  MERGE_CONF(conf, prev, publisher_upstream_request_url);
  MERGE_CONF(conf, prev, unsubscribe_request_url);
  MERGE_CONF(conf, prev, subscribe_request_url);
  MERGE_CONF(conf, prev, channel_group);

  MERGE_CONF(conf, prev, group.max_channels);
  MERGE_CONF(conf, prev, group.max_subscribers);
  MERGE_CONF(conf, prev, group.max_messages);
  MERGE_CONF(conf, prev, group.max_messages_shm_bytes);
  MERGE_CONF(conf, prev, group.max_messages_file_bytes);

  // channel id arrays: an empty array (n == 0) means "inherit from parent"
  if(conf->pub_chid.n == 0) {
    conf->pub_chid = prev->pub_chid;
  }
  if(conf->sub_chid.n == 0) {
    conf->sub_chid = prev->sub_chid;
  }
  if(conf->pubsub_chid.n == 0) {
    conf->pubsub_chid = prev->pubsub_chid;
  }
  if(conf->last_message_id.n == 0) {
    conf->last_message_id = prev->last_message_id;
  }
  if(conf->last_message_id.n == 0) { //if it's still null
    ngx_str_t      first_choice_msgid = ngx_string("$http_last_event_id");
    ngx_str_t      second_choice_msgid = ngx_string("$arg_last_event_id");

    if(create_complex_value_from_ngx_str(cf, &conf->last_message_id.cv[0], &first_choice_msgid) == NGX_CONF_ERROR) {
      return NGX_CONF_ERROR;
    }
    if(create_complex_value_from_ngx_str(cf, &conf->last_message_id.cv[1], &second_choice_msgid) == NGX_CONF_ERROR) {
      return NGX_CONF_ERROR;
    }
    conf->last_message_id.n = 2;
  }

  ngx_conf_merge_value(conf->redis.url_enabled, prev->redis.url_enabled, 0);

  ngx_conf_merge_value(conf->redis.upstream_inheritable, prev->redis.upstream_inheritable, 0);
  ngx_conf_merge_str_value(conf->redis.url, prev->redis.url, NCHAN_REDIS_DEFAULT_URL);

  if(up && up->redis.namespace.len > 0) { //upstream has a namespace set
    ngx_conf_merge_str_value(conf->redis.namespace, up->redis.namespace, "");
  }
  else {
    ngx_conf_merge_str_value(conf->redis.namespace, prev->redis.namespace, "");
  }

  // For the settings below the upstream's value (if any) is tried first; the
  // NGX_CONF_UNSET default keeps the field unset so the parent merge still runs.
  if(up) {
    ngx_conf_merge_value(conf->redis.ping_interval, up->redis.ping_interval, NGX_CONF_UNSET);
  }
  ngx_conf_merge_value(conf->redis.ping_interval, prev->redis.ping_interval, NCHAN_REDIS_DEFAULT_PING_INTERVAL_TIME);

  if(up) {
    ngx_conf_merge_value(conf->redis.cluster_check_interval, up->redis.cluster_check_interval, NGX_CONF_UNSET);
  }
  ngx_conf_merge_value(conf->redis.cluster_check_interval, prev->redis.cluster_check_interval, NCHAN_REDIS_DEFAULT_CLUSTER_CHECK_INTERVAL_TIME);

  if(up) {
    ngx_conf_merge_value(conf->redis.nostore_fastpublish, up->redis.nostore_fastpublish, NGX_CONF_UNSET);
  }
  ngx_conf_merge_value(conf->redis.nostore_fastpublish, prev->redis.nostore_fastpublish, 0);

  if(conf->redis.url_enabled) {
    conf->redis.enabled = 1;
    nchan_store_redis_add_active_loc_conf(cf, conf);
  }
  if(conf->redis.upstream_inheritable && !conf->redis.upstream && prev->redis.upstream && prev->redis.upstream_url.len > 0) {
    conf->redis.upstream_url = prev->redis.upstream_url;
    // a failure to attach the inherited upstream is a configuration error,
    // not something to carry on from silently
    if(ngx_conf_set_redis_upstream(cf, &conf->redis.upstream_url, conf) != NGX_CONF_OK) {
      return NGX_CONF_ERROR;
    }
  }

  if(up) {
    MERGE_UNSET_CONF(conf->redis.storage_mode, up->redis.storage_mode, REDIS_MODE_CONF_UNSET, REDIS_MODE_CONF_UNSET);
  }
  MERGE_UNSET_CONF(conf->redis.storage_mode, prev->redis.storage_mode, REDIS_MODE_CONF_UNSET, REDIS_MODE_DISTRIBUTED);

  if(prev->request_handler != NULL && conf->request_handler == NULL) {
    conf->request_handler = prev->request_handler;
  }
  if(conf->request_handler != NULL) {
    nchan_setup_handler(cf, conf->request_handler);
  }

  ngx_conf_merge_value(conf->benchmark.time, prev->benchmark.time, 10);
  ngx_conf_merge_value(conf->benchmark.msgs_per_minute, prev->benchmark.msgs_per_minute, 120);
  ngx_conf_merge_value(conf->benchmark.msg_padding, prev->benchmark.msg_padding, 0);
  ngx_conf_merge_value(conf->benchmark.channels, prev->benchmark.channels, 1000);
  ngx_conf_merge_value(conf->benchmark.subscribers_per_channel, prev->benchmark.subscribers_per_channel, 100);
  MERGE_UNSET_CONF(conf->benchmark.subscriber_distribution, prev->benchmark.subscriber_distribution, NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET, NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_RANDOM);
  MERGE_UNSET_CONF(conf->benchmark.publisher_distribution, prev->benchmark.publisher_distribution, NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET, NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_RANDOM);

  return NGX_CONF_OK;
}
540 
// Directive handler selecting the storage engine of a location.
// Accepts "memory" or "redis" as first argument (choosing redis also enables
// redis globally); any other value is logged and rejected with NGX_CONF_ERROR.
static char *nchan_set_storage_engine(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t     *loc = conf;
  ngx_str_t            *args = cf->args->elts;
  ngx_str_t            *engine_name = &args[1];

  if(nchan_strmatch(engine_name, 1, "memory")) {
    loc->storage_engine = &nchan_store_memory;
    return NGX_CONF_OK;
  }

  if(nchan_strmatch(engine_name, 1, "redis")) {
    loc->storage_engine = &nchan_store_redis;
    global_redis_enabled = 1;
    return NGX_CONF_OK;
  }

  ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "invalid %V value: %V", &cmd->name, engine_name);
  return NGX_CONF_ERROR;
}
559 
// Accepted spellings for each publisher/subscriber type in directive
// arguments. Each X_STRINGS macro expands to a comma-separated list of string
// literals that is passed to nchan_strmatch as variadic arguments; the
// matching X_STRINGS_N is the number of entries and must be kept in sync by hand.

#define WEBSOCKET_STRINGS "websocket", "ws", "websockets"
#define WEBSOCKET_STRINGS_N 3

#define EVENTSOURCE_STRINGS "eventsource", "event-source", "es", "sse"
#define EVENTSOURCE_STRINGS_N 4

#define HTTP_CHUNKED_STRINGS "chunked", "http-chunked"
#define HTTP_CHUNKED_STRINGS_N 2

#define HTTP_MULTIPART_STRINGS "multipart", "multipart/mixed", "http-multipart", "multipart-mixed"
#define HTTP_MULTIPART_STRINGS_N 4

#define LONGPOLL_STRINGS "longpoll", "long-poll"
#define LONGPOLL_STRINGS_N 2

// "http" is listed here, so a subscriber directive treats it as interval polling
#define INTERVALPOLL_STRINGS "poll", "interval-poll", "intervalpoll", "http"
#define INTERVALPOLL_STRINGS_N 4

#define HTTP_RAW_STREAM_STRINGS "http-raw-stream"
#define HTTP_RAW_STREAM_STRINGS_N 1

// values that switch subscriber types off
#define DISABLED_STRINGS "none", "off", "disabled"
#define DISABLED_STRINGS_N 3
583 
// Parses the arguments of a publisher directive into lcf->pub.
//   no arguments     -> both http and websocket publishing are enabled
//   "http"           -> http publishing
//   websocket alias  -> websocket publishing
// Parsing stops at the first unrecognised value and NGX_CONF_ERROR is
// returned; the error is logged only when `fail` is nonzero.
// On success the location is validated and the pubsub request handler is set.
static char *nchan_publisher_directive_parse(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, ngx_int_t fail) {
  nchan_loc_conf_t              *loc = conf;
  nchan_conf_publisher_types_t  *types = &loc->pub;
  ngx_str_t                     *args = cf->args->elts;
  ngx_str_t                     *arg;
  ngx_uint_t                     n;

  if(cf->args->nelts == 1) { //no arguments
    types->http = 1;
    types->websocket = 1;
  }

  for(n = 1; n < cf->args->nelts; n++) {
    arg = &args[n];

    if(nchan_strmatch(arg, 1, "http")) {
      types->http = 1;
      continue;
    }
    if(nchan_strmatch(arg, WEBSOCKET_STRINGS_N, WEBSOCKET_STRINGS)) {
      types->websocket = 1;
      continue;
    }

    // unrecognised value
    if(fail) {
      ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V", &cmd->name, arg);
    }
    return NGX_CONF_ERROR;
  }

  if(!is_valid_location(cf, loc)) {
    return NGX_CONF_ERROR;
  }

  loc->request_handler = &nchan_pubsub_handler;
  return NGX_CONF_OK;
}
620 
// Directive handler for publisher locations: parses the arguments strictly,
// i.e. an unrecognised value is logged and rejected.
static char *nchan_publisher_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  const ngx_int_t  log_invalid_values = 1;

  return nchan_publisher_directive_parse(cf, cmd, conf, log_invalid_values);
}
624 
// Parses the arguments of a subscriber directive into lcf->sub.
//   no arguments -> longpoll, websocket, eventsource, http-chunked and
//                   http-multipart are enabled; poll and http-raw-stream are
//                   cleared
//   otherwise    -> each argument enables the subscriber type it names (see
//                   the *_STRINGS alias lists); a "disabled" alias clears
//                   every subscriber type
// Parsing stops at the first unrecognised value and NGX_CONF_ERROR is
// returned; the error is logged only when `fail` is nonzero.
// On success the location is validated and the pubsub request handler is set.
static char *nchan_subscriber_directive_parse(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, ngx_int_t fail) {
  nchan_loc_conf_t     *lcf = conf;
  ngx_str_t            *val;
  ngx_uint_t            i;

  nchan_conf_subscriber_types_t *subt = &lcf->sub;

  if(cf->args->nelts == 1){ //no arguments
    subt->poll=0;
    subt->http_raw_stream = 0;
    subt->longpoll=1;
    subt->websocket=1;
    subt->eventsource=1;
    subt->http_chunked=1;
    subt->http_multipart=1;
  }
  else {
    for(i=1; i < cf->args->nelts; i++) {
      val = &((ngx_str_t *) cf->args->elts)[i];
      if(nchan_strmatch(val, LONGPOLL_STRINGS_N, LONGPOLL_STRINGS)) {
        subt->longpoll=1;
      }
      else if(nchan_strmatch(val, INTERVALPOLL_STRINGS_N, INTERVALPOLL_STRINGS)) {
        subt->poll=1;
      }
      else if(nchan_strmatch(val, HTTP_RAW_STREAM_STRINGS_N, HTTP_RAW_STREAM_STRINGS)) {
        subt->http_raw_stream=1;
      }
      else if(nchan_strmatch(val, HTTP_CHUNKED_STRINGS_N, HTTP_CHUNKED_STRINGS)) {
        subt->http_chunked=1;
      }
      else if(nchan_strmatch(val, HTTP_MULTIPART_STRINGS_N, HTTP_MULTIPART_STRINGS)) {
        subt->http_multipart=1;
      }
      else if(nchan_strmatch(val, WEBSOCKET_STRINGS_N, WEBSOCKET_STRINGS)) {
        subt->websocket=1;
      }
      else if(nchan_strmatch(val, EVENTSOURCE_STRINGS_N, EVENTSOURCE_STRINGS)) {
        subt->eventsource=1;
      }
      else if(nchan_strmatch(val, DISABLED_STRINGS_N, DISABLED_STRINGS)) {
        // "off" must clear every subscriber type, http-raw-stream included
        subt->poll=0;
        subt->http_raw_stream=0;
        subt->longpoll=0;
        subt->websocket=0;
        subt->eventsource=0;
        subt->http_chunked=0;
        subt->http_multipart=0;
      }
      else {
        if(fail) {
          ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V", &cmd->name, val);
        }
        return NGX_CONF_ERROR;
      }
    }
  }

  if(!is_valid_location(cf, lcf)) {
    return NGX_CONF_ERROR;
  }
  lcf->request_handler = &nchan_pubsub_handler;
  return NGX_CONF_OK;
}
688 
// Directive handler for subscriber locations: parses the arguments strictly,
// i.e. an unrecognised value is logged and rejected.
static char *nchan_subscriber_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  const ngx_int_t  log_invalid_values = 1;

  return nchan_subscriber_directive_parse(cf, cmd, conf, log_invalid_values);
}
692 
// Directive handler for combined publisher + subscriber locations.
// Runs the publisher and the subscriber parser over the same arguments with
// logging suppressed and their results ignored (a value valid for one side
// may be unknown to the other), then rejects any argument that is not in one
// of the alias lists checked below, and finally validates the location.
static char *nchan_pubsub_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t     *loc = conf;
  ngx_str_t            *args;
  ngx_str_t            *arg;
  ngx_uint_t            n;

  nchan_publisher_directive_parse(cf, cmd, conf, 0);
  nchan_subscriber_directive_parse(cf, cmd, conf, 0);

  args = cf->args->elts;
  for(n = 1; n < cf->args->nelts; n++) {
    arg = &args[n];

    if(nchan_strmatch(arg,
      WEBSOCKET_STRINGS_N + EVENTSOURCE_STRINGS_N + HTTP_CHUNKED_STRINGS_N + HTTP_MULTIPART_STRINGS_N + LONGPOLL_STRINGS_N + INTERVALPOLL_STRINGS_N + HTTP_RAW_STREAM_STRINGS_N + DISABLED_STRINGS_N,
      WEBSOCKET_STRINGS, EVENTSOURCE_STRINGS, HTTP_CHUNKED_STRINGS, HTTP_MULTIPART_STRINGS, LONGPOLL_STRINGS, INTERVALPOLL_STRINGS, HTTP_RAW_STREAM_STRINGS, DISABLED_STRINGS)) {
      continue;
    }

    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V", &cmd->name, arg);
    return NGX_CONF_ERROR;
  }

  return is_valid_location(cf, loc) ? NGX_CONF_OK : NGX_CONF_ERROR;
}
715 
// Directive handler for subscriber-info locations.
// Marks the location as a subscriber-info location, enables the
// http-raw-stream, websocket, eventsource, http-chunked and http-multipart
// subscriber types (poll and longpoll are cleared), fixes the message timeout
// at 10 with no complex timeout, and installs the subscriber-info handler.
// Always returns NGX_CONF_OK.
static char *nchan_subscriber_info_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t               *loc = conf;
  nchan_conf_subscriber_types_t  *types = &loc->sub;

  loc->subscriber_info_location = 1;
  loc->request_handler = &nchan_subscriber_info_handler;

  loc->complex_message_timeout = NULL;
  loc->message_timeout = 10;

  // doesn't make sense to have longpoll be the default HTTP subscriber, since channel info locations are likely to be curl'd by developers
  types->longpoll = 0;
  types->poll = 0;

  types->http_raw_stream = 1;
  types->http_multipart = 1;
  types->http_chunked = 1;
  types->eventsource = 1;
  types->websocket = 1;

  return NGX_CONF_OK;
}
737 
// Directive handler for group-access locations.
//   no arguments -> get, set and delete are all enabled
//   "get" / "set" / "delete" -> enables that request type
//   "off" -> clears all three
// An unrecognised value is logged and rejected. On success the location is
// validated and the group request handler is set.
static char *nchan_group_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t     *loc = conf;
  nchan_conf_group_t   *grp = &loc->group;
  ngx_str_t            *args = cf->args->elts;
  ngx_str_t            *arg;
  ngx_uint_t            n;

  if(cf->args->nelts == 1) { //no arguments
    grp->get = 1;
    grp->set = 1;
    grp->delete = 1;
  }

  for(n = 1; n < cf->args->nelts; n++) {
    arg = &args[n];

    if(nchan_strmatch(arg, 1, "get")) {
      grp->get = 1;
      continue;
    }
    if(nchan_strmatch(arg, 1, "set")) {
      grp->set = 1;
      continue;
    }
    if(nchan_strmatch(arg, 1, "delete")) {
      grp->delete = 1;
      continue;
    }
    if(nchan_strmatch(arg, 1, "off")) {
      grp->get = 0;
      grp->set = 0;
      grp->delete = 0;
      continue;
    }

    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V", &cmd->name, arg);
    return NGX_CONF_ERROR;
  }

  if(!is_valid_location(cf, loc)) {
    return NGX_CONF_ERROR;
  }

  loc->request_handler = &nchan_group_handler;
  return NGX_CONF_OK;
}
779 
set_complex_value(ngx_conf_t * cf,ngx_http_complex_value_t ** cv,char * val)780 static ngx_int_t set_complex_value(ngx_conf_t *cf, ngx_http_complex_value_t **cv, char *val) {
781   ngx_http_compile_complex_value_t    ccv;
782   ngx_str_t                          *value = ngx_palloc(cf->pool, sizeof(ngx_str_t));;
783   if(value == NULL) {
784     return NGX_ERROR;
785   }
786   value->data = (u_char *)val;
787   value->len = strlen(val);
788   *cv = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
789   if (*cv == NULL) {
790     return NGX_ERROR;
791   }
792   ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
793   ccv.cf = cf;
794   ccv.value = value;
795   ccv.complex_value = *cv;
796   if (ngx_http_compile_complex_value(&ccv) != NGX_OK) {
797     return NGX_ERROR;
798   }
799 
800   return NGX_OK;
801 }
802 
set_complex_value_array_size1(ngx_conf_t * cf,nchan_complex_value_arr_t * chid,char * val)803 static ngx_int_t set_complex_value_array_size1(ngx_conf_t *cf, nchan_complex_value_arr_t *chid, char *val) {
804   chid->n = 1;
805   return set_complex_value(cf, &chid->cv[0], val);
806 }
807 
// Directive handler that turns a location into the benchmark endpoint.
// Flags benchmarking as globally enabled, installs nchan_benchmark_handler,
// then hard-wires the channel group ("benchmark"), the publisher channel id
// ("control") and the subscriber channel id ("data"). Websocket publishing and
// subscribing are enabled only after all three values were set successfully.
// Returns NGX_CONF_OK, or a static error string naming the value that failed.
static char *nchan_benchmark_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t     *loc_conf = conf;
  char                 *result = NGX_CONF_OK;

  global_benchmark_enabled = 1;
  loc_conf->request_handler = &nchan_benchmark_handler;

  if(set_complex_value(cf, &loc_conf->channel_group, "benchmark") != NGX_OK) {
    //group
    result = "error setting benchmark channel group";
  }
  else if(set_complex_value_array_size1(cf, &loc_conf->pub_chid, "control") != NGX_OK) {
    //publisher channel id
    result = "error setting benchmark control channel";
  }
  else if(set_complex_value_array_size1(cf, &loc_conf->sub_chid, "data") != NGX_OK) {
    //subscriber channel id
    result = "error setting benchmark data channel";
  }
  else {
    loc_conf->sub.websocket = 1;
    loc_conf->pub.websocket = 1;
  }

  return result;
}
831 
// Directive handler selecting the benchmark subscriber distribution.
// Accepts "random", or "optimal" / "best"; any other value yields a static
// error string.
static char *nchan_benchmark_subscriber_distribution_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t   *loc_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_str_t          *choice = &args[1];

  if(nchan_strmatch(choice, 1, "random")) {
    loc_conf->benchmark.subscriber_distribution = NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_RANDOM;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(choice, 2, "optimal", "best")) {
    loc_conf->benchmark.subscriber_distribution = NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_OPTIMAL;
    return NGX_CONF_OK;
  }
  return "invalid value, must be \"random\" or \"optimal\"";
}
// Directive handler selecting the benchmark publisher distribution.
// Accepts "random", or "optimal" / "best"; any other value yields a static
// error string.
static char *nchan_benchmark_publisher_distribution_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t   *loc_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_str_t          *choice = &args[1];

  if(nchan_strmatch(choice, 1, "random")) {
    loc_conf->benchmark.publisher_distribution = NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_RANDOM;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(choice, 2, "optimal", "best")) {
    loc_conf->benchmark.publisher_distribution = NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_OPTIMAL;
    return NGX_CONF_OK;
  }
  return "invalid value, must be \"random\" or \"optimal\"";
}
860 
// Directive handler for the subscriber's first-message setting.
// "oldest" stores 1 and "newest" stores 0. Anything else is parsed as an
// optionally '-'-prefixed decimal number whose magnitude may not exceed 32;
// the signed value is stored as-is.
// Logs and returns NGX_CONF_ERROR when the value is not a number or is out of
// range.
static char *nchan_subscriber_first_message_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t   *loc_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_str_t          *arg = &args[1];
  u_char             *digits;
  size_t              ndigits;
  ngx_int_t           magnitude;
  int                 negative;

  if(nchan_strmatch(arg, 1, "oldest")) {
    loc_conf->subscriber_first_message = 1;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(arg, 1, "newest")) {
    loc_conf->subscriber_first_message = 0;
    return NGX_CONF_OK;
  }

  //maybe a number? strip one leading minus sign before handing it to ngx_atoi
  digits = arg->data;
  ndigits = arg->len;
  negative = (ndigits > 0 && digits[0] == '-');
  if(negative) {
    digits++;
    ndigits--;
  }

  magnitude = ngx_atoi(digits, ndigits);
  if(magnitude == NGX_ERROR) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V, must be 'oldest', 'newest', or a number", &cmd->name, arg);
    return NGX_CONF_ERROR;
  }
  if(magnitude > 32) {
    ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid %V value: %V, must be 'oldest', 'newest', or a number between -32 and 32", &cmd->name, arg);
    return NGX_CONF_ERROR;
  }

  loc_conf->subscriber_first_message = negative ? -magnitude : magnitude;
  return NGX_CONF_OK;
}
893 
nchan_websocket_heartbeat_directive(ngx_conf_t * cf,ngx_command_t * cmd,void * conf)894 static char *nchan_websocket_heartbeat_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
895   nchan_loc_conf_t   *lcf = (nchan_loc_conf_t *)conf;
896   ngx_str_t          *heartbeat_in = &((ngx_str_t *) cf->args->elts)[1];
897   ngx_str_t          *heartbeat_out = &((ngx_str_t *) cf->args->elts)[2];
898   ngx_str_t          *in, *out;
899   lcf->websocket_heartbeat.enabled = 1;
900 
901   in = ngx_pcalloc(cf->pool, sizeof(ngx_str_t) + heartbeat_in->len);
902   in->data = (u_char *)&in[1];
903   in->len = heartbeat_in->len;
904   ngx_memcpy(in->data, heartbeat_in->data, heartbeat_in->len);
905   lcf->websocket_heartbeat.in = in;
906 
907   out = ngx_pcalloc(cf->pool, sizeof(ngx_str_t) + heartbeat_out->len);
908   out->data = (u_char *)&out[1];
909   out->len = heartbeat_out->len;
910   ngx_memcpy(out->data, heartbeat_out->data, heartbeat_out->len);
911   lcf->websocket_heartbeat.out = out;
912 
913   return NGX_CONF_OK;
914 }
915 
// Directive handler for the deflate compression level.
// When nginx is built with zlib, parses the argument as a number in [0, 9]
// and stores it in the main conf's zlib_params.level; returns a static error
// string otherwise. Without zlib the directive is accepted and ignored.
static char *nchan_conf_deflate_compression_level_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_main_conf_t  *main_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_int_t           level = ngx_atoi(args[1].data, args[1].len);

  if(level == NGX_ERROR) {
    return "invalid number";
  }
  if(!(level >= 0 && level <= 9)) {
    return "must be between 0 and 9";
  }

  main_conf->zlib_params.level = level;
#endif
  return NGX_CONF_OK;
}
933 
// Directive handler for the deflate compression strategy.
// When nginx is built with zlib, maps the argument ("default", "filtered",
// "huffman-only", "rle" or "fixed") to the corresponding zlib Z_* strategy
// constant and stores it in the main conf's zlib_params.strategy; any other
// value yields a static error string. Without zlib the directive is accepted
// and ignored.
static char *nchan_conf_deflate_compression_strategy_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_main_conf_t   *mcf = (nchan_main_conf_t *)conf;
  // args[0] is the directive name itself; the strategy is the first real
  // argument. (Matching against args[0] rejected every valid value.)
  ngx_str_t           *val = &((ngx_str_t *) cf->args->elts)[1];
  if(nchan_strmatch(val, 1, "default")) {
    mcf->zlib_params.strategy = Z_DEFAULT_STRATEGY;
  }
  else if(nchan_strmatch(val, 1, "filtered")) {
    mcf->zlib_params.strategy = Z_FILTERED;
  }
  else if(nchan_strmatch(val, 1, "huffman-only")) {
    mcf->zlib_params.strategy = Z_HUFFMAN_ONLY;
  }
  else if(nchan_strmatch(val, 1, "rle")) {
    mcf->zlib_params.strategy = Z_RLE;
  }
  else if(nchan_strmatch(val, 1, "fixed")) {
    mcf->zlib_params.strategy = Z_FIXED;
  }
  else {
    return "invalid compression strategy";
  }
#endif
  return NGX_CONF_OK;
}
959 
// Directive handler for the deflate window size.
// When nginx is built with zlib, parses the argument as a number in [9, 15]
// and stores it in the main conf's zlib_params.windowBits; returns a static
// error string otherwise. Without zlib the directive is accepted and ignored.
static char *nchan_conf_deflate_compression_window_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_main_conf_t  *main_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_int_t           window_bits = ngx_atoi(args[1].data, args[1].len);

  if(window_bits == NGX_ERROR) {
    return "invalid number";
  }
  if(!(window_bits >= 9 && window_bits <= 15)) {
    return "must be between 9 and 15";
  }

  main_conf->zlib_params.windowBits = window_bits;
#endif
  return NGX_CONF_OK;
}
977 
// Directive handler for the deflate memory level.
// When nginx is built with zlib, parses the argument as a number in [1, 9]
// and stores it in the main conf's zlib_params.memLevel; returns a static
// error string otherwise. Without zlib the directive is accepted and ignored.
static char *nchan_conf_deflate_compression_memlevel_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_main_conf_t  *main_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_int_t           mem_level = ngx_atoi(args[1].data, args[1].len);

  if(mem_level == NGX_ERROR) {
    return "invalid number";
  }
  if(!(mem_level >= 1 && mem_level <= 9)) {
    return "must be between 1 and 9";
  }

  main_conf->zlib_params.memLevel = mem_level;
#endif
  return NGX_CONF_OK;
}
995 
996 
// Worker-exit hook (registered as the module's "exit process" callback).
// Does nothing unless nchan is enabled. Otherwise, in this order: lets the
// redis store prepare for exit (if redis is enabled), shuts down the memory
// store, then the redis store (if enabled), then the output layer, and
// finally the shared deflate state when compression was requested.
static void nchan_exit_worker(ngx_cycle_t *cycle) {
  if(global_nchan_enabled) {
    if(global_redis_enabled) {
      redis_store_prepare_to_exit_worker();
    }

    nchan_store_memory.exit_worker(cycle);

    if(global_redis_enabled) {
      nchan_store_redis.exit_worker(cycle);
    }

    nchan_output_shutdown();

#if (NGX_ZLIB)
    if(global_zstream_needed) {
      nchan_common_deflate_shutdown();
    }
#endif

#if NCHAN_SUBSCRIBER_LEAK_DEBUG
    subscriber_debug_assert_isempty();
#endif
  }
}
1018 
// Master-exit hook (registered as the module's "exit master" callback).
// Does nothing unless nchan is enabled. Otherwise, in this order: runs the
// benchmark's master-exit routine (if benchmarking is enabled), shuts down
// the memory store, then the redis store (if enabled), and finally the shared
// deflate state when compression was requested.
static void nchan_exit_master(ngx_cycle_t *cycle) {
  if(global_nchan_enabled) {
    if(global_benchmark_enabled) {
      nchan_benchmark_exit_master(cycle);
    }

    nchan_store_memory.exit_master(cycle);

    if(global_redis_enabled) {
      nchan_store_redis.exit_master(cycle);
    }

#if (NGX_ZLIB)
    if(global_zstream_needed) {
      nchan_common_deflate_shutdown();
    }
#endif
  }
}
1036 
// Compiles every argument of the current directive into a complex value and
// stores the results, in order, in chid->cv[]; chid->n receives the number of
// slots filled.
// At most NCHAN_COMPLEX_VALUE_ARRAY_MAX arguments are compiled. chid->n is
// clamped to that limit as well, so it never counts slots that were not
// filled (it used to be set to the raw argument count). Surplus arguments are
// ignored with a warning.
// Returns NGX_CONF_OK, or NGX_CONF_ERROR if an allocation or a compilation
// fails.
static char *nchan_set_complex_value_array(ngx_conf_t *cf, ngx_command_t *cmd, void *conf, nchan_complex_value_arr_t *chid) {
  ngx_uint_t                          i, count;
  ngx_str_t                          *args = cf->args->elts;
  ngx_http_complex_value_t          **cv;
  ngx_http_compile_complex_value_t    ccv;

  count = cf->args->nelts - 1; // args[0] is the directive name
  if(count > NCHAN_COMPLEX_VALUE_ARRAY_MAX) {
    ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "too many values for %V, ignoring the extra ones", &cmd->name);
    count = NCHAN_COMPLEX_VALUE_ARRAY_MAX;
  }
  chid->n = count;

  for(i = 0; i < count; i++) {
    cv = &chid->cv[i];
    *cv = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
    if (*cv == NULL) {
      return NGX_CONF_ERROR;
    }

    ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
    ccv.cf = cf;
    ccv.value = &args[i + 1];
    ccv.complex_value = *cv;

    if (ngx_http_compile_complex_value(&ccv) != NGX_OK) {
      return NGX_CONF_ERROR;
    }
  }

  return NGX_CONF_OK;
}
1065 
// Directive handler: compiles the directive's arguments into the location's
// publisher channel id array (pub_chid).
static char *nchan_set_pub_channel_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t *loc_conf = conf;

  return nchan_set_complex_value_array(cf, cmd, conf, &loc_conf->pub_chid);
}
1069 
// Directive handler: compiles the directive's arguments into the location's
// subscriber channel id array (sub_chid).
static char *nchan_set_sub_channel_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  return nchan_set_complex_value_array(cf, cmd, conf, &((nchan_loc_conf_t *)conf)->sub_chid);
}
1074 
// Directive handler: compiles the directive's arguments into the location's
// combined publisher/subscriber channel id array (pubsub_chid).
static char *nchan_set_pubsub_channel_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  return nchan_set_complex_value_array(cf, cmd, conf, &((nchan_loc_conf_t *)conf)->pubsub_chid);
}
1079 
// Directive handler: compiles the directive's arguments into the location's
// last_message_id complex value array.
static char *nchan_subscriber_last_message_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t *loc_conf = conf;

  return nchan_set_complex_value_array(cf, cmd, conf, &loc_conf->last_message_id);
}
1083 
// Directive handler: compiles the directive's single argument into a
// pool-allocated complex value stored as the location's
// channel_events_channel_id.
// Returns NGX_CONF_ERROR if the allocation or the compilation fails.
static char *nchan_set_channel_events_channel_id(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t                   *loc_conf = conf;
  ngx_str_t                          *args = cf->args->elts;
  ngx_http_complex_value_t           *compiled;
  ngx_http_compile_complex_value_t    compile;

  compiled = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
  loc_conf->channel_events_channel_id = compiled;
  if(compiled == NULL) {
    return NGX_CONF_ERROR;
  }

  ngx_memzero(&compile, sizeof(compile));
  compile.cf = cf;
  compile.value = &args[1];
  compile.complex_value = compiled;

  return ngx_http_compile_complex_value(&compile) == NGX_OK ? NGX_CONF_OK : NGX_CONF_ERROR;
}
1105 
// Directive handler for the store-messages switch.
// When the argument is "off" (case-insensitive) the location's max_messages
// is forced to 0; any other value leaves the configuration untouched.
// Always returns NGX_CONF_OK.
static char *nchan_store_messages_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t *args = cf->args->elts;
  ngx_int_t *max_messages;

  if(ngx_strcasecmp(args[1].data, (u_char *) "off") != 0) {
    return NGX_CONF_OK;
  }

  max_messages = (ngx_int_t *) ((char *)conf + offsetof(nchan_loc_conf_t, max_messages));
  *max_messages = 0;
  return NGX_CONF_OK;
}
1118 
// Directive handler for the message buffer length.
// An argument containing '$' is treated as a complex (variable-based) value:
// max_messages is reset to NGX_CONF_UNSET, the value is compiled into
// complex_max_messages, and shared data is reserved for this location via
// memstore_reserve_conf_shared_data(). Otherwise complex_max_messages is
// cleared and the argument is parsed as a plain number into max_messages.
// cmd->offset is rewritten so the stock nginx slot setters write to the right
// field.
// Returns the slot setter's result, so an invalid or duplicate value is now
// reported instead of being silently ignored.
static char *nchan_set_message_buffer_length(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t    *lcf = conf;
  ngx_str_t           *val = cf->args->elts;
  ngx_str_t           *arg = &val[1];
  char                *rc;

  if(memchr(arg->data, '$', arg->len)) {
    //complex
    lcf->max_messages = NGX_CONF_UNSET;
    cmd->offset = offsetof(nchan_loc_conf_t, complex_max_messages);
    rc = ngx_http_set_complex_value_slot(cf, cmd, conf);
    if(rc != NGX_CONF_OK) {
      return rc;
    }
    memstore_reserve_conf_shared_data(lcf);
    return NGX_CONF_OK;
  }

  //simple
  lcf->complex_max_messages = NULL;
  cmd->offset = offsetof(nchan_loc_conf_t, max_messages);
  return ngx_conf_set_num_slot(cf, cmd, conf);
}
1139 
// Directive handler for the unsubscribe request URL.
// On nginx 1.3.15 or newer this simply delegates to
// ngx_http_set_complex_value_slot(); on older versions the directive is
// rejected with a logged error.
static char *ngx_http_set_unsubscribe_request_url(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if nginx_version < 1003015
  ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "%V not available on nginx version: %s, must be at least 1.3.15", &cmd->name, NGINX_VERSION);
  return NGX_CONF_ERROR;
#else
  return ngx_http_set_complex_value_slot(cf, cmd, conf);
#endif
}
1148 
// Directive handler for the message timeout.
// An argument containing '$' is treated as a complex (variable-based) value:
// message_timeout is reset to NGX_CONF_UNSET, the value is compiled into
// complex_message_timeout, and shared data is reserved for this location via
// memstore_reserve_conf_shared_data(). Otherwise complex_message_timeout is
// cleared and the argument is parsed as a time in seconds into
// message_timeout. cmd->offset is rewritten so the stock nginx slot setters
// write to the right field.
// Returns the slot setter's result, so an invalid or duplicate value is now
// reported instead of being silently ignored.
static char *nchan_set_message_timeout(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t    *lcf = conf;
  ngx_str_t           *val = cf->args->elts;
  ngx_str_t           *arg = &val[1];
  char                *rc;

  if(memchr(arg->data, '$', arg->len)) {
    //complex
    lcf->message_timeout = NGX_CONF_UNSET;
    cmd->offset = offsetof(nchan_loc_conf_t, complex_message_timeout);
    rc = ngx_http_set_complex_value_slot(cf, cmd, conf);
    if(rc != NGX_CONF_OK) {
      return rc;
    }
    memstore_reserve_conf_shared_data(lcf);
    return NGX_CONF_OK;
  }

  //simple
  lcf->complex_message_timeout = NULL;
  cmd->offset = offsetof(nchan_loc_conf_t, message_timeout);
  return ngx_conf_set_sec_slot(cf, cmd, conf);
}
1169 
// Directive handler for obsolete directives: logs a warning naming the
// directive and otherwise accepts it without effect.
static char *nchan_ignore_obsolete_setting(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t *directive = &cmd->name;

  ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "ignoring obsolete nchan config directive '%V'.", directive);
  return NGX_CONF_OK;
}
1174 
// Directive handler for the obsolete subscriber-concurrency setting.
// "broadcast" is accepted silently; any other value is accepted too, but a
// warning is logged that it is being ignored. Always returns NGX_CONF_OK.
static char *nchan_ignore_subscriber_concurrency(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t          *args = cf->args->elts;
  ngx_str_t          *mode = &args[1];

  if(nchan_strmatch(mode, 1, "broadcast")) {
    return NGX_CONF_OK;
  }

  ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "ignoring obsolete nchan config directive '%V %V;'. Only 'broadcast' is currently supported.", &cmd->name, mode);
  return NGX_CONF_OK;
}
1182 
// Directive handler for the raw-stream subscriber separator.
// An empty separator, or one already ending in '\n', is stored as given
// (sharing the argument's buffer). Otherwise a pool-allocated copy with a
// trailing '\n' appended is stored.
// Returns NGX_CONF_ERROR if that copy cannot be allocated.
static char *nchan_set_raw_subscriber_separator(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t   *loc_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_str_t          *given = &args[1];
  ngx_str_t          *separator = &loc_conf->subscriber_http_raw_stream_separator;
  u_char             *buf;

  if(given->len == 0 || given->data[given->len - 1] == '\n') {
    *separator = *given;
    return NGX_CONF_OK;
  }

  //must end in a newline if not empty
  buf = ngx_palloc(cf->pool, given->len + 1);
  if(buf == NULL) {
    return NGX_CONF_ERROR;
  }
  ngx_memcpy(buf, given->data, given->len);
  buf[given->len] = '\n';

  separator->data = buf;
  separator->len = given->len + 1;
  return NGX_CONF_OK;
}
1203 
// Directive handler for per-location message compression.
// With zlib available: "on" enables compression for the location and flags
// that the shared zstream is needed; "off" disables it; anything else yields
// a static error string. Without zlib the directive is always rejected.
static char *nchan_set_message_compression_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
#if (NGX_ZLIB)
  nchan_loc_conf_t   *loc_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_str_t          *flag = &args[1];

  if(nchan_strmatch(flag, 1, "on")) {
    loc_conf->message_compression = 1;
    global_zstream_needed = 1;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(flag, 1, "off")) {
    loc_conf->message_compression = 0;
    return NGX_CONF_OK;
  }
  return "invalid value: must be 'on' or 'off'";
#else
  return "cannot use compression, Nginx was built without zlib";
#endif
}
1223 
// Directive handler for multi-message longpoll responses.
// "on" and "off" set longpoll_multimsg to 1 and 0 respectively; "raw" sets it
// to 1 and additionally asks for the raw stream separator to be used.
// Any other value is logged and rejected with NGX_CONF_ERROR.
static char *nchan_set_longpoll_multipart(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t   *loc_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_str_t          *mode = &args[1];

  if(nchan_strmatch(mode, 1, "on")) {
    loc_conf->longpoll_multimsg = 1;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(mode, 1, "off")) {
    loc_conf->longpoll_multimsg = 0;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(mode, 1, "raw")) {
    loc_conf->longpoll_multimsg = 1;
    loc_conf->longpoll_multimsg_use_raw_stream_separator = 1;
    return NGX_CONF_OK;
  }

  ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "invalid value for %V: %V;'. Must be 'on', 'off', or 'raw'", &cmd->name, mode);
  return NGX_CONF_ERROR;
}
1243 
// Directive handler for the redis subscribe weights.
// Scans the arguments for "master=<n>" and "slave=<n>" and, once all of them
// have parsed cleanly, stores whichever weights were given in the server
// conf. Arguments with neither prefix are skipped.
// Returns a static error string if a weight is not a valid number.
// NOTE(review): nchan_str_after() receives &arg and presumably repoints it at
// the text following the prefix -- confirm against its definition.
static char *ngx_conf_set_redis_subscribe_weights(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_srv_conf_t *srv_conf = conf;
  ngx_str_t        *args = cf->args->elts;
  ngx_str_t        *arg;
  ngx_int_t         master_weight = NGX_CONF_UNSET;
  ngx_int_t         slave_weight = NGX_CONF_UNSET;
  unsigned          idx = 1;

  while(idx < cf->args->nelts) {
    arg = &args[idx];
    idx++;

    if(nchan_str_after(&arg, "master=")) {
      master_weight = ngx_atoi(arg->data, arg->len);
      if(master_weight == NGX_ERROR) {
        return "has invalid weight for master";
      }
      continue;
    }
    if(nchan_str_after(&arg, "slave=")) {
      slave_weight = ngx_atoi(arg->data, arg->len);
      if(slave_weight == NGX_ERROR) {
        return "has invalid weight for slave";
      }
    }
  }

  // only overwrite the weights that were actually specified
  if(master_weight != NGX_CONF_UNSET) {
    srv_conf->redis.master_weight = master_weight;
  }
  if(slave_weight != NGX_CONF_UNSET) {
    srv_conf->redis.slave_weight = slave_weight;
  }

  return NGX_CONF_OK;
}
1274 
// Directive handler for the redis optimization target.
// "bandwidth" / "bw" select NCHAN_REDIS_OPTIMIZE_BANDWIDTH and "cpu" / "CPU"
// select NCHAN_REDIS_OPTIMIZE_CPU; anything else yields a static error string.
static char *ngx_conf_set_redis_optimize_target(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_srv_conf_t   *srv_conf = conf;
  ngx_str_t          *args = cf->args->elts;
  ngx_str_t          *target = &args[1];

  if(nchan_strmatch(target, 2, "bandwidth", "bw")) {
    srv_conf->redis.optimize_target = NCHAN_REDIS_OPTIMIZE_BANDWIDTH;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(target, 2, "cpu", "CPU")) {
    srv_conf->redis.optimize_target = NCHAN_REDIS_OPTIMIZE_CPU;
    return NGX_CONF_OK;
  }
  return "invalid value, must be \"bandwidth\" or \"cpu\"";
}
1289 
// Directive handler for the legacy on/off redis switch.
// Logs a deprecation warning, lets ngx_conf_set_flag_slot() parse the flag
// into the field at cmd->offset, then:
//  - flag on:  marks redis enabled for this location (registering it as an
//              active loc conf the first time) and sets global_redis_enabled;
//  - flag off: removes the location from the active redis loc confs.
// Returns the flag slot's result; bails out early only on NGX_CONF_ERROR.
static char *ngx_conf_enable_redis(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t    *loc_conf = conf;
  char                *result;
  ngx_flag_t           enabled;

  ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "Use of %V is discouraged in favor of nchan_redis_pass.", &cmd->name);

  result = ngx_conf_set_flag_slot(cf, cmd, conf);
  if(result == NGX_CONF_ERROR) {
    return result;
  }

  // read back the flag the slot setter just wrote (or left as it was)
  enabled = *(ngx_flag_t *) ((char *)conf + cmd->offset);

  if(!enabled) {
    nchan_store_redis_remove_active_loc_conf(cf, loc_conf);
    return result;
  }

  if(!loc_conf->redis.enabled) {
    loc_conf->redis.enabled = 1;
    nchan_store_redis_add_active_loc_conf(cf, loc_conf);
  }
  global_redis_enabled = 1;

  return result;
}
1317 
// Directive handler that turns a location into the stub-status endpoint:
// installs nchan_stub_status_handler and raises the global
// nchan_stub_status_enabled flag. Always returns NGX_CONF_OK.
static char *nchan_stub_status_directive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t    *loc_conf = conf;

  loc_conf->request_handler = &nchan_stub_status_handler;
  nchan_stub_status_enabled = 1;
  return NGX_CONF_OK;
}
1324 
1325 
nchan_upstream_dummy_roundrobin_init(ngx_conf_t * cf,ngx_http_upstream_srv_conf_t * us)1326 static ngx_int_t nchan_upstream_dummy_roundrobin_init(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us) {
1327   return NGX_OK;
1328 }
1329 
// Directive handler for a redis server entry inside an upstream block.
// Associates the upstream's nchan server conf with this location conf
// (asserting that it is the same one if already associated), validates the
// server URL, and appends a server entry named after the URL to the
// upstream's server list. A no-op init_upstream callback is installed for the
// upstream.
// Returns "url is invalid" for a bad URL and NGX_CONF_ERROR if an allocation
// fails. The URL is validated before anything is appended, and every
// allocation is checked before use.
static char *ngx_conf_upstream_redis_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_http_upstream_srv_conf_t        *uscf;
  ngx_str_t                           *value = cf->args->elts;
  ngx_http_upstream_server_t          *usrv;
  ngx_addr_t                          *addr;
  nchan_loc_conf_t                    *lcf = conf;
  nchan_srv_conf_t                    *scf;

  uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
  scf = ngx_http_conf_upstream_srv_conf(uscf, ngx_nchan_module);
  if(scf->upstream_nchan_loc_conf) {
    assert(scf->upstream_nchan_loc_conf == lcf);
  }
  else {
    //is this even a safe technique? it might break in the future...
    scf->upstream_nchan_loc_conf = lcf;
  }

  // reject bad input before touching the server list
  if(!nchan_store_redis_validate_url(&value[1])) {
    return "url is invalid";
  }

  if(uscf->servers == NULL) {
    uscf->servers = ngx_array_create(cf->pool, 4, sizeof(ngx_http_upstream_server_t));
    if(uscf->servers == NULL) {
      return NGX_CONF_ERROR;
    }
  }

  addr = ngx_pcalloc(cf->pool, sizeof(ngx_addr_t));
  if(addr == NULL) {
    return NGX_CONF_ERROR;
  }
  addr->name = value[1];

  if ((usrv = ngx_array_push(uscf->servers)) == NULL) {
    return NGX_CONF_ERROR;
  }

  ngx_memzero(usrv, sizeof(*usrv));
#if nginx_version >= 1007002
  usrv->name = value[1];
#endif
  usrv->addrs = addr;

  uscf->peer.init_upstream = nchan_upstream_dummy_roundrobin_init;
  return NGX_CONF_OK;
}
1369 
// Fills `mask` with the IPv6 netmask for a CIDR prefix of `prefix_size` bits:
// the leading prefix_size bits are set, the rest cleared.
// Every one of the 16 mask bytes is written exactly once, so the function can
// no longer write past the mask when prefix_size exceeds 128 (such values
// yield an all-ones mask); a prefix_size <= 0 yields an all-zero mask.
static void ipv6_prefix_size_to_mask(int prefix_size, struct in6_addr *mask) {
  size_t i;
  for(i = 0; i < sizeof(mask->s6_addr); i++) {
    // number of prefix bits that still remain when we reach byte i
    int bits = prefix_size - (int)(i * 8);
    if(bits >= 8) {
      mask->s6_addr[i] = 0xFF;
    }
    else if(bits > 0) {
      // partial byte: keep only the top `bits` bits
      mask->s6_addr[i] = (uint8_t)(0xFFU << (8 - bits));
    }
    else {
      mask->s6_addr[i] = 0;
    }
  }
}
1377 
// Stores in `mask` the IPv4 netmask (network byte order) for a CIDR prefix of
// `prefix_size` bits.
// The mask is built with unsigned 32-bit arithmetic: the previous signed
// expression `1 << (32 - prefix_size)` overflowed int for prefix_size == 1
// and shifted by a negative amount for prefix_size > 32, both undefined
// behaviour. prefix_size <= 0 gives 0.0.0.0; prefix_size >= 32 gives
// 255.255.255.255.
static void ipv4_prefix_size_to_mask(int prefix_size, struct in_addr *mask) {
  uint32_t host_order_mask;

  if(prefix_size <= 0) {
    host_order_mask = 0;
  }
  else if(prefix_size >= 32) {
    host_order_mask = (uint32_t)0xFFFFFFFFu;
  }
  else {
    // clear the low (32 - prefix_size) host bits
    host_order_mask = (uint32_t)~((uint32_t)0xFFFFFFFFu >> prefix_size);
  }
  mask->s_addr = (in_addr_t)htonl(host_order_mask);
}
1381 
// Directive handler for the redis server IP blacklist.
// Each argument is an address with an optional "/<prefix>" CIDR suffix. For
// every argument the address is resolved with getaddrinfo(), and the entry
// records the original string, the address family, the address, the netmask
// derived from the prefix, and the address masked down to its network block.
// Without a suffix the prefix defaults to a single host (32 for IPv4, 128 for
// IPv6). A prefix of 0 is rejected because it would match everything.
//
// Returns NGX_CONF_OK, or a static error string describing the first problem.
//
// Fixes over the previous version:
//  - the address part is length-checked before being copied into the
//    fixed-size stack buffer (it used to overflow on long arguments);
//  - the getaddrinfo() result is released on every path, not just on success;
//  - the success return is NGX_CONF_OK rather than NGX_OK;
//  - the IPv6 range error no longer says "IPv4".
static char *ngx_conf_set_redis_ip_blacklist(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_srv_conf_t       *scf = conf;
  ngx_str_t              *val = cf->args->elts;
  int                     i;
  nchan_redis_ip_range_t *blacklist = ngx_palloc(cf->pool, sizeof(*blacklist) * (cf->args->nelts - 1));
  if(blacklist == NULL) {
    return "couldn't allocate Redis server blacklist";
  }

  scf->redis.blacklist = blacklist;
  scf->redis.blacklist_count = cf->args->nelts - 1;

  for(i=1; i <= scf->redis.blacklist_count; i++) {
    ngx_str_t              *cur = &val[i];
    nchan_redis_ip_range_t *entry = &blacklist[i-1];
    u_char                 *slash = memchr(cur->data, '/', cur->len);
    ngx_int_t               prefix_size;
    size_t                  addr_len;
    char                    buf[64];
    struct addrinfo         hints;
    struct addrinfo        *res = NULL;
    char                   *err = NULL;

    entry->str = *cur;

    if(slash) {
      prefix_size = ngx_atoi(slash+1, cur->len - (slash + 1 - cur->data));
      if(prefix_size == NGX_ERROR) {
        return "invalid CIDR range prefix size";
      }
      addr_len = slash - cur->data;
    }
    else {
      prefix_size = -1; // no prefix given; resolved once the family is known
      addr_len = cur->len;
    }

    // getaddrinfo() needs a NUL-terminated copy of the address part
    if(addr_len >= sizeof(buf)) {
      return "IP address is too long";
    }
    memcpy(buf, cur->data, addr_len);
    buf[addr_len] = '\0';

    ngx_memzero(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags    = AI_PASSIVE;

    if(getaddrinfo(buf, NULL, &hints, &res) != 0) {
      return "unable to parse IP address";
    }

    // from here on `res` is owned by this iteration: record errors in `err`
    // and fall through to the single freeaddrinfo() below
    entry->family = res->ai_family;
    if(entry->family == AF_INET) {
      struct sockaddr_in *sa = (struct sockaddr_in *)res->ai_addr;
      entry->addr.ipv4 = sa->sin_addr;
      entry->addr_block.ipv4 = sa->sin_addr;
    }
#ifdef AF_INET6
    else if(entry->family == AF_INET6) {
      struct sockaddr_in6 *sa = (struct sockaddr_in6 *)res->ai_addr;
      entry->addr.ipv6 = sa->sin6_addr;
      entry->addr_block.ipv6 = sa->sin6_addr;
    }
#endif
    else {
      err = "invalid address family";
    }

    if(err == NULL && prefix_size == 0) {
      err = "netmask size of 0 would block everything";
    }

    if(err == NULL) {
      if(prefix_size == -1) {
        //no prefix size given, assume we're blacklisting single ip.
        //prefix size depends on ipv4 or ipv6
        prefix_size = entry->family == AF_INET ? 32 : 128;
      }

      if(entry->family == AF_INET) {
        if(prefix_size > 32) {
          err = "netmask size cannot exceed 32 for IPv4";
        }
        else {
          entry->prefix_size = prefix_size;
          ipv4_prefix_size_to_mask((int)prefix_size, &entry->mask.ipv4);
          entry->addr_block.ipv4.s_addr &= entry->mask.ipv4.s_addr;
        }
      }
#ifdef AF_INET6
      else if(entry->family == AF_INET6) {
        if(prefix_size > 128) {
          err = "netmask size cannot exceed 128 for IPv6";
        }
        else {
          unsigned  j;
          uint8_t  *addr = entry->addr_block.ipv6.s6_addr;
          uint8_t  *mask = entry->mask.ipv6.s6_addr;

          entry->prefix_size = prefix_size;
          ipv6_prefix_size_to_mask((int)prefix_size, &entry->mask.ipv6);
          for(j=0; j<sizeof(entry->addr_block.ipv6.s6_addr); j++) {
            addr[j] &= mask[j];
          }
        }
      }
#endif
    }

    freeaddrinfo(res);
    if(err != NULL) {
      return err;
    }
  }
  return NGX_CONF_OK;
}
1482 
// String slot setter that refuses values containing a newline; accepted
// values are handed to the stock ngx_conf_set_str_slot().
static char *ngx_conf_set_str_slot_no_newlines(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t *args = cf->args->elts;

  return nchan_ngx_str_substr(&args[1], "\n")
    ? "can't contain any newline characters"
    : ngx_conf_set_str_slot(cf, cmd, conf);
}
1493 
// Directive handler for the legacy per-location redis URL.
// Logs a deprecation warning, refuses the directive when the location already
// has a redis upstream, validates the URL, and finally stores it via
// ngx_conf_set_str_slot().
static char *ngx_conf_set_redis_url(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc_conf = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *url = &args[1];

  ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "Use of %V is discouraged in favor of an upstream { } block with nchan_redis_server %V;", &cmd->name, url);

  if(loc_conf->redis.upstream) {
    return "can't be set here: already using nchan_redis_pass";
  }

  return nchan_store_redis_validate_url(url)
    ? ngx_conf_set_str_slot(cf, cmd, conf)
    : "url is invalid";
}
1510 
// Post-processing hook for the redis namespace string held in `fld`.
// Rejects namespaces containing '{' or '}'. A non-empty namespace that does
// not already end in ':' is replaced by a pool-allocated copy with ':'
// appended (plus a NUL terminator that is not counted in the length).
// Returns NGX_CONF_OK or a static error string.
static char *ngx_conf_process_redis_namespace_slot(ngx_conf_t *cf, void *post, void *fld) {
  ngx_str_t *ns = fld;
  u_char    *with_colon;

  if(memchr(ns->data, '{', ns->len)) {
    return "can't contain character '{'";
  }
  if(memchr(ns->data, '}', ns->len)) {
    return "can't contain character '}'";
  }

  // nothing to append for an empty namespace or one already ending in ':'
  if(ns->len == 0 || ns->data[ns->len - 1] == ':') {
    return NGX_CONF_OK;
  }

  with_colon = ngx_palloc(cf->pool, ns->len + 2);
  if(with_colon == NULL) {
    return "couldn't allocate redis namespace data";
  }
  ngx_memcpy(with_colon, ns->data, ns->len);
  with_colon[ns->len] = ':';
  with_colon[ns->len + 1] = '\0';

  ns->data = with_colon;
  ns->len++;

  return NGX_CONF_OK;
}
1536 
// Slot setter for the redis storage mode field located at cmd->offset.
// Refuses to overwrite a mode that is already set ("is duplicate").
// "backup" -> REDIS_MODE_BACKUP, "distributed" -> REDIS_MODE_DISTRIBUTED,
// "nostore" / "distributed-nostore" -> REDIS_MODE_DISTRIBUTED_NOSTORE;
// anything else yields a static error string.
static char *ngx_conf_set_redis_storage_mode_slot(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  ngx_str_t                    *args = cf->args->elts;
  ngx_str_t                    *mode_name = &args[1];
  nchan_redis_storage_mode_t   *mode;

  mode = (nchan_redis_storage_mode_t *) ((char *)conf + cmd->offset);
  if(*mode != REDIS_MODE_CONF_UNSET) {
    return "is duplicate";
  }

  if(nchan_strmatch(mode_name, 1, "backup")) {
    *mode = REDIS_MODE_BACKUP;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(mode_name, 1, "distributed")) {
    *mode = REDIS_MODE_DISTRIBUTED;
    return NGX_CONF_OK;
  }
  if(nchan_strmatch(mode_name, 1, "nostore") ||  nchan_strmatch(mode_name, 1, "distributed-nostore")) {
    *mode = REDIS_MODE_DISTRIBUTED_NOSTORE;
    return NGX_CONF_OK;
  }

  return "is invalid, must be one of 'distributed',  'backup' or 'nostore'";
}
1565 
// Directive handler for passing a location to a redis upstream: remembers the
// upstream URL in the location conf, then lets ngx_conf_set_redis_upstream()
// do the actual setup and returns its result.
static char *ngx_conf_set_redis_upstream_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
  nchan_loc_conf_t  *loc_conf = conf;
  ngx_str_t         *args = cf->args->elts;
  ngx_str_t         *upstream_url = &args[1];

  loc_conf->redis.upstream_url = *upstream_url;
  return ngx_conf_set_redis_upstream(cf, upstream_url, conf);
}
1573 
1574 
1575 #include "nchan_config_commands.c" //hideous but hey, it works
1576 
// HTTP module context: the configuration lifecycle callbacks nginx invokes
// for this module. Entries are positional; the trailing comment on each line
// names the slot it fills. There is no "init main configuration" callback.
static ngx_http_module_t  nchan_module_ctx = {
    nchan_preconfig,               /* preconfiguration */
    nchan_postconfig,              /* postconfiguration */
    nchan_create_main_conf,        /* create main configuration */
    NULL,                          /* init main configuration */
    nchan_create_srv_conf,         /* create server configuration */
    nchan_merge_srv_conf,          /* merge server configuration */
    nchan_create_loc_conf,         /* create location configuration */
    nchan_merge_loc_conf,          /* merge location configuration */
};
1587 
// Module definition exported to nginx. Ties together the HTTP module context
// above, the directive table (nchan_commands, pulled in from
// nchan_config_commands.c) and the process lifecycle hooks defined in this
// file. Entries are positional; the trailing comment on each line names the
// slot it fills. Unused hooks are NULL.
ngx_module_t  ngx_nchan_module = {
    NGX_MODULE_V1,
    &nchan_module_ctx,             /* module context */
    nchan_commands,                /* module directives */
    NGX_HTTP_MODULE,               /* module type */
    NULL,                          /* init master */
    nchan_init_module,             /* init module */
    nchan_init_worker,             /* init process */
    NULL,                          /* init thread */
    NULL,                          /* exit thread */
    nchan_exit_worker,             /* exit process */
    nchan_exit_master,             /* exit master */
    NGX_MODULE_V1_PADDING
};
1602