1 #include <nchan_module.h>
2 #include <subscribers/common.h>
3 #include "internal.h"
4 #include "getmsg_proxy.h"
5 #include <assert.h>
6
// Log level used by DBG(). Swap the two definitions below to surface this
// file's debug output at warning level instead.
//#define DEBUG_LEVEL NGX_LOG_WARN
#define DEBUG_LEVEL NGX_LOG_DEBUG

// Logging helpers: both write to the cycle-wide log (ngx_cycle->log) and
// prepend "SUB:PROXY:" to the format string. DBG logs at DEBUG_LEVEL, ERR at
// NGX_LOG_ERR. The `arg...` / `##arg` form is the GNU named-variadic macro
// syntax; `##` drops the trailing comma when no arguments are passed.
#define DBG(fmt, arg...) ngx_log_error(DEBUG_LEVEL, ngx_cycle->log, 0, "SUB:PROXY:" fmt, ##arg)
#define ERR(fmt, arg...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "SUB:PROXY:" fmt, ##arg)
12
13
14 typedef struct sub_data_s sub_data_t;
15
16 struct sub_data_s {
17 subscriber_t *sub;
18 ngx_str_t *chid;
19 ngx_event_t timeout_ev;
20 callback_pt cb;
21 void *pd;
22 }; //sub_data_t
23
/*
 * Enqueue handler: nothing to set up, so this only logs the event.
 *
 * status, ptr: unused.
 * d:           this subscriber's private data.
 * Returns NGX_OK unconditionally.
 */
static ngx_int_t sub_enqueue(ngx_int_t status, void *ptr, sub_data_t *d) {
  (void) status;
  (void) ptr;

  DBG("%p enqueued ok", d->sub);

  return NGX_OK;
}
28
/*
 * Dequeue handler.
 *
 * If the subscriber is still reserved, it is flagged as awaiting destruction;
 * otherwise the event is only logged. No memory is released here
 * (NOTE(review): presumably the internal-subscriber layer does the actual
 * teardown -- confirm).
 *
 * status, ptr: unused.
 * d:           this subscriber's private data.
 * Returns NGX_OK unconditionally.
 */
static ngx_int_t sub_dequeue(ngx_int_t status, void *ptr, sub_data_t* d) {
  internal_subscriber_t  *isub = (internal_subscriber_t *)d->sub;

  DBG("%p dequeue:", d->sub);

  //TODO: make sure proxy target is already taken care of

  if(isub->sub.reserved > 0) {
    // someone still holds a reservation; just record that destruction is pending
    DBG("%p not ready to destroy (reserved for %i)", isub, isub->sub.reserved);
    isub->awaiting_destruction = 1;
    return NGX_OK;
  }

  DBG("%p destroy", isub);
  return NGX_OK;
}
45
/*
 * Message handler: forwards the received message to the waiting callback as
 * MSG_FOUND, then clears the callback so it cannot be invoked again.
 *
 * status: unused.
 * ptr:    the message being delivered (nchan_msg_t *).
 * d:      this subscriber's private data; d->cb must still be set.
 * Returns NGX_OK.
 */
static ngx_int_t sub_respond_message(ngx_int_t status, void *ptr, sub_data_t* d) {
  nchan_msg_t  *msg = (nchan_msg_t *) ptr;

  // The callback is one-shot (it is cleared below and in sub_respond_status).
  // Enforce the same precondition as sub_respond_status rather than jumping
  // through a NULL function pointer on a repeated delivery.
  assert(d->cb);

  DBG("%p forwarding msg", d->sub);
  d->cb(MSG_FOUND, msg, d->pd);
  d->cb = NULL;
  //TODO : dequeue
  return NGX_OK;
}
54
/*
 * Status handler: translates an HTTP-style status into a MSG_* result and
 * forwards it (with a NULL message) to the waiting callback, then clears the
 * callback.
 *
 *   NGX_HTTP_GONE                                  -> MSG_EXPIRED
 *   NGX_HTTP_CONFLICT / CLOSE / FORBIDDEN /
 *   NOT_FOUND / REQUEST_TIMEOUT                    -> MSG_NOTFOUND
 *   NGX_HTTP_NO_CONTENT                            -> MSG_EXPECTED
 *
 * NGX_HTTP_NOT_MODIFIED is asserted never to occur. Any other status is
 * logged as an error and the callback is left untouched. When ptr is
 * non-NULL nothing is forwarded.
 *
 * d->cb must be set on entry. Returns NGX_OK unconditionally.
 */
static ngx_int_t sub_respond_status(ngx_int_t status, void *ptr, sub_data_t *d) {
  ngx_int_t  msg_status;

  assert(d->cb);

  if(ptr) {
    return NGX_OK;
  }

  switch(status) {
    case NGX_HTTP_GONE: //delete
      DBG("%p forwarding MSG_EXPIRED", d->sub);
      msg_status = MSG_EXPIRED;
      break;

    case NGX_HTTP_CONFLICT:
    case NGX_HTTP_CLOSE: //delete
    case NGX_HTTP_FORBIDDEN:
    case NGX_HTTP_NOT_FOUND:
    case NGX_HTTP_REQUEST_TIMEOUT:
      DBG("%p forwarding MSG_NOTFOUND", d->sub);
      msg_status = MSG_NOTFOUND;
      break;

    case NGX_HTTP_NO_CONTENT:
      //translate back to MSG request status code
      DBG("%p forwarding MSG_EXPECTED", d->sub);
      msg_status = MSG_EXPECTED;
      break;

    case NGX_HTTP_NOT_MODIFIED: //does this even ever happen? I don't think so.
      assert(0);
      return NGX_OK;

    default:
      ERR("unknown status %i", status);
      return NGX_OK;
  }

  // single delivery point: fire the one-shot callback and disarm it
  d->cb(msg_status, NULL, d->pd);
  d->cb = NULL;

  return NGX_OK;
}
88
89 static ngx_str_t sub_name = ngx_string("getmsg-proxy");
90
getmsg_proxy_subscriber_create(nchan_msg_id_t * msgid,callback_pt cb,void * pd)91 subscriber_t *getmsg_proxy_subscriber_create(nchan_msg_id_t *msgid, callback_pt cb, void *pd) {
92 sub_data_t *d;
93 subscriber_t *sub;
94
95 sub = internal_subscriber_create_init(&sub_name, NULL /*no config*/, sizeof(*d), (void **)&d, (callback_pt )sub_enqueue, (callback_pt )sub_dequeue, (callback_pt )sub_respond_message, (callback_pt )sub_respond_status, NULL, NULL);
96
97
98 DBG("created new getmsg_proxy sub %p", sub);
99 nchan_copy_new_msg_id(&sub->last_msgid, msgid);
100 sub->destroy_after_dequeue = 1;
101 sub->dequeue_after_response = 1;
102 d->sub = sub;
103 d->cb = cb;
104 d->pd = pd;
105 return sub;
106 }
107