1 #include <nchan_module.h>
2 #include <subscribers/common.h>
3 #include "internal.h"
4 #include "getmsg_proxy.h"
5 #include <assert.h>
6 
//#define DEBUG_LEVEL NGX_LOG_WARN
#define DEBUG_LEVEL NGX_LOG_DEBUG

// Logging helpers. Both write to ngx_cycle->log and prepend "SUB:PROXY:" to the format string.
// DBG logs at DEBUG_LEVEL (swap in the commented-out define above to log at NGX_LOG_WARN instead);
// ERR always logs at NGX_LOG_ERR.
// fmt must be a string literal, since it is concatenated with the prefix at compile time.
#define DBG(fmt, arg...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SUB:PROXY:" fmt, ##arg)
#define ERR(fmt, arg...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SUB:PROXY:" fmt, ##arg)
12 
13 
14 typedef struct sub_data_s sub_data_t;
15 
16 struct sub_data_s {
17   subscriber_t                 *sub;
18   ngx_str_t                    *chid;
19   ngx_event_t                   timeout_ev;
20   callback_pt                   cb;
21   void                         *pd;
22 }; //sub_data_t
23 
// Enqueue handler: only logs that the subscriber was enqueued.
// status and ptr are ignored. Always returns NGX_OK.
static ngx_int_t sub_enqueue(ngx_int_t status, void *ptr, sub_data_t *d) {
  (void) status;
  (void) ptr;

  DBG("%p enqueued ok", d->sub);

  return NGX_OK;
}
28 
// Dequeue handler.
// If the subscriber is still reserved (reserved > 0), it is flagged as awaiting destruction;
// otherwise this only logs. Nothing is freed here.
// status and ptr are ignored. Always returns NGX_OK.
static ngx_int_t sub_dequeue(ngx_int_t status, void *ptr, sub_data_t* d) {
  internal_subscriber_t  *isub = (internal_subscriber_t *)d->sub;

  DBG("%p dequeue:", d->sub);

  //TODO: make sure proxy target is already taken care of

  if(isub->sub.reserved <= 0) {
    // no outstanding reservations: nothing to defer
    DBG("%p destroy", isub);
    return NGX_OK;
  }

  // still reserved: mark it so destruction can happen later
  DBG("%p  not ready to destroy (reserved for %i)", isub, isub->sub.reserved);
  isub->awaiting_destruction = 1;
  return NGX_OK;
}
45 
// Message handler: forwards the received message to the stored callback as MSG_FOUND.
//   status - ignored
//   ptr    - the message (nchan_msg_t *) to forward
//   d      - subscriber private data holding the callback and its opaque pointer
// The callback is one-shot: it is cleared after being invoked, so it must still be set
// on entry. This is asserted, matching sub_respond_status, instead of calling through
// a NULL function pointer.
// Always returns NGX_OK.
static ngx_int_t sub_respond_message(ngx_int_t status, void *ptr, sub_data_t* d) {
  nchan_msg_t            *msg = (nchan_msg_t *) ptr;

  assert(d->cb);
  DBG("%p forwarding msg", d->sub);
  d->cb(MSG_FOUND, msg, d->pd);
  d->cb = NULL; // callback consumed; a second delivery would be a bug
  //TODO : dequeue
  return NGX_OK;
}
54 
// Status handler: translates an HTTP-style status into a MSG_* code and forwards it
// to the stored callback (with a NULL message), then clears the callback.
//   status - HTTP status code to translate
//   ptr    - when non-NULL, nothing is forwarded
//   d      - subscriber private data holding the callback and its opaque pointer
// Mapping:
//   NGX_HTTP_GONE                                          -> MSG_EXPIRED
//   NGX_HTTP_CONFLICT / CLOSE / FORBIDDEN / NOT_FOUND /
//   NGX_HTTP_REQUEST_TIMEOUT                               -> MSG_NOTFOUND
//   NGX_HTTP_NO_CONTENT                                    -> MSG_EXPECTED
// NGX_HTTP_NOT_MODIFIED trips an assert, and any other status is logged as an error;
// in both of those cases the callback is neither invoked nor cleared.
// Always returns NGX_OK.
static ngx_int_t sub_respond_status(ngx_int_t status, void *ptr, sub_data_t *d) {
  ngx_int_t   forward_code;

  assert(d->cb);

  if(ptr != NULL) {
    return NGX_OK;
  }

  switch(status) {
    case NGX_HTTP_GONE: //delete
      DBG("%p forwarding MSG_EXPIRED", d->sub);
      forward_code = MSG_EXPIRED;
      break;

    case NGX_HTTP_CONFLICT:
    case NGX_HTTP_CLOSE: //delete
    case NGX_HTTP_FORBIDDEN:
    case NGX_HTTP_NOT_FOUND:
    case NGX_HTTP_REQUEST_TIMEOUT:
      DBG("%p forwarding MSG_NOTFOUND", d->sub);
      forward_code = MSG_NOTFOUND;
      break;

    case NGX_HTTP_NO_CONTENT:
      //translate back to MSG request status code
      DBG("%p forwarding MSG_EXPECTED", d->sub);
      forward_code = MSG_EXPECTED;
      break;

    case NGX_HTTP_NOT_MODIFIED: //does this even ever happen? I don't think so.
      assert(0);
      return NGX_OK;

    default:
      ERR("unknown status %i", status);
      return NGX_OK;
  }

  // every forwarding case ends the same way: deliver once, then drop the callback
  d->cb(forward_code, NULL, d->pd);
  d->cb = NULL;
  return NGX_OK;
}
88 
// Name passed to internal_subscriber_create_init() for subscribers created by this file.
static ngx_str_t sub_name = ngx_string("getmsg-proxy");
90 
getmsg_proxy_subscriber_create(nchan_msg_id_t * msgid,callback_pt cb,void * pd)91 subscriber_t *getmsg_proxy_subscriber_create(nchan_msg_id_t *msgid, callback_pt cb, void *pd) {
92   sub_data_t                 *d;
93   subscriber_t               *sub;
94 
95   sub = internal_subscriber_create_init(&sub_name, NULL /*no config*/, sizeof(*d), (void **)&d, (callback_pt )sub_enqueue, (callback_pt )sub_dequeue, (callback_pt )sub_respond_message, (callback_pt )sub_respond_status, NULL, NULL);
96 
97 
98   DBG("created new getmsg_proxy sub %p", sub);
99   nchan_copy_new_msg_id(&sub->last_msgid, msgid);
100   sub->destroy_after_dequeue = 1;
101   sub->dequeue_after_response = 1;
102   d->sub = sub;
103   d->cb = cb;
104   d->pd = pd;
105   return sub;
106 }
107