1 #ifndef SPOOL_HEADER 2 #define SPOOL_HEADER 3 4 #include <util/nchan_rbtree.h> 5 6 typedef struct spooled_subscriber_s spooled_subscriber_t; 7 typedef struct subscriber_pool_s subscriber_pool_t; 8 9 typedef struct { 10 spooled_subscriber_t *ssub; 11 subscriber_pool_t *spool; 12 } spooled_subscriber_cleanup_t; 13 14 struct spooled_subscriber_s { 15 ngx_uint_t id; //could be useful later 16 subscriber_t *sub; 17 spooled_subscriber_cleanup_t dequeue_callback_data; 18 spooled_subscriber_t *next; 19 spooled_subscriber_t *prev; 20 }; //spooled_subscriber_t 21 22 23 struct subscriber_pool_s { 24 nchan_msg_id_t id; 25 nchan_msg_t *msg; 26 nchan_msg_status_t msg_status; 27 spooled_subscriber_t *first; 28 29 //stack overflow prevention 30 ngx_uint_t fetchmsg_prev_msec; 31 ngx_int_t fetchmsg_current_count; 32 ngx_event_t fetchmsg_ev; 33 34 ngx_uint_t sub_count; 35 ngx_uint_t non_internal_sub_count; 36 //ngx_uint_t generation; 37 //ngx_uint_t responded_count; 38 uint8_t reserved; 39 struct channel_spooler_s *spooler; 40 }; // subscriber_pool_t 41 42 typedef struct channel_spooler_s channel_spooler_t; //holds many different spools 43 typedef struct channel_spooler_handlers_s channel_spooler_handlers_t; //spooler callbacks table 44 45 typedef struct { 46 ngx_int_t (*add)(channel_spooler_t *self, subscriber_t *sub); 47 ngx_int_t (*handle_channel_status_change)(channel_spooler_t *self); 48 ngx_int_t (*respond_message)(channel_spooler_t *self, nchan_msg_t *msg); 49 ngx_int_t (*respond_status)(channel_spooler_t *self, nchan_msg_id_t *id, ngx_int_t status_code, ngx_str_t *status_line); 50 ngx_int_t (*broadcast_status)(channel_spooler_t *self, ngx_int_t status_code, const ngx_str_t *status_line); 51 ngx_int_t (*broadcast_notice)(channel_spooler_t *self, ngx_int_t notice_code, void *data); 52 ngx_int_t (*prepare_to_stop)(channel_spooler_t *self); 53 } channel_spooler_fn_t; 54 55 typedef enum {NCHAN_SPOOL_FETCH, NCHAN_SPOOL_FETCH_IGNORE_MSG_NOTFOUND, NCHAN_SPOOL_PASSTHROUGH} spooler_fetching_strategy_t; 56 57 58 typedef struct fetchmsg_data_s fetchmsg_data_t; 59 struct fetchmsg_data_s { 60 channel_spooler_t *spooler; 61 nchan_msg_id_t msgid; 62 fetchmsg_data_t *next; 63 fetchmsg_data_t *prev; 64 }; 65 66 typedef struct spooler_event_ll_s spooler_event_ll_t; 67 struct spooler_event_ll_s { 68 spooler_event_ll_t *prev; 69 ngx_event_t ev; 70 void (*callback)(void *); 71 void (*cancel)(void *); 72 channel_spooler_t *spooler; 73 spooler_event_ll_t *next; 74 }; 75 76 struct channel_spooler_s { 77 rbtree_seed_t spoolseed; 78 subscriber_pool_t current_msg_spool; 79 nchan_msg_id_t prev_msg_id; 80 ngx_uint_t responded_count; 81 ngx_str_t *chid; 82 chanhead_pubsub_status_t *channel_status; 83 uint8_t *channel_buffer_complete; 84 nchan_store_t *store; 85 nchan_loc_conf_t *cf; 86 channel_spooler_fn_t *fn; 87 channel_spooler_handlers_t *handlers; 88 void *handlers_privdata; 89 fetchmsg_data_t *fetchmsg_cb_data_list; 90 spooler_event_ll_t *spooler_dependent_events; 91 spooler_fetching_strategy_t fetching_strategy; 92 unsigned publish_events:1; 93 unsigned running:1; 94 unsigned want_to_stop:1; 95 96 }; 97 98 struct channel_spooler_handlers_s { 99 void (*add)(channel_spooler_t *, subscriber_t *, void *); 100 void (*dequeue)(channel_spooler_t *, subscriber_t *, void *); 101 void (*bulk_dequeue)(channel_spooler_t *, subscriber_type_t, ngx_int_t, void *); //called after dequeueing 1 or many subs 102 void (*use)(channel_spooler_t *, void *); 103 void (*get_message_start)(channel_spooler_t *, void *); 104 void (*get_message_finish)(channel_spooler_t *, void *); 105 }; 106 107 channel_spooler_t *start_spooler(channel_spooler_t *spl, ngx_str_t *chid, chanhead_pubsub_status_t *channel_status, uint8_t *channel_buffer_complete, nchan_store_t *store, nchan_loc_conf_t *cf, spooler_fetching_strategy_t fetching_strategy, channel_spooler_handlers_t *handlers, void *handlers_privdata); 108 ngx_int_t stop_spooler(channel_spooler_t *spl, uint8_t dequeue_subscribers); 109 110 ngx_int_t spooler_catch_up(channel_spooler_t *spl); 111 112 ngx_int_t spooler_print_contents(channel_spooler_t *spl); 113 114 ngx_event_t *spooler_add_timer(channel_spooler_t *spl, ngx_msec_t timeout, void (*cb)(void *), void (*cancel)(void *), void *pd); 115 116 117 #endif /*SPOOL_HEADER*/ 118