1 #ifndef NCHAN_TYPES_H 2 #define NCHAN_TYPES_H 3 #include <util/nchan_reuse_queue.h> 4 #include <util/nchan_bufchainpool.h> 5 6 typedef ngx_int_t (*callback_pt)(ngx_int_t, void *, void *); 7 8 #include <util/nchan_fake_request.h> 9 10 typedef enum {MSG_ERROR, MSG_CHANNEL_NOTREADY, MSG_INVALID, MSG_PENDING, MSG_NOTFOUND, MSG_FOUND, MSG_EXPECTED, MSG_EXPIRED} nchan_msg_status_t; 11 typedef enum {INACTIVE, NOTREADY, WAITING, STUBBED, READY, DELETED} chanhead_pubsub_status_t; 12 13 typedef enum {NCHAN_CONTENT_TYPE_PLAIN, NCHAN_CONTENT_TYPE_JSON, NCHAN_CONTENT_TYPE_XML, NCHAN_CONTENT_TYPE_YAML, NCHAN_CONTENT_TYPE_HTML} nchan_content_type_t; 14 15 typedef enum {REDIS_MODE_CONF_UNSET = NGX_CONF_UNSET, REDIS_MODE_BACKUP = 1, REDIS_MODE_DISTRIBUTED = 2, REDIS_MODE_DISTRIBUTED_NOSTORE = 3} nchan_redis_storage_mode_t; 16 17 typedef enum { 18 SUB_ENQUEUE, SUB_DEQUEUE, SUB_RECEIVE_MESSAGE, SUB_RECEIVE_STATUS, 19 CHAN_PUBLISH, CHAN_DELETE 20 } channel_event_type_t; 21 //on with the declarations 22 typedef struct { 23 size_t shm_size; 24 ngx_msec_t redis_fakesub_timer_interval; 25 size_t redis_publish_message_msgkey_size; 26 #if (NGX_ZLIB) 27 struct { 28 int level; 29 int windowBits; 30 int memLevel; 31 int strategy; 32 } zlib_params; 33 #endif 34 ngx_path_t *message_temp_path; 35 } nchan_main_conf_t; 36 37 38 typedef struct { 39 ngx_str_t url; 40 ngx_flag_t url_enabled; 41 time_t ping_interval; 42 time_t cluster_check_interval; 43 ngx_str_t namespace; 44 nchan_redis_storage_mode_t storage_mode; 45 ngx_int_t nostore_fastpublish; 46 ngx_str_t upstream_url; 47 ngx_http_upstream_srv_conf_t *upstream; 48 ngx_flag_t upstream_inheritable; 49 unsigned enabled:1; 50 void *nodeset; 51 void *privdata; 52 } nchan_redis_conf_t; 53 54 typedef struct { 55 ngx_atomic_int_t lock; 56 ngx_atomic_t mutex; 57 ngx_int_t write_pid; 58 } ngx_rwlock_t; 59 60 61 #define NCHAN_OLDEST_MSGID_TIME 0 62 #define NCHAN_NEWEST_MSGID_TIME -1 63 #define NCHAN_NTH_MSGID_TIME -2 64 65 #define NCHAN_ZERO_MSGID {0, {{0}}, 0, 0} 66 #define NCHAN_OLDEST_MSGID {NCHAN_OLDEST_MSGID_TIME, {{0}}, 0, 1} 67 #define NCHAN_NEWEST_MSGID {NCHAN_NEWEST_MSGID_TIME, {{0}}, 0, 1} 68 #define NCHAN_NTH_MSGID {NCHAN_NTH_MSGID_TIME, {{0}}, 0, 1} 69 70 #define NCHAN_MULTITAG_MAX 255 71 #define NCHAN_FIXED_MULTITAG_MAX 4 72 union nchan_msg_multitag { 73 int16_t fixed[NCHAN_FIXED_MULTITAG_MAX]; 74 int16_t *allocd; 75 }; 76 77 typedef struct { 78 time_t time; //tag message by time 79 union nchan_msg_multitag tag; 80 int16_t tagactive; 81 int16_t tagcount; 82 } nchan_msg_id_t; 83 84 typedef struct { 85 time_t time; 86 int16_t tag; 87 } nchan_msg_tiny_id_t; 88 89 //message queue 90 91 #if NCHAN_MSG_RESERVE_DEBUG 92 typedef struct msg_rsv_dbg_s msg_rsv_dbg_t; 93 struct msg_rsv_dbg_s { 94 char *lbl; 95 msg_rsv_dbg_t *prev; 96 msg_rsv_dbg_t *next; 97 }; //msg_rsv_dbg_s 98 #endif 99 100 typedef struct nchan_loc_conf_s nchan_loc_conf_t; 101 typedef struct nchan_msg_s nchan_msg_t; 102 103 typedef enum {NCHAN_MSG_SHARED, NCHAN_MSG_HEAP, NCHAN_MSG_POOL, NCHAN_MSG_STACK} nchan_msg_storage_t; 104 105 typedef enum { 106 NCHAN_MSG_COMPRESSION_INVALID = -1, 107 NCHAN_MSG_NO_COMPRESSION = 0, 108 NCHAN_MSG_COMPRESSION_WEBSOCKET_PERMESSAGE_DEFLATE 109 } nchan_msg_compression_type_t; 110 111 typedef struct { 112 ngx_buf_t buf; 113 nchan_msg_compression_type_t compression; 114 } nchan_compressed_msg_t; 115 116 struct nchan_msg_s { 117 nchan_msg_id_t id; 118 nchan_msg_id_t prev_id; 119 ngx_str_t *content_type; 120 ngx_str_t *eventsource_event; 121 // ngx_str_t charset; 122 ngx_buf_t buf; 123 time_t expires; 124 125 ngx_atomic_int_t refcount; 126 nchan_msg_t *parent; 127 nchan_compressed_msg_t *compressed; 128 //struct nchan_msg_s *reload_next; 129 130 nchan_msg_storage_t storage; 131 132 #if NCHAN_MSG_RESERVE_DEBUG 133 struct msg_rsv_dbg_s *rsv; 134 #endif 135 #if NCHAN_MSG_LEAK_DEBUG 136 ngx_str_t lbl; 137 struct nchan_msg_s *dbg_prev; 138 struct nchan_msg_s *dbg_next; 139 #endif 140 }; // nchan_msg_t 141 142 typedef struct { 143 ngx_str_t id; 144 ngx_int_t messages; 145 ngx_int_t subscribers; 146 time_t last_seen; 147 time_t expires; 148 nchan_msg_id_t last_published_msg_id; 149 } nchan_channel_t; 150 151 152 //garbage collecting goodness 153 typedef struct { 154 ngx_queue_t queue; 155 nchan_channel_t *channel; 156 } nchan_channel_queue_t; 157 158 159 typedef struct { 160 ngx_queue_t queue; 161 ngx_rwlock_t lock; 162 } nchan_worker_msg_sentinel_t; 163 164 typedef struct { 165 ngx_atomic_uint_t channels; 166 ngx_atomic_uint_t subscribers; 167 ngx_atomic_uint_t total_published_messages; 168 ngx_atomic_uint_t messages; 169 ngx_atomic_uint_t redis_pending_commands; 170 ngx_atomic_uint_t redis_connected_servers; 171 ngx_atomic_uint_t ipc_total_alerts_sent; 172 ngx_atomic_uint_t ipc_total_alerts_received; 173 ngx_atomic_uint_t ipc_queue_size; 174 ngx_atomic_uint_t ipc_total_send_delay; 175 ngx_atomic_uint_t ipc_total_receive_delay; 176 } nchan_stub_status_t; 177 178 typedef struct subscriber_s subscriber_t; 179 180 typedef struct { 181 //must be made entirely of ngx_atomic_int_t 182 ngx_atomic_int_t channels; 183 ngx_atomic_int_t subscribers; 184 ngx_atomic_int_t messages; 185 ngx_atomic_int_t messages_shmem_bytes; 186 ngx_atomic_int_t messages_file_bytes; 187 } nchan_group_limits_t; 188 189 typedef struct { 190 ngx_atomic_int_t channels; 191 ngx_atomic_int_t multiplexed_channels; 192 ngx_atomic_int_t subscribers; 193 ngx_atomic_int_t messages; 194 ngx_atomic_int_t messages_shmem_bytes; 195 ngx_atomic_int_t messages_file_bytes; 196 nchan_group_limits_t limit; 197 ngx_str_t name; 198 } nchan_group_t; 199 200 typedef struct{ 201 //init 202 ngx_int_t (*init_module)(ngx_cycle_t *cycle); 203 ngx_int_t (*init_worker)(ngx_cycle_t *cycle); 204 ngx_int_t (*init_postconfig)(ngx_conf_t *cf); 205 void (*create_main_conf)(ngx_conf_t *cf, nchan_main_conf_t *mcf); 206 207 //quit 208 void (*exit_worker)(ngx_cycle_t *cycle); 209 void (*exit_master)(ngx_cycle_t *cycle); 210 211 //async-friendly functions with callbacks 212 ngx_int_t (*get_message) (ngx_str_t *, nchan_msg_id_t *, nchan_loc_conf_t *cf, callback_pt, void *); 213 ngx_int_t (*subscribe) (ngx_str_t *, subscriber_t *); 214 ngx_int_t (*publish) (ngx_str_t *, nchan_msg_t *, nchan_loc_conf_t *, callback_pt, void *); 215 216 //channel actions 217 ngx_int_t (*delete_channel)(ngx_str_t *, nchan_loc_conf_t *, callback_pt, void *); 218 ngx_int_t (*find_channel)(ngx_str_t *, nchan_loc_conf_t *, callback_pt, void*); 219 220 //group actions 221 ngx_int_t (*get_group)(ngx_str_t *name, nchan_loc_conf_t *, callback_pt, void *); 222 ngx_int_t (*set_group_limits)(ngx_str_t *name, nchan_loc_conf_t *, nchan_group_limits_t *limits, callback_pt, void *); 223 ngx_int_t (*delete_group)(ngx_str_t *name, nchan_loc_conf_t *, callback_pt, void *); 224 225 //subscriber info stuff 226 ngx_int_t (*get_subscriber_info_id)(nchan_loc_conf_t *, callback_pt, void *); 227 ngx_int_t (*request_subscriber_info)(ngx_str_t *channel_id, ngx_int_t request_id, nchan_loc_conf_t *, callback_pt, void *); 228 229 } nchan_store_t; 230 231 #define NCHAN_MULTI_SEP_CHR '\0' 232 233 typedef struct { 234 unsigned http:1; 235 unsigned websocket:1; 236 } nchan_conf_publisher_types_t; 237 238 typedef struct { 239 unsigned poll:1; //bleugh 240 unsigned http_raw_stream:1; //ugleh 241 unsigned longpoll:1; 242 unsigned http_chunked:1; 243 unsigned http_multipart:1; 244 unsigned eventsource:1; 245 unsigned websocket:1; 246 } nchan_conf_subscriber_types_t; 247 248 typedef struct { 249 unsigned get:1; 250 unsigned set:1; 251 unsigned delete:1; 252 253 ngx_int_t enable_accounting; 254 255 ngx_http_complex_value_t *max_channels; 256 ngx_http_complex_value_t *max_subscribers; 257 ngx_http_complex_value_t *max_messages; 258 ngx_http_complex_value_t *max_messages_shm_bytes; 259 ngx_http_complex_value_t *max_messages_file_bytes; 260 } nchan_conf_group_t; 261 262 #define NCHAN_COMPLEX_VALUE_ARRAY_MAX 8 263 typedef struct { 264 ngx_http_complex_value_t *cv[NCHAN_COMPLEX_VALUE_ARRAY_MAX]; 265 ngx_int_t n; 266 } nchan_complex_value_arr_t; 267 268 typedef struct { 269 ngx_atomic_uint_t message_timeout; 270 ngx_atomic_uint_t max_messages; 271 } nchan_loc_conf_shared_data_t; 272 273 typedef enum { 274 NCHAN_REDIS_OPTIMIZE_UNSET = -1, 275 NCHAN_REDIS_OPTIMIZE_CPU = 1, 276 NCHAN_REDIS_OPTIMIZE_BANDWIDTH = 2 277 } nchan_redis_optimize_t; 278 279 typedef struct { 280 int family; 281 int prefix_size; 282 ngx_str_t str; 283 struct { 284 union { 285 struct in_addr ipv4; 286 #ifdef AF_INET6 287 struct in6_addr ipv6; 288 char str[16]; 289 #endif 290 }; 291 } addr; 292 struct { 293 union { 294 struct in_addr ipv4; 295 #ifdef AF_INET6 296 struct in6_addr ipv6; 297 char str[16]; 298 #endif 299 }; 300 } addr_block; 301 struct { 302 union { 303 struct in_addr ipv4; 304 #ifdef AF_INET6 305 struct in6_addr ipv6; 306 #endif 307 char str[16]; 308 }; 309 } mask; 310 } nchan_redis_ip_range_t; 311 312 typedef struct { 313 struct { 314 ngx_msec_t connect_timeout; 315 nchan_redis_optimize_t optimize_target; 316 ngx_int_t master_weight; 317 ngx_int_t slave_weight; 318 ngx_int_t blacklist_count; 319 nchan_redis_ip_range_t *blacklist; 320 } redis; 321 nchan_loc_conf_t *upstream_nchan_loc_conf; 322 } nchan_srv_conf_t; 323 324 typedef struct { 325 time_t time; 326 ngx_int_t msgs_per_minute; 327 ngx_int_t msg_padding; 328 ngx_int_t channels; 329 ngx_int_t subscribers_per_channel; 330 enum { 331 NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_UNSET = -1, 332 NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_RANDOM = 1, 333 NCHAN_BENCHMARK_SUBSCRIBER_DISTRIBUTION_OPTIMAL = 2 334 } subscriber_distribution; 335 enum { 336 NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_UNSET = -1, 337 NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_RANDOM = 1, 338 NCHAN_BENCHMARK_PUBLISHER_DISTRIBUTION_OPTIMAL = 2 339 } publisher_distribution; 340 } nchan_benchmark_conf_t; 341 342 struct nchan_loc_conf_s { //nchan_loc_conf_t 343 344 ngx_int_t shared_data_index; 345 346 time_t message_timeout; 347 ngx_int_t max_messages; 348 349 ngx_http_complex_value_t *complex_message_timeout; 350 ngx_http_complex_value_t *complex_max_messages; 351 352 ngx_http_complex_value_t *authorize_request_url; 353 ngx_http_complex_value_t *publisher_upstream_request_url; 354 355 ngx_http_complex_value_t *unsubscribe_request_url; 356 ngx_http_complex_value_t *subscribe_request_url; 357 358 nchan_complex_value_arr_t pub_chid; 359 nchan_complex_value_arr_t sub_chid; 360 nchan_complex_value_arr_t pubsub_chid; 361 ngx_http_complex_value_t *channel_group; 362 363 ngx_str_t channel_id_split_delimiter; 364 365 ngx_str_t subscriber_http_raw_stream_separator; 366 367 ngx_http_complex_value_t *allow_origin; 368 ngx_int_t allow_credentials; 369 370 nchan_complex_value_arr_t last_message_id; 371 ngx_str_t custom_msgtag_header; 372 ngx_int_t msg_in_etag_only; 373 374 nchan_conf_publisher_types_t pub; 375 nchan_conf_subscriber_types_t sub; 376 nchan_conf_group_t group; 377 time_t subscriber_timeout; 378 379 ngx_int_t longpoll_multimsg; 380 ngx_int_t longpoll_multimsg_use_raw_stream_separator; 381 382 ngx_str_t eventsource_event; 383 384 time_t websocket_ping_interval; 385 struct { 386 time_t interval; 387 ngx_str_t event; 388 ngx_str_t data; 389 ngx_str_t comment; 390 } eventsource_ping; 391 392 struct { 393 ngx_int_t enabled; 394 ngx_str_t *in; 395 ngx_str_t *out; 396 } websocket_heartbeat; 397 398 nchan_msg_compression_type_t message_compression; 399 400 ngx_int_t subscriber_first_message; 401 402 ngx_http_complex_value_t *subscriber_info_string; 403 ngx_int_t subscriber_info_location; 404 405 ngx_http_complex_value_t *channel_events_channel_id; 406 ngx_http_complex_value_t *channel_event_string; 407 408 ngx_int_t subscribe_only_existing_channel; 409 410 nchan_redis_conf_t redis; 411 time_t redis_idle_channel_cache_timeout; 412 413 ngx_int_t max_channel_id_length; 414 ngx_int_t max_channel_subscribers; 415 time_t channel_timeout; 416 nchan_store_t *storage_engine; 417 418 nchan_benchmark_conf_t benchmark; 419 420 ngx_int_t (*request_handler)(ngx_http_request_t *r); 421 };// nchan_loc_conf_t; 422 423 typedef struct nchan_llist_timed_s { 424 struct nchan_llist_timed_s *prev; 425 void *data; 426 time_t time; 427 struct nchan_llist_timed_s *next; 428 } nchan_llist_timed_t; 429 430 typedef enum {PUB, SUB} pub_or_sub_t; 431 typedef enum {LONGPOLL, HTTP_CHUNKED, HTTP_MULTIPART, HTTP_RAW_STREAM, INTERVALPOLL, EVENTSOURCE, WEBSOCKET, INTERNAL, SUBSCRIBER_TYPES} subscriber_type_t; 432 typedef void (*subscriber_callback_pt)(subscriber_t *, void *); 433 434 typedef struct { 435 ngx_int_t (*enqueue)(struct subscriber_s *); 436 ngx_int_t (*dequeue)(struct subscriber_s *); 437 ngx_int_t (*respond_message)(struct subscriber_s *, nchan_msg_t *); 438 ngx_int_t (*respond_status)(struct subscriber_s *, ngx_int_t, const ngx_str_t *, ngx_chain_t *); 439 ngx_int_t (*set_enqueue_callback)(subscriber_t *self, subscriber_callback_pt cb, void *privdata); 440 ngx_int_t (*set_dequeue_callback)(subscriber_t *self, subscriber_callback_pt cb, void *privdata); 441 ngx_int_t (*reserve)(struct subscriber_s *); 442 ngx_int_t (*release)(struct subscriber_s *, uint8_t nodestroy); 443 ngx_int_t (*notify)(struct subscriber_s *, ngx_int_t code, void *data); 444 ngx_int_t (*subscribe)(subscriber_t *, ngx_str_t *); 445 446 } subscriber_fn_t; 447 448 typedef enum {ALIVE, DEAD, UNKNOWN, PININGFORTHEFJORDS} nchan_subscriber_status_t; 449 450 struct subscriber_s { 451 ngx_str_t *name; 452 subscriber_type_t type; 453 const subscriber_fn_t *fn; 454 nchan_subscriber_status_t status; 455 nchan_msg_id_t last_msgid; 456 nchan_loc_conf_t *cf; 457 ngx_http_request_t *request; 458 void *upstream_requestmachine; 459 ngx_uint_t reserved; 460 461 unsigned enable_sub_unsub_callbacks; 462 unsigned dequeue_after_response:1; 463 unsigned destroy_after_dequeue:1; 464 unsigned enqueued:1; 465 466 #if FAKESHARD 467 ngx_int_t owner; 468 #endif 469 #if NCHAN_SUBSCRIBER_LEAK_DEBUG 470 u_char *lbl; 471 subscriber_t *dbg_prev; 472 subscriber_t *dbg_next; 473 #endif 474 }; //subscriber_t 475 476 #define NCHAN_MULTITAG_REQUEST_CTX_MAX 4 477 typedef struct { 478 subscriber_t *sub; 479 nchan_reuse_queue_t *output_str_queue; 480 nchan_reuse_queue_t *reserved_msg_queue; 481 nchan_bufchain_pool_t *bcp; //bufchainpool maybe? 482 483 ngx_str_t *subscriber_type; 484 nchan_msg_id_t msg_id; 485 nchan_msg_id_t prev_msg_id; 486 ngx_str_t *publisher_type; 487 ngx_str_t *multipart_boundary; 488 ngx_str_t *channel_event_name; 489 ngx_str_t channel_id[NCHAN_MULTITAG_REQUEST_CTX_MAX]; 490 int channel_id_count; 491 time_t channel_subscriber_last_seen; 492 int channel_subscriber_count; 493 int channel_message_count; 494 ngx_str_t *channel_group_name; 495 496 ngx_str_t *request_origin_header; 497 ngx_str_t *allow_origin; 498 499 ngx_int_t subscriber_info_response_id; 500 ngx_str_t *subscriber_info_response_channel_id; 501 502 unsigned sent_unsubscribe_request:1; 503 unsigned request_ran_content_handler:1; 504 505 } nchan_request_ctx_t; 506 507 508 typedef struct { 509 ngx_str_t hostname; 510 ngx_str_t peername; // resolved hostname (ip address) 511 ngx_int_t port; 512 ngx_str_t password; 513 ngx_int_t db; 514 } redis_connect_params_t; 515 516 #endif /* NCHAN_TYPES_H */ 517