1 #include <nchan_module.h>
2 #include <assert.h>
3 #include <store/memory/store.h>
4 #include <util/shmem.h>
5 
6 #define ERR(fmt, args...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "NCHAN MSG:" fmt, ##args)
7 
8 #define MSG_REFCOUNT_INVALID -9000
9 
10 #if NCHAN_MSG_RESERVE_DEBUG
nchan_msg_reserve_debug(nchan_msg_t * msg,char * lbl)11 static void nchan_msg_reserve_debug(nchan_msg_t *msg, char *lbl) {
12   msg_rsv_dbg_t     *rsv;
13   int shared = msg->storage == NCHAN_MSG_SHARED;
14 
15   if(shared)
16     shmtx_lock(nchan_store_memory_shmem);
17 
18   if(shared)
19     rsv=shm_locked_calloc(nchan_store_memory_shmem, sizeof(*rsv) + ngx_strlen(lbl) + 1, "msgdebug");
20   else
21     rsv=ngx_calloc(sizeof(*rsv) + ngx_strlen(lbl) + 1, ngx_cycle->log);
22 
23   rsv->lbl = (char *)(&rsv[1]);
24   ngx_memcpy(rsv->lbl, lbl, ngx_strlen(lbl));
25   if(msg->rsv == NULL) {
26     msg->rsv = rsv;
27     rsv->prev = NULL;
28     rsv->next = NULL;
29   }
30   else {
31     msg->rsv->prev = rsv;
32     rsv->next = msg->rsv;
33     rsv->prev = NULL;
34     msg->rsv = rsv;
35   }
36 
37   if(shared)
38     shmtx_unlock(nchan_store_memory_shmem);
39 }
40 
nchan_msg_release_debug(nchan_msg_t * msg,char * lbl)41 static void nchan_msg_release_debug(nchan_msg_t *msg, char *lbl) {
42   msg_rsv_dbg_t     *cur, *prev, *next;
43   size_t             sz = ngx_strlen(lbl);
44   ngx_int_t          rsv_found=0;
45   int shared = msg->storage == NCHAN_MSG_SHARED;
46 
47   if(shared)
48     shmtx_lock(nchan_store_memory_shmem);
49 
50   assert(msg->refcount > 0);
51   for(cur = msg->rsv; cur != NULL; cur = cur->next) {
52     if(ngx_memcmp(lbl, cur->lbl, sz) == 0) {
53       prev = cur->prev;
54       next = cur->next;
55       if(prev) {
56         prev->next = next;
57       }
58       if(next) {
59         next->prev = prev;
60       }
61       if(cur == msg->rsv) {
62         msg->rsv = next;
63       }
64 
65       if(shared)
66         shm_locked_free(nchan_store_memory_shmem, cur);
67       else
68         ngx_free(cur);
69 
70       rsv_found = 1;
71       break;
72     }
73   }
74   assert(rsv_found);
75   if(shared)
76     shmtx_unlock(nchan_store_memory_shmem);
77 }
78 #endif
79 
msg_refcount_valid(nchan_msg_t * msg)80 int msg_refcount_valid(nchan_msg_t *msg) {
81   return msg->refcount >= 0;
82 }
83 
msg_refcount_invalidate_if_zero(nchan_msg_t * msg)84 int msg_refcount_invalidate_if_zero(nchan_msg_t *msg) {
85   return ngx_atomic_cmp_set((ngx_atomic_uint_t *)&msg->refcount, 0, MSG_REFCOUNT_INVALID);
86 }
msg_refcount_invalidate(nchan_msg_t * msg)87 void msg_refcount_invalidate(nchan_msg_t *msg) {
88   msg->refcount = MSG_REFCOUNT_INVALID;
89 }
90 
91 
msg_reserve(nchan_msg_t * msg,char * lbl)92 ngx_int_t msg_reserve(nchan_msg_t *msg, char *lbl) {
93   if(msg->parent) {
94     assert(msg->storage != NCHAN_MSG_SHARED);
95     msg->refcount++;
96 #if NCHAN_MSG_RESERVE_DEBUG
97     nchan_msg_reserve_debug(msg, lbl);
98 #endif
99     return msg_reserve(msg->parent, lbl);
100   }
101   assert(!msg->parent);
102 
103   ngx_atomic_fetch_add((ngx_atomic_uint_t *)&msg->refcount, 1);
104   assert(msg->refcount >= 0);
105   if(msg->refcount < 0) {
106     msg->refcount = MSG_REFCOUNT_INVALID;
107     return NGX_ERROR;
108   }
109 #if NCHAN_MSG_RESERVE_DEBUG
110   nchan_msg_reserve_debug(msg, lbl);
111 #endif
112 
113   //DBG("msg %p reserved (%i) %s", msg, msg->refcount, lbl);
114   return NGX_OK;
115 }
116 
msg_release(nchan_msg_t * msg,char * lbl)117 ngx_int_t msg_release(nchan_msg_t *msg, char *lbl) {
118   nchan_msg_t    *parent = msg->parent;
119   if(parent) {
120     assert(msg->storage != NCHAN_MSG_SHARED);
121 #if NCHAN_MSG_RESERVE_DEBUG
122     nchan_msg_release_debug(msg, lbl);
123 #endif
124     msg->refcount--;
125     assert(msg->refcount >= 0);
126 
127     if(msg->refcount == 0) {
128       switch(msg->storage) {
129         case NCHAN_MSG_POOL:
130           //free the id, the rest of the msg will be cleaned with the pool
131           nchan_free_msg_id(&msg->id);
132           break;
133 
134         case NCHAN_MSG_HEAP:
135           nchan_free_msg_id(&msg->id);
136           ngx_free(msg);
137           break;
138 
139         default:
140           break;
141           //do nothing for NCHAN_MSG_STACK. NCHAN_MSG_SHARED should never be seen here.
142       }
143     }
144     return msg_release(parent, lbl);
145   }
146   assert(!parent);
147 
148 #if NCHAN_MSG_RESERVE_DEBUG
149   nchan_msg_release_debug(msg, lbl);
150 #endif
151   assert(msg->refcount > 0);
152   ngx_atomic_fetch_add((ngx_atomic_uint_t *)&msg->refcount, -1);
153   //DBG("msg %p released (%i) %s", msg, msg->refcount, lbl);
154   return NGX_OK;
155 }
156 
157 
nchan_msgid_tagcount_match(nchan_msg_id_t * id,int count)158 int nchan_msgid_tagcount_match(nchan_msg_id_t *id, int count) {
159   switch(id->time) {
160     case NCHAN_OLDEST_MSGID_TIME:
161     case NCHAN_NEWEST_MSGID_TIME:
162     case NCHAN_NTH_MSGID_TIME:
163       if(id->tagcount == 1 && id->tagactive == 0)
164         return 1;
165       break;
166     default:
167       if(id->tagcount == count && id->tagactive >= 0 && id->tagactive < count)
168         return 1;
169       break;
170   }
171   return 0;
172 }
173 
nchan_expand_msg_id_multi_tag(nchan_msg_id_t * id,uint8_t in_n,uint8_t out_n,int16_t fill)174 void nchan_expand_msg_id_multi_tag(nchan_msg_id_t *id, uint8_t in_n, uint8_t out_n, int16_t fill) {
175   int16_t v, n = id->tagcount;
176   int16_t *tags = n <= NCHAN_FIXED_MULTITAG_MAX ? id->tag.fixed : id->tag.allocd;
177   uint8_t i;
178   assert(n > in_n && n > out_n);
179   v = tags[in_n];
180 
181   for(i=0; i < n; i++) {
182     tags[i] = (i == out_n) ? v : fill;
183   }
184 }
185 
nchan_expand_tiny_msgid(nchan_msg_tiny_id_t * tinyid,nchan_msg_id_t * id)186 void nchan_expand_tiny_msgid(nchan_msg_tiny_id_t *tinyid, nchan_msg_id_t *id) {
187   id->time = tinyid->time;
188   id->tag.fixed[0]=tinyid->tag;
189   id->tagcount = 1;
190   id->tagactive = 1;
191 }
192 
nchan_shrink_normal_msgid(nchan_msg_id_t * id,nchan_msg_tiny_id_t * tinyid)193 void nchan_shrink_normal_msgid(nchan_msg_id_t *id, nchan_msg_tiny_id_t *tinyid) {
194   assert(id->tagcount <= 1);
195   tinyid->time = id->time;
196   tinyid->tag = id->tag.fixed[0];
197 }
198 
nchan_copy_new_msg_id(nchan_msg_id_t * dst,nchan_msg_id_t * src)199 ngx_int_t nchan_copy_new_msg_id(nchan_msg_id_t *dst, nchan_msg_id_t *src) {
200   ngx_memcpy(dst, src, sizeof(*src));
201   if(src->tagcount > NCHAN_FIXED_MULTITAG_MAX) {
202     size_t sz = sizeof(*src->tag.allocd) * src->tagcount;
203     if((dst->tag.allocd = ngx_alloc(sz, ngx_cycle->log)) == NULL) {
204       return NGX_ERROR;
205     }
206     ngx_memcpy(dst->tag.allocd, src->tag.allocd, sz);
207   }
208   return NGX_OK;
209 }
nchan_copy_msg_id(nchan_msg_id_t * dst,nchan_msg_id_t * src,int16_t * largetags)210 ngx_int_t nchan_copy_msg_id(nchan_msg_id_t *dst, nchan_msg_id_t *src, int16_t *largetags) {
211   uint16_t dst_n = dst->tagcount, src_n = src->tagcount;
212   dst->time = src->time;
213 
214   if(dst_n > NCHAN_FIXED_MULTITAG_MAX && dst_n != src_n) {
215     ngx_free(dst->tag.allocd);
216     dst_n = NCHAN_FIXED_MULTITAG_MAX;
217   }
218 
219   dst->tagcount = src->tagcount;
220   dst->tagactive = src->tagactive;
221 
222   if(src_n <= NCHAN_FIXED_MULTITAG_MAX) {
223     dst->tag = src->tag;
224   }
225   else {
226     if(dst_n != src_n) {
227       if(!largetags) {
228         if((largetags = ngx_alloc(sizeof(*largetags) * src_n, ngx_cycle->log)) == NULL) {
229           return NGX_ERROR;
230         }
231       }
232       dst->tag.allocd = largetags;
233     }
234 
235     ngx_memcpy(dst->tag.allocd, src->tag.allocd, sizeof(*src->tag.allocd) * src_n);
236   }
237   return NGX_OK;
238 }
239 
nchan_free_msg_id(nchan_msg_id_t * id)240 ngx_int_t nchan_free_msg_id(nchan_msg_id_t *id) {
241   if(id->tagcount > NCHAN_FIXED_MULTITAG_MAX) {
242     ngx_free(id->tag.allocd);
243     id->tag.allocd = NULL;
244   }
245   return NGX_OK;
246 }
247 
verify_msg_id(nchan_msg_id_t * id1,nchan_msg_id_t * id2,nchan_msg_id_t * msgid,char ** err)248 static ngx_int_t verify_msg_id(nchan_msg_id_t *id1, nchan_msg_id_t *id2, nchan_msg_id_t *msgid, char **err) {
249   int16_t  *tags1 = id1->tagcount <= NCHAN_FIXED_MULTITAG_MAX ? id1->tag.fixed : id1->tag.allocd;
250   int16_t  *tags2 = id2->tagcount <= NCHAN_FIXED_MULTITAG_MAX ? id2->tag.fixed : id2->tag.allocd;
251   if(id1->time > 0 && id2->time > 0) {
252     if(id1->time != id2->time) {
253       //is this a missed message, or just a multi msg?
254 
255       if(id2->tagcount > 1) {
256         int       i = -1, j, max = id2->tagcount;
257         int16_t  *msgidtags = msgid->tagcount <= NCHAN_FIXED_MULTITAG_MAX ? msgid->tag.fixed : msgid->tag.allocd;
258 
259         for(j=0; j < max; j++) {
260           if(tags2[j] != -1) {
261             if( i != -1) {
262               *err = "more than one tag set to something besides -1. that means this isn't a single channel's forwarded multi msg";
263               return NGX_ERROR;
264             }
265             else {
266               i = j;
267             }
268           }
269         }
270         if(msgidtags[i] != 0) {
271           *err = "only the first message in a given second is ok. anything else means a missed message.";
272           return NGX_ERROR;
273         }
274         //ok, it's just the first-per-second message of a channel from a multi-channel
275         //this is a rather convoluted description... but basically this is ok.
276         return NGX_OK;
277       }
278       else {
279         *err = "previous message id times don't match";
280         return NGX_ERROR;
281       }
282     }
283 
284     if(id1->tagcount == 1) {
285       if(tags1[0] != tags2[0]){
286         *err = "previous message id tags don't match";
287         return NGX_ERROR;
288       }
289     }
290     else {
291       int   i, max = id1->tagcount;
292       for(i=0; i < max; i++) {
293         if(tags2[i] != -1 && tags1[i] != tags2[i]) {
294           *err = "previous message multitag mismatch";
295           return NGX_ERROR;
296         }
297       }
298     }
299   }
300   return NGX_OK;
301 }
302 
nchan_update_multi_msgid(nchan_msg_id_t * oldid,nchan_msg_id_t * newid,int16_t * largetags)303 void nchan_update_multi_msgid(nchan_msg_id_t *oldid, nchan_msg_id_t *newid, int16_t *largetags) {
304   if(newid->tagcount == 1) {
305     //nice and simple
306     *oldid = *newid;
307   }
308   else {
309     //DBG("======= updating multi_msgid ======");
310     //DBG("======= old: %V", msgid_to_str(oldid));
311     //DBG("======= new: %V", msgid_to_str(newid));
312     uint16_t         newcount = newid->tagcount, oldcount = oldid->tagcount;
313     if(newcount > NCHAN_FIXED_MULTITAG_MAX && oldcount < newcount) {
314       int16_t       *oldtags, *old_largetags = NULL;
315       int            i;
316       size_t         sz = sizeof(*oldid->tag.allocd) * newcount;
317       if(oldcount > NCHAN_FIXED_MULTITAG_MAX) {
318         old_largetags = oldid->tag.allocd;
319         oldtags = old_largetags;
320       }
321       else {
322         oldtags = oldid->tag.fixed;
323       }
324       if(largetags == NULL) {
325         largetags = ngx_alloc(sz, ngx_cycle->log);
326       }
327       oldid->tag.allocd = largetags;
328       for(i=0; i < newcount; i++) {
329         oldid->tag.allocd[i] = (i < oldcount) ? oldtags[i] : -1;
330       }
331       if(old_largetags) {
332         ngx_free(old_largetags);
333       }
334       oldid->tagcount = newcount;
335     }
336 
337     if(oldid->time != newid->time) {
338       nchan_copy_msg_id(oldid, newid, NULL);
339     }
340     else {
341       int i, max = newcount;
342       int16_t  *oldtags = oldcount <= NCHAN_FIXED_MULTITAG_MAX ? oldid->tag.fixed : oldid->tag.allocd;
343       int16_t  *newtags = oldcount <= NCHAN_FIXED_MULTITAG_MAX ? newid->tag.fixed : newid->tag.allocd;
344 
345       assert(max == oldcount);
346 
347       for(i=0; i< max; i++) {
348 
349         //DEBUG CHECK -- REMOVE BEFORE RELEASE
350         if(newid->tagactive == i && newtags[i] != -1 && oldtags[i] != -1) {
351           assert(newtags[i] > oldtags[i]);
352         }
353 
354 
355         if (newtags[i] != -1) {
356           oldtags[i] = newtags[i];
357         }
358       }
359       oldid->tagactive = newid->tagactive;
360     }
361     //DBG("=== updated: %V", msgid_to_str(oldid));
362   }
363 }
364 
update_subscriber_last_msg_id(subscriber_t * sub,nchan_msg_t * msg)365 ngx_int_t update_subscriber_last_msg_id(subscriber_t *sub, nchan_msg_t *msg) {
366   if(msg) {
367     char *err, *huh;
368     if(verify_msg_id(&sub->last_msgid, &msg->prev_id, &msg->id, &err) == NGX_ERROR) {
369       struct timeval    tv;
370       time_t            time;
371       int               ttl = msg->expires - msg->id.time;
372       ngx_gettimeofday(&tv);
373       time = tv.tv_sec;
374 
375       if(sub->last_msgid.time + ttl <= time) {
376         huh = "The message probably expired.";
377       }
378       else {
379         huh = "Try increasing the message buffer length.";
380       }
381 
382       if(sub->type == INTERNAL) {
383         nchan_log_warning("Missed message for internal %V subscriber: %s. %s", sub->name, err, huh);
384       }
385       else {
386         nchan_log_request_warning(sub->request, "Missed message for %V subscriber: %s. %s", sub->name, err, huh);
387       }
388     }
389 
390     nchan_update_multi_msgid(&sub->last_msgid, &msg->id, NULL);
391   }
392 
393   return NGX_OK;
394 }
395 
396 
397 
398 
399 
400 
nchan_parse_msg_tag(u_char * first,u_char * last,nchan_msg_id_t * mid,ngx_int_t expected_tag_count)401 static ngx_int_t nchan_parse_msg_tag(u_char *first, u_char *last, nchan_msg_id_t *mid, ngx_int_t expected_tag_count) {
402   u_char           *cur = first;
403   u_char            c;
404   int16_t           i = 0;
405   int8_t            sign = 1;
406   int16_t           val = 0;
407   static int16_t    tags[NCHAN_MULTITAG_MAX];
408   int brace_open_total = 0;
409   int brace_close_total = 0;
410   int tagactive = NGX_ERROR;
411 
412   while(first != NULL && last != NULL && cur <= last && i < NCHAN_MULTITAG_MAX) {
413     if(cur == last) {
414       tags[i]=(val == 0 && sign == -1) ? -1 : val * sign; //shorthand "-" for "-1";
415       i++;
416       break;
417     }
418 
419     c = *cur;
420     if(c == '-') {
421       sign = -1;
422     }
423     else if (c >= '0' && c <= '9') {
424       val = 10 * val + (c - '0');
425     }
426     else if (c == '[') {
427       if(++brace_open_total > 1) {
428         return NGX_ERROR;
429       }
430       tagactive = i;
431     }
432     else if (c == ']') {
433       if(++brace_close_total > 1 || brace_open_total - brace_close_total != 0) {
434         return NGX_ERROR;
435       }
436     }
437     else if (c == ',') {
438       tags[i]=(val == 0 && sign == -1) ? -1 : val * sign; //shorthand "-" for "-1"
439       sign=1;
440       val=0;
441       i++;
442     }
443     cur++;
444   }
445 
446   if (i == 0) {
447     mid->tagactive = -1; // No tags, so no active tag
448   }
449   if(tagactive == NGX_ERROR) {
450     if(i==1) {
451       //single message tag, so it's active by defaiult
452       mid->tagactive = 0;
453     }
454     else {
455       //got a multitag with no tag braced as [active]. That's invalid.
456       return NGX_ERROR;
457     }
458   }
459   else {
460     mid->tagactive = tagactive;
461   }
462 
463   // We fill the rest of the tag needed with value '-1'
464   while (i < expected_tag_count) {
465     tags[i] = -1;
466     i++;
467   }
468 
469   mid->tagcount = i;
470 
471   if(i <= NCHAN_FIXED_MULTITAG_MAX) {
472     ngx_memcpy(mid->tag.fixed, tags, sizeof(mid->tag.fixed));
473   }
474   else {
475     mid->tag.allocd = tags;
476   }
477   return NGX_OK;
478 }
479 
nchan_extract_from_multi_msgid(nchan_msg_id_t * src,uint16_t n,nchan_msg_id_t * dst)480 ngx_int_t nchan_extract_from_multi_msgid(nchan_msg_id_t *src, uint16_t n, nchan_msg_id_t *dst) {
481   uint8_t count = src->tagcount;
482   int16_t *tags;
483 
484   if(src->time == NCHAN_OLDEST_MSGID_TIME || src->time == NCHAN_NEWEST_MSGID_TIME) {
485     dst->time = src->time;
486     dst->tag.fixed[0] = 0;
487     dst->tagcount = 1;
488     dst->tagactive = 0;
489     return NGX_OK;
490   }
491   else if(src->time == NCHAN_NTH_MSGID_TIME) {
492     dst->time = src->time;
493     dst->tag.fixed[0] = src->tag.fixed[0];
494     dst->tagcount = 1;
495     dst->tagactive = 0;
496     return NGX_OK;
497   }
498 
499   if(n >= count) {
500     ERR("can't extract msgid %i from multi-msg of count %i", n, count);
501     return NGX_ERROR;
502   }
503   tags = (count <= NCHAN_FIXED_MULTITAG_MAX) ? src->tag.fixed : src->tag.allocd;
504 
505   dst->time = src->time;
506   if(tags[n] == -1) {
507     dst->time --;
508     dst->tag.fixed[0] = 32767; //eeeeeh this is bad. but it's good enough.
509   }
510   else {
511     dst->tag.fixed[0] = tags[n];
512   }
513   dst->tagcount = 1;
514   dst->tagactive = 0;
515 
516   return NGX_OK;
517 }
518 
nchan_subscriber_get_etag(ngx_http_request_t * r)519 static ngx_str_t *nchan_subscriber_get_etag(ngx_http_request_t * r) {
520 #if nginx_version >= 1008000
521   return r->headers_in.if_none_match ? &r->headers_in.if_none_match->value : NULL;
522 #else
523   ngx_uint_t                       i;
524   ngx_list_part_t                 *part = &r->headers_in.headers.part;
525   ngx_table_elt_t                 *header= part->elts;
526   for (i = 0;  ; i++) {
527     if (i >= part->nelts) {
528       if (part->next == NULL) {
529         break;
530       }
531       part = part->next;
532       header = part->elts;
533       i = 0;
534     }
535     if (header[i].key.len == NCHAN_HEADER_IF_NONE_MATCH.len
536       && ngx_strncasecmp(header[i].key.data, NCHAN_HEADER_IF_NONE_MATCH.data, header[i].key.len) == 0) {
537       return &header[i].value;
538     }
539   }
540   return NULL;
541 #endif
542 }
543 
nchan_parse_compound_msgid(nchan_msg_id_t * id,ngx_str_t * str,ngx_int_t expected_tag_count)544 ngx_int_t nchan_parse_compound_msgid(nchan_msg_id_t *id, ngx_str_t *str, ngx_int_t expected_tag_count) {
545   //parse url-unescaped compound msgid
546   u_char       *split, *last;
547   ngx_int_t     time;
548   uint8_t       len;
549   //"<msg_time>:<msg_tag>"
550   last = str->data + str->len;
551   if((split = ngx_strlchr(str->data, last, ':')) != NULL) {
552     len = 1;
553   }
554   else {
555     len = 0; //placate dumb GCC warning
556   }
557   if(split) {
558     time = ngx_atoi(str->data, split - str->data);
559     split += len;
560     if(time != NGX_ERROR) {
561       id->time = time;
562       return nchan_parse_msg_tag(split, last, id, expected_tag_count);
563     }
564     else {
565       return NGX_ERROR;
566     }
567   }
568   return NGX_DECLINED;
569 }
570 
571 
572 
set_default_id(nchan_loc_conf_t * cf,nchan_msg_id_t * id)573 static ngx_int_t set_default_id(nchan_loc_conf_t *cf, nchan_msg_id_t *id) {
574   static nchan_msg_id_t           nth_msg_id = NCHAN_NTH_MSGID;
575   static nchan_msg_id_t           oldest_msg_id = NCHAN_OLDEST_MSGID;
576   static nchan_msg_id_t           newest_msg_id = NCHAN_NEWEST_MSGID;
577   switch(cf->subscriber_first_message) {
578     case 1:
579       *id = oldest_msg_id;
580       break;
581     case 0:
582       *id = newest_msg_id;
583       break;
584     default:
585       *id = nth_msg_id;
586       id->tag.fixed[0] = cf->subscriber_first_message;
587       break;
588   }
589   return NGX_OK;
590 }
591 
nchan_subscriber_get_msg_id(ngx_http_request_t * r)592 nchan_msg_id_t *nchan_subscriber_get_msg_id(ngx_http_request_t *r) {
593   static nchan_msg_id_t           id = NCHAN_ZERO_MSGID;
594 
595   ngx_str_t                      *if_none_match;
596   nchan_loc_conf_t               *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
597   nchan_request_ctx_t            *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
598   int                             i;
599   ngx_int_t                       rc;
600 
601   if_none_match = nchan_subscriber_get_etag(r);
602 
603   if(!cf->msg_in_etag_only && r->headers_in.if_modified_since != NULL) {
604     id.time=ngx_http_parse_time(r->headers_in.if_modified_since->value.data, r->headers_in.if_modified_since->value.len);
605 
606     if(id.time <= 0) { //anything before 1-1-1970 is reserved and treated as no msgid provided
607       set_default_id(cf, &id);
608       return &id;
609     }
610 
611     u_char *first = NULL, *last = NULL;
612     if(if_none_match != NULL) {
613       first = if_none_match->data;
614       last = if_none_match->data + if_none_match->len;
615     }
616 
617     if(nchan_parse_msg_tag(first, last, &id, ctx->channel_id_count) == NGX_ERROR) {
618       return NULL;
619     }
620 
621     return &id;
622   }
623   else if((cf->msg_in_etag_only || r->headers_in.if_modified_since == NULL) && if_none_match) {
624     rc = nchan_parse_compound_msgid(&id, if_none_match, ctx->channel_id_count);
625     if(rc == NGX_OK) {
626       return &id;
627     }
628     else if(rc == NGX_ERROR) {
629       return NULL;
630     }
631   }
632   else {
633     nchan_complex_value_arr_t   *alt_msgid_cv_arr = &cf->last_message_id;
634     u_char                       buf[128];
635     ngx_str_t                    str;
636     int                          n = alt_msgid_cv_arr->n;
637     ngx_int_t                    rc2;
638 
639     str.len = 0;
640     str.data = buf;
641 
642     for(i=0; i < n; i++) {
643       rc = ngx_http_complex_value_noalloc(r, alt_msgid_cv_arr->cv[i], &str, 128);
644       if(str.len > 0 && rc == NGX_OK) {
645         rc2 = nchan_parse_compound_msgid(&id, nchan_urldecode_str(r, &str), ctx->channel_id_count);
646         if(rc2 == NGX_OK) {
647           return &id;
648         }
649         else if(rc2 == NGX_ERROR) {
650           return NULL;
651         }
652       }
653     }
654   }
655 
656   set_default_id(cf, &id);
657   return &id;
658 }
659 
660 
nchan_compare_msgid_tags(nchan_msg_id_t * id1,nchan_msg_id_t * id2)661 int8_t nchan_compare_msgid_tags(nchan_msg_id_t *id1, nchan_msg_id_t *id2) {
662   uint8_t active = id2->tagactive;
663   int16_t *tags1, *tags2;
664   int16_t t1, t2;
665 
666   tags1 = (id1->tagcount <= NCHAN_FIXED_MULTITAG_MAX) ? id1->tag.fixed : id1->tag.allocd;
667   tags2 = (id2->tagcount <= NCHAN_FIXED_MULTITAG_MAX) ? id2->tag.fixed : id2->tag.allocd;
668 
669   //debugstuff that prevents this function from getting inlined
670   assert(id1->time == id2->time);
671   int i, nonnegs = 0;
672   for (i=0; i < id2->tagcount; i++) {
673     if(tags2[i] >= 0) nonnegs++;
674   }
675   assert(nonnegs == 1);
676 
677   if(id1->time == 0 && id2->time == 0) return 0; //always equal on zero-time
678 
679   t1 = (active < id1->tagcount) ? tags1[active] : -1;
680   t2 = tags2[active];
681 
682   //ERR("Comparing msgids: id1: %V --", msgid_to_str(id1));
683   //ERR("  --- id2: %V --", msgid_to_str(id2));
684 
685   if(t1 < t2){
686     //ERR("id1 is smaller. -1");
687     return -1;
688   }
689   if(t1 > t2){
690     //ERR("id1 is larger. 1");
691     return  1;
692   }
693   //ERR("id1 equals id2. 0");
694   return 0;
695 }
696 
nchan_compare_msgids(nchan_msg_id_t * id1,nchan_msg_id_t * id2)697 int8_t nchan_compare_msgids(nchan_msg_id_t *id1, nchan_msg_id_t *id2) {
698   assert(id1->tagcount == id2->tagcount);
699   if(id1->time < id2->time) {
700     return -1;
701   }
702   else if(id1->time > id2->time) {
703     return 1;
704   }
705   else {
706     assert(id1->tagcount == id2->tagcount);
707     if(id1->tagcount == 1) {
708       if(id1->tag.fixed[0] < id2->tag.fixed[0]) {
709         return -1;
710       }
711       else if(id1->tag.fixed[0] > id2->tag.fixed[0]) {
712         return 1;
713       }
714       else {
715         return 0;
716       }
717     }
718     else {
719       return nchan_compare_msgid_tags(id1, id2);
720     }
721   }
722 }
723 
724 
get_shared_msg(nchan_msg_t * msg)725 static nchan_msg_t *get_shared_msg(nchan_msg_t *msg) {
726   if(msg->storage == NCHAN_MSG_SHARED) {
727     assert(msg->parent == NULL);
728     return msg;
729   }
730   else {
731     assert(msg->parent);
732     assert(msg->parent->storage == NCHAN_MSG_SHARED);
733     return msg->parent;
734   }
735 }
736 
msg_derive_init(nchan_msg_t * parent,nchan_msg_t * msg,nchan_msg_storage_t storage_type)737 static ngx_inline nchan_msg_t *msg_derive_init(nchan_msg_t *parent, nchan_msg_t *msg, nchan_msg_storage_t storage_type) {
738   nchan_msg_t    *shared = get_shared_msg(parent);
739   if(!msg) { return NULL; }
740   *msg = *shared;
741   msg->id.tagcount=1;
742   msg->parent = shared;
743   msg->storage = storage_type;
744 #if NCHAN_MSG_RESERVE_DEBUG
745   msg->rsv = NULL;
746 #endif
747   msg->refcount = 0;
748   return msg;
749 }
750 
nchan_msg_derive_alloc(nchan_msg_t * parent)751 nchan_msg_t *nchan_msg_derive_alloc(nchan_msg_t *parent) {
752   nchan_msg_t *msg = msg_derive_init(parent, ngx_alloc(sizeof(nchan_msg_t), ngx_cycle->log), NCHAN_MSG_HEAP);
753   if(!msg || nchan_copy_new_msg_id(&msg->id, &parent->id) != NGX_OK) {
754     ngx_free(msg);
755     return NULL;
756   }
757   return msg;
758 }
nchan_msg_derive_palloc(nchan_msg_t * parent,ngx_pool_t * pool)759 nchan_msg_t *nchan_msg_derive_palloc(nchan_msg_t *parent, ngx_pool_t *pool) {
760   nchan_msg_t *msg = msg_derive_init(parent, ngx_palloc(pool, sizeof(nchan_msg_t)), NCHAN_MSG_POOL);
761   if(!msg || nchan_copy_new_msg_id(&msg->id, &parent->id) != NGX_OK) {
762     return NULL;
763   }
764   return msg;
765 }
nchan_msg_derive_stack(nchan_msg_t * parent,nchan_msg_t * child,int16_t * largetags)766 nchan_msg_t *nchan_msg_derive_stack(nchan_msg_t *parent, nchan_msg_t *child, int16_t *largetags) {
767   nchan_msg_t *msg = msg_derive_init(parent, child, NCHAN_MSG_STACK);
768   if(!msg || nchan_copy_msg_id(&msg->id, &parent->id, largetags) != NGX_OK) {
769     return NULL;
770   }
771   return msg;
772 }
773 
774