1 #ifndef NCHAN_TYPES_H
2 #define NCHAN_TYPES_H
3 #include <util/nchan_reuse_queue.h>
4 #include <util/nchan_bufchainpool.h>
5 
6 typedef ngx_int_t (*callback_pt)(ngx_int_t, void *, void *);
7 
8 #include <util/nchan_fake_request.h>
9 
10 typedef enum {MSG_ERROR, MSG_CHANNEL_NOTREADY, MSG_INVALID, MSG_PENDING, MSG_NOTFOUND, MSG_FOUND, MSG_EXPECTED, MSG_EXPIRED} nchan_msg_status_t;
11 typedef enum {INACTIVE, NOTREADY, WAITING, STUBBED, READY, DELETED} chanhead_pubsub_status_t;
12 
13 typedef enum {NCHAN_CONTENT_TYPE_PLAIN, NCHAN_CONTENT_TYPE_JSON, NCHAN_CONTENT_TYPE_XML, NCHAN_CONTENT_TYPE_YAML, NCHAN_CONTENT_TYPE_HTML} nchan_content_type_t;
14 
15 typedef enum {REDIS_MODE_CONF_UNSET = NGX_CONF_UNSET, REDIS_MODE_BACKUP = 1, REDIS_MODE_DISTRIBUTED = 2, REDIS_MODE_DISTRIBUTED_NOSTORE = 3} nchan_redis_storage_mode_t;
16 
17 typedef enum {
18   SUB_ENQUEUE, SUB_DEQUEUE, SUB_RECEIVE_MESSAGE, SUB_RECEIVE_STATUS,
19   CHAN_PUBLISH, CHAN_DELETE
20 } channel_event_type_t;
21 //on with the declarations
22 typedef struct {
23   size_t                          shm_size;
24   ngx_msec_t                      redis_fakesub_timer_interval;
25   size_t                          redis_publish_message_msgkey_size;
26 #if (NGX_ZLIB)
27   struct {
28                                     int level;
29                                     int windowBits;
30                                     int memLevel;
31                                     int strategy;
32   }                               zlib_params;
33 #endif
34   ngx_path_t                     *message_temp_path;
35 } nchan_main_conf_t;
36 
37 
38 typedef struct {
39   ngx_str_t                     url;
40   ngx_flag_t                    url_enabled;
41   time_t                        ping_interval;
42   time_t                        cluster_check_interval;
43   ngx_str_t                     namespace;
44   nchan_redis_storage_mode_t    storage_mode;
45   ngx_int_t                     nostore_fastpublish;
46   ngx_str_t                     upstream_url;
47   ngx_http_upstream_srv_conf_t *upstream;
48   ngx_flag_t                    upstream_inheritable;
49   unsigned                      enabled:1;
50   void                         *nodeset;
51   void                         *privdata;
52 } nchan_redis_conf_t;
53 
54 typedef struct {
55   ngx_atomic_int_t  lock;
56   ngx_atomic_t      mutex;
57   ngx_int_t         write_pid;
58 } ngx_rwlock_t;
59 
60 
61 #define NCHAN_OLDEST_MSGID_TIME 0
62 #define NCHAN_NEWEST_MSGID_TIME -1
63 #define NCHAN_NTH_MSGID_TIME -2
64 
65 #define NCHAN_ZERO_MSGID {0, {{0}}, 0, 0}
66 #define NCHAN_OLDEST_MSGID {NCHAN_OLDEST_MSGID_TIME, {{0}}, 0, 1}
67 #define NCHAN_NEWEST_MSGID {NCHAN_NEWEST_MSGID_TIME, {{0}}, 0, 1}
68 #define NCHAN_NTH_MSGID {NCHAN_NTH_MSGID_TIME, {{0}}, 0, 1}
69 
70 #define NCHAN_MULTITAG_MAX 255
71 #define NCHAN_FIXED_MULTITAG_MAX 4
72 union nchan_msg_multitag {
73   int16_t         fixed[NCHAN_FIXED_MULTITAG_MAX];
74   int16_t        *allocd;
75 };
76 
77 typedef struct {
78   time_t                          time; //tag message by time
79   union nchan_msg_multitag        tag;
80   int16_t                         tagactive;
81   int16_t                         tagcount;
82 } nchan_msg_id_t;
83 
84 typedef struct {
85   time_t                          time;
86   int16_t                         tag;
87 } nchan_msg_tiny_id_t;
88 
89 //message queue
90 
91 #if NCHAN_MSG_RESERVE_DEBUG
92 typedef struct msg_rsv_dbg_s msg_rsv_dbg_t;
93 struct msg_rsv_dbg_s {
94   char                  *lbl;
95   msg_rsv_dbg_t         *prev;
96   msg_rsv_dbg_t         *next;
97 }; //msg_rsv_dbg_s
98 #endif
99 
100 typedef struct nchan_loc_conf_s nchan_loc_conf_t;
101 typedef struct nchan_msg_s nchan_msg_t;
102 
103 typedef enum {NCHAN_MSG_SHARED, NCHAN_MSG_HEAP, NCHAN_MSG_POOL, NCHAN_MSG_STACK} nchan_msg_storage_t;
104 
105 typedef enum {
106   NCHAN_MSG_COMPRESSION_INVALID = -1,
107   NCHAN_MSG_NO_COMPRESSION = 0,
108   NCHAN_MSG_COMPRESSION_WEBSOCKET_PERMESSAGE_DEFLATE
109 } nchan_msg_compression_type_t;
110 
111 typedef struct {
112   ngx_buf_t                       buf;
113   nchan_msg_compression_type_t    compression;
114 } nchan_compressed_msg_t;
115 
116 struct nchan_msg_s {
117   nchan_msg_id_t                  id;
118   nchan_msg_id_t                  prev_id;
119   ngx_str_t                      *content_type;
120   ngx_str_t                      *eventsource_event;
121   //  ngx_str_t                   charset;
122   ngx_buf_t                       buf;
123   time_t                          expires;
124 
125   ngx_atomic_int_t                refcount;
126   nchan_msg_t                    *parent;
127   nchan_compressed_msg_t         *compressed;
128   //struct nchan_msg_s             *reload_next;
129 
130   nchan_msg_storage_t             storage;
131 
132 #if NCHAN_MSG_RESERVE_DEBUG
133   struct msg_rsv_dbg_s           *rsv;
134 #endif
135 #if NCHAN_MSG_LEAK_DEBUG
136   ngx_str_t                       lbl;
137   struct nchan_msg_s             *dbg_prev;
138   struct nchan_msg_s             *dbg_next;
139 #endif
140 }; // nchan_msg_t
141 
142 typedef struct {
143   ngx_str_t                       id;
144   ngx_int_t                       messages;
145   ngx_int_t                       subscribers;
146   time_t                          last_seen;
147   time_t                          expires;
148   nchan_msg_id_t                  last_published_msg_id;
149 } nchan_channel_t;
150 
151 
152 //garbage collecting goodness
153 typedef struct {
154   ngx_queue_t                     queue;
155   nchan_channel_t                *channel;
156 } nchan_channel_queue_t;
157 
158 
159 typedef struct {
160   ngx_queue_t                    queue;
161   ngx_rwlock_t                   lock;
162 } nchan_worker_msg_sentinel_t;
163 
164 typedef struct {
165   ngx_atomic_uint_t      channels;
166   ngx_atomic_uint_t      subscribers;
167   ngx_atomic_uint_t      total_published_messages;
168   ngx_atomic_uint_t      messages;
169   ngx_atomic_uint_t      redis_pending_commands;
170   ngx_atomic_uint_t      redis_connected_servers;
171   ngx_atomic_uint_t      ipc_total_alerts_sent;
172   ngx_atomic_uint_t      ipc_total_alerts_received;
173   ngx_atomic_uint_t      ipc_queue_size;
174   ngx_atomic_uint_t      ipc_total_send_delay;
175   ngx_atomic_uint_t      ipc_total_receive_delay;
176 } nchan_stub_status_t;
177 
178 typedef struct subscriber_s subscriber_t;
179 
180 typedef struct {
181   //must be made entirely of ngx_atomic_int_t
182   ngx_atomic_int_t                channels;
183   ngx_atomic_int_t                subscribers;
184   ngx_atomic_int_t                messages;
185   ngx_atomic_int_t                messages_shmem_bytes;
186   ngx_atomic_int_t                messages_file_bytes;
187 } nchan_group_limits_t;
188 
189 typedef struct {
190   ngx_atomic_int_t               channels;
191   ngx_atomic_int_t               multiplexed_channels;
192   ngx_atomic_int_t               subscribers;
193   ngx_atomic_int_t               messages;
194   ngx_atomic_int_t               messages_shmem_bytes;
195   ngx_atomic_int_t               messages_file_bytes;
196   nchan_group_limits_t           limit;
197   ngx_str_t                      name;
198 } nchan_group_t;
199 
200 typedef struct{
201   //init
202   ngx_int_t (*init_module)(ngx_cycle_t *cycle);
203   ngx_int_t (*init_worker)(ngx_cycle_t *cycle);
204   ngx_int_t (*init_postconfig)(ngx_conf_t *cf);
205   void      (*create_main_conf)(ngx_conf_t *cf, nchan_main_conf_t *mcf);
206 
207   //quit
208   void      (*exit_worker)(ngx_cycle_t *cycle);
209   void      (*exit_master)(ngx_cycle_t *cycle);
210 
211   //async-friendly functions with callbacks
212   ngx_int_t (*get_message) (ngx_str_t *, nchan_msg_id_t *, nchan_loc_conf_t *cf, callback_pt, void *);
213   ngx_int_t (*subscribe)   (ngx_str_t *, subscriber_t *);
214   ngx_int_t (*publish)     (ngx_str_t *, nchan_msg_t *, nchan_loc_conf_t *, callback_pt, void *);
215 
216     //channel actions
217   ngx_int_t (*delete_channel)(ngx_str_t *, nchan_loc_conf_t *, callback_pt, void *);
218   ngx_int_t (*find_channel)(ngx_str_t *, nchan_loc_conf_t *, callback_pt, void*);
219 
220   //group actions
221   ngx_int_t (*get_group)(ngx_str_t *name, nchan_loc_conf_t *, callback_pt, void *);
222   ngx_int_t (*set_group_limits)(ngx_str_t *name, nchan_loc_conf_t *, nchan_group_limits_t *limits, callback_pt, void *);
223   ngx_int_t (*delete_group)(ngx_str_t *name, nchan_loc_conf_t *, callback_pt, void *);
224 
225   //subscriber info stuff
226   ngx_int_t (*get_subscriber_info_id)(nchan_loc_conf_t *,  callback_pt, void *);
227   ngx_int_t (*request_subscriber_info)(ngx_str_t *channel_id, ngx_int_t request_id, nchan_loc_conf_t *, callback_pt, void *);
228 
229 } nchan_store_t;
230 
231 #define NCHAN_MULTI_SEP_CHR '\0'
232 
233 typedef struct {
234   unsigned                        http:1;
235   unsigned                        websocket:1;
236 } nchan_conf_publisher_types_t;
237 
238 typedef struct {
239   unsigned                        poll:1; //bleugh
240   unsigned                        http_raw_stream:1; //ugleh
241   unsigned                        longpoll:1;
242   unsigned                        http_chunked:1;
243   unsigned                        http_multipart:1;
244   unsigned                        eventsource:1;
245   unsigned                        websocket:1;
246 } nchan_conf_subscriber_types_t;
247 
248 typedef struct {
249   unsigned                        get:1;
250   unsigned                        set:1;
251   unsigned                        delete:1;
252 
253   ngx_int_t                       enable_accounting;
254 
255   ngx_http_complex_value_t       *max_channels;
256   ngx_http_complex_value_t       *max_subscribers;
257   ngx_http_complex_value_t       *max_messages;
258   ngx_http_complex_value_t       *max_messages_shm_bytes;
259   ngx_http_complex_value_t       *max_messages_file_bytes;
260 } nchan_conf_group_t;
261 
262 #define NCHAN_COMPLEX_VALUE_ARRAY_MAX 8
263 typedef struct {
264   ngx_http_complex_value_t       *cv[NCHAN_COMPLEX_VALUE_ARRAY_MAX];
265   ngx_int_t                       n;
266 } nchan_complex_value_arr_t;
267 
268 typedef struct {
269  ngx_atomic_uint_t               message_timeout;
270  ngx_atomic_uint_t               max_messages;
271 } nchan_loc_conf_shared_data_t;
272 
273 typedef enum {
274   NCHAN_REDIS_OPTIMIZE_UNSET = -1,
275   NCHAN_REDIS_OPTIMIZE_CPU = 1,
276   NCHAN_REDIS_OPTIMIZE_BANDWIDTH = 2
277 } nchan_redis_optimize_t;
278 
279 typedef struct {
280   int family;
281   int prefix_size;
282   ngx_str_t str;
283   struct {
284     union {
285       struct in_addr ipv4;
286 #ifdef AF_INET6
287       struct in6_addr ipv6;
288       char            str[16];
289 #endif
290     };
291   } addr;
292   struct {
293     union {
294       struct in_addr ipv4;
295 #ifdef AF_INET6
296       struct in6_addr ipv6;
297       char            str[16];
298 #endif
299     };
300   } addr_block;
301   struct {
302     union {
303       struct in_addr ipv4;
304 #ifdef AF_INET6
305       struct in6_addr ipv6;
306 #endif
307       char            str[16];
308     };
309   } mask;
310 } nchan_redis_ip_range_t;
311 
312 typedef struct {
313   struct {
314       ngx_msec_t                    connect_timeout;
315       nchan_redis_optimize_t        optimize_target;
316       ngx_int_t                     master_weight;
317       ngx_int_t                     slave_weight;
318       ngx_int_t                     blacklist_count;
319       nchan_redis_ip_range_t       *blacklist;
320   }                               redis;
321   nchan_loc_conf_t                *upstream_nchan_loc_conf;
322 } nchan_srv_conf_t;
323 
324 typedef struct {
325   time_t                          time;
326   ngx_int_t                       msgs_per_minute;
327   ngx_int_t                       msg_padding;
328   ngx_int_t                       channels;
329   ngx_int_t                       subscribers_per_channel;
330   enum {
331     NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET = -1,
332     NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_RANDOM = 1,
333     NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_OPTIMAL = 2
334   }                               subscriber_distribution;
335   enum {
336     NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET = -1,
337     NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_RANDOM = 1,
338     NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_OPTIMAL = 2
339   }                               publisher_distribution;
340 } nchan_benchmark_conf_t;
341 
342 struct nchan_loc_conf_s { //nchan_loc_conf_t
343 
344   ngx_int_t                       shared_data_index;
345 
346   time_t                          message_timeout;
347   ngx_int_t                       max_messages;
348 
349   ngx_http_complex_value_t       *complex_message_timeout;
350   ngx_http_complex_value_t       *complex_max_messages;
351 
352   ngx_http_complex_value_t       *authorize_request_url;
353   ngx_http_complex_value_t       *publisher_upstream_request_url;
354 
355   ngx_http_complex_value_t       *unsubscribe_request_url;
356   ngx_http_complex_value_t       *subscribe_request_url;
357 
358   nchan_complex_value_arr_t       pub_chid;
359   nchan_complex_value_arr_t       sub_chid;
360   nchan_complex_value_arr_t       pubsub_chid;
361   ngx_http_complex_value_t       *channel_group;
362 
363   ngx_str_t                       channel_id_split_delimiter;
364 
365   ngx_str_t                       subscriber_http_raw_stream_separator;
366 
367   ngx_http_complex_value_t       *allow_origin;
368   ngx_int_t                       allow_credentials;
369 
370   nchan_complex_value_arr_t       last_message_id;
371   ngx_str_t                       custom_msgtag_header;
372   ngx_int_t                       msg_in_etag_only;
373 
374   nchan_conf_publisher_types_t    pub;
375   nchan_conf_subscriber_types_t   sub;
376   nchan_conf_group_t              group;
377   time_t                          subscriber_timeout;
378 
379   ngx_int_t                       longpoll_multimsg;
380   ngx_int_t                       longpoll_multimsg_use_raw_stream_separator;
381 
382   ngx_str_t                       eventsource_event;
383 
384   time_t                          websocket_ping_interval;
385   struct {
386     time_t                          interval;
387     ngx_str_t                       event;
388     ngx_str_t                       data;
389     ngx_str_t                       comment;
390   }                               eventsource_ping;
391 
392   struct {
393     ngx_int_t   enabled;
394     ngx_str_t  *in;
395     ngx_str_t  *out;
396   }                               websocket_heartbeat;
397 
398   nchan_msg_compression_type_t    message_compression;
399 
400   ngx_int_t                       subscriber_first_message;
401 
402   ngx_http_complex_value_t       *subscriber_info_string;
403   ngx_int_t                       subscriber_info_location;
404 
405   ngx_http_complex_value_t       *channel_events_channel_id;
406   ngx_http_complex_value_t       *channel_event_string;
407 
408   ngx_int_t                       subscribe_only_existing_channel;
409 
410   nchan_redis_conf_t              redis;
411   time_t                          redis_idle_channel_cache_timeout;
412 
413   ngx_int_t                       max_channel_id_length;
414   ngx_int_t                       max_channel_subscribers;
415   time_t                          channel_timeout;
416   nchan_store_t                  *storage_engine;
417 
418   nchan_benchmark_conf_t          benchmark;
419 
420   ngx_int_t                     (*request_handler)(ngx_http_request_t *r);
421 };// nchan_loc_conf_t;
422 
423 typedef struct nchan_llist_timed_s {
424   struct nchan_llist_timed_s     *prev;
425   void                           *data;
426   time_t                          time;
427   struct nchan_llist_timed_s     *next;
428 } nchan_llist_timed_t;
429 
430 typedef enum {PUB, SUB} pub_or_sub_t;
431 typedef enum {LONGPOLL, HTTP_CHUNKED, HTTP_MULTIPART, HTTP_RAW_STREAM, INTERVALPOLL, EVENTSOURCE, WEBSOCKET, INTERNAL, SUBSCRIBER_TYPES} subscriber_type_t;
432 typedef void (*subscriber_callback_pt)(subscriber_t *, void *);
433 
434 typedef struct {
435   ngx_int_t              (*enqueue)(struct subscriber_s *);
436   ngx_int_t              (*dequeue)(struct subscriber_s *);
437   ngx_int_t              (*respond_message)(struct subscriber_s *, nchan_msg_t *);
438   ngx_int_t              (*respond_status)(struct subscriber_s *, ngx_int_t, const ngx_str_t *, ngx_chain_t *);
439   ngx_int_t              (*set_enqueue_callback)(subscriber_t *self, subscriber_callback_pt cb, void *privdata);
440   ngx_int_t              (*set_dequeue_callback)(subscriber_t *self, subscriber_callback_pt cb, void *privdata);
441   ngx_int_t              (*reserve)(struct subscriber_s *);
442   ngx_int_t              (*release)(struct subscriber_s *, uint8_t nodestroy);
443   ngx_int_t              (*notify)(struct subscriber_s *, ngx_int_t code, void *data);
444   ngx_int_t              (*subscribe)(subscriber_t *, ngx_str_t *);
445 
446 } subscriber_fn_t;
447 
448 typedef enum {ALIVE, DEAD, UNKNOWN, PININGFORTHEFJORDS} nchan_subscriber_status_t;
449 
450 struct subscriber_s {
451   ngx_str_t                 *name;
452   subscriber_type_t          type;
453   const subscriber_fn_t     *fn;
454   nchan_subscriber_status_t  status;
455   nchan_msg_id_t             last_msgid;
456   nchan_loc_conf_t          *cf;
457   ngx_http_request_t        *request;
458   void                      *upstream_requestmachine;
459   ngx_uint_t                 reserved;
460 
461   unsigned                   enable_sub_unsub_callbacks;
462   unsigned                   dequeue_after_response:1;
463   unsigned                   destroy_after_dequeue:1;
464   unsigned                   enqueued:1;
465 
466 #if FAKESHARD
467   ngx_int_t                  owner;
468 #endif
469 #if NCHAN_SUBSCRIBER_LEAK_DEBUG
470   u_char                    *lbl;
471   subscriber_t              *dbg_prev;
472   subscriber_t              *dbg_next;
473 #endif
474 }; //subscriber_t
475 
476 #define NCHAN_MULTITAG_REQUEST_CTX_MAX 4
477 typedef struct {
478   subscriber_t                  *sub;
479   nchan_reuse_queue_t           *output_str_queue;
480   nchan_reuse_queue_t           *reserved_msg_queue;
481   nchan_bufchain_pool_t         *bcp; //bufchainpool maybe?
482 
483   ngx_str_t                     *subscriber_type;
484   nchan_msg_id_t                 msg_id;
485   nchan_msg_id_t                 prev_msg_id;
486   ngx_str_t                     *publisher_type;
487   ngx_str_t                     *multipart_boundary;
488   ngx_str_t                     *channel_event_name;
489   ngx_str_t                      channel_id[NCHAN_MULTITAG_REQUEST_CTX_MAX];
490   int                            channel_id_count;
491   time_t                         channel_subscriber_last_seen;
492   int                            channel_subscriber_count;
493   int                            channel_message_count;
494   ngx_str_t                     *channel_group_name;
495 
496   ngx_str_t                     *request_origin_header;
497   ngx_str_t                     *allow_origin;
498 
499   ngx_int_t                      subscriber_info_response_id;
500   ngx_str_t                     *subscriber_info_response_channel_id;
501 
502   unsigned                       sent_unsubscribe_request:1;
503   unsigned                       request_ran_content_handler:1;
504 
505 } nchan_request_ctx_t;
506 
507 
508 typedef struct {
509   ngx_str_t     hostname;
510   ngx_str_t     peername; // resolved hostname (ip address)
511   ngx_int_t     port;
512   ngx_str_t     password;
513   ngx_int_t     db;
514 } redis_connect_params_t;
515 
516 #endif  /* NCHAN_TYPES_H */
517