1 #ifndef SPOOL_HEADER
2 #define SPOOL_HEADER
3 
4 #include <util/nchan_rbtree.h>
5 
/* Forward declarations: the spool and its entries refer to each other by pointer. */
typedef struct spooled_subscriber_s spooled_subscriber_t;
typedef struct subscriber_pool_s subscriber_pool_t;

/*
 * Pairs a spooled subscriber entry with a spool.
 * Embedded by value in spooled_subscriber_s as `dequeue_callback_data`.
 */
typedef struct {
  spooled_subscriber_t  *ssub;   // the spooled subscriber entry
  subscriber_pool_t     *spool;  // presumably the spool `ssub` is queued on -- confirm in spool.c
} spooled_subscriber_cleanup_t;
13 
14 struct spooled_subscriber_s {
15   ngx_uint_t                    id; //could be useful later
16   subscriber_t                 *sub;
17   spooled_subscriber_cleanup_t  dequeue_callback_data;
18   spooled_subscriber_t         *next;
19   spooled_subscriber_t         *prev;
20 }; //spooled_subscriber_t
21 
22 
23 struct subscriber_pool_s {
24   nchan_msg_id_t              id;
25   nchan_msg_t                *msg;
26   nchan_msg_status_t          msg_status;
27   spooled_subscriber_t       *first;
28 
29   //stack overflow prevention
30   ngx_uint_t                  fetchmsg_prev_msec;
31   ngx_int_t                   fetchmsg_current_count;
32   ngx_event_t                 fetchmsg_ev;
33 
34   ngx_uint_t                  sub_count;
35   ngx_uint_t                  non_internal_sub_count;
36   //ngx_uint_t                  generation;
37   //ngx_uint_t                  responded_count;
38   uint8_t                     reserved;
39   struct channel_spooler_s   *spooler;
40 }; // subscriber_pool_t
41 
42 typedef struct channel_spooler_s channel_spooler_t; //holds many different spools
43 typedef struct channel_spooler_handlers_s channel_spooler_handlers_t; //spooler callbacks table
44 
45 typedef struct {
46   ngx_int_t            (*add)(channel_spooler_t *self, subscriber_t *sub);
47   ngx_int_t            (*handle_channel_status_change)(channel_spooler_t *self);
48   ngx_int_t            (*respond_message)(channel_spooler_t *self, nchan_msg_t *msg);
49   ngx_int_t            (*respond_status)(channel_spooler_t *self, nchan_msg_id_t *id, ngx_int_t status_code, ngx_str_t *status_line);
50   ngx_int_t            (*broadcast_status)(channel_spooler_t *self, ngx_int_t status_code, const ngx_str_t *status_line);
51   ngx_int_t            (*broadcast_notice)(channel_spooler_t *self, ngx_int_t notice_code, void *data);
52   ngx_int_t            (*prepare_to_stop)(channel_spooler_t *self);
53 } channel_spooler_fn_t;
54 
/*
 * Strategy selector passed to start_spooler() and kept in
 * channel_spooler_s.fetching_strategy. Enumerator order (and therefore the
 * numeric values 0, 1, 2) is preserved.
 */
typedef enum {
  NCHAN_SPOOL_FETCH,
  NCHAN_SPOOL_FETCH_IGNORE_MSG_NOTFOUND,
  NCHAN_SPOOL_PASSTHROUGH
} spooler_fetching_strategy_t;
56 
57 
58 typedef struct fetchmsg_data_s fetchmsg_data_t;
59 struct fetchmsg_data_s {
60   channel_spooler_t   *spooler;
61   nchan_msg_id_t       msgid;
62   fetchmsg_data_t     *next;
63   fetchmsg_data_t     *prev;
64 };
65 
66 typedef struct spooler_event_ll_s spooler_event_ll_t;
67 struct spooler_event_ll_s {
68   spooler_event_ll_t   *prev;
69   ngx_event_t           ev;
70   void                (*callback)(void *);
71   void                (*cancel)(void *);
72   channel_spooler_t    *spooler;
73   spooler_event_ll_t   *next;
74 };
75 
76 struct channel_spooler_s {
77   rbtree_seed_t               spoolseed;
78   subscriber_pool_t           current_msg_spool;
79   nchan_msg_id_t              prev_msg_id;
80   ngx_uint_t                  responded_count;
81   ngx_str_t                  *chid;
82   chanhead_pubsub_status_t   *channel_status;
83   uint8_t                    *channel_buffer_complete;
84   nchan_store_t              *store;
85   nchan_loc_conf_t           *cf;
86   channel_spooler_fn_t       *fn;
87   channel_spooler_handlers_t *handlers;
88   void                       *handlers_privdata;
89   fetchmsg_data_t            *fetchmsg_cb_data_list;
90   spooler_event_ll_t         *spooler_dependent_events;
91   spooler_fetching_strategy_t fetching_strategy;
92   unsigned                    publish_events:1;
93   unsigned                    running:1;
94   unsigned                    want_to_stop:1;
95 
96 };
97 
98 struct channel_spooler_handlers_s {
99   void                        (*add)(channel_spooler_t *, subscriber_t *, void *);
100   void                        (*dequeue)(channel_spooler_t *, subscriber_t *, void *);
101   void                        (*bulk_dequeue)(channel_spooler_t *, subscriber_type_t, ngx_int_t, void *); //called after dequeueing 1 or many subs
102   void                        (*use)(channel_spooler_t *, void *);
103   void                        (*get_message_start)(channel_spooler_t *, void *);
104   void                        (*get_message_finish)(channel_spooler_t *, void *);
105 };
106 
107 channel_spooler_t *start_spooler(channel_spooler_t *spl, ngx_str_t *chid, chanhead_pubsub_status_t *channel_status, uint8_t *channel_buffer_complete, nchan_store_t *store, nchan_loc_conf_t *cf, spooler_fetching_strategy_t fetching_strategy, channel_spooler_handlers_t *handlers, void *handlers_privdata);
108 ngx_int_t stop_spooler(channel_spooler_t *spl, uint8_t dequeue_subscribers);
109 
110 ngx_int_t spooler_catch_up(channel_spooler_t *spl);
111 
112 ngx_int_t spooler_print_contents(channel_spooler_t *spl);
113 
114 ngx_event_t *spooler_add_timer(channel_spooler_t *spl, ngx_msec_t timeout, void (*cb)(void *), void (*cancel)(void *), void *pd);
115 
116 
117 #endif  /*SPOOL_HEADER*/
118