1 #include <nchan_module.h>
2 #include <assert.h>
3 #include <store/memory/store.h>
4 #include <util/shmem.h>
5
6 #define ERR(fmt, args...) ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "NCHAN MSG:" fmt, ##args)
7
8 #define MSG_REFCOUNT_INVALID -9000
9
10 #if NCHAN_MSG_RESERVE_DEBUG
nchan_msg_reserve_debug(nchan_msg_t * msg,char * lbl)11 static void nchan_msg_reserve_debug(nchan_msg_t *msg, char *lbl) {
12 msg_rsv_dbg_t *rsv;
13 int shared = msg->storage == NCHAN_MSG_SHARED;
14
15 if(shared)
16 shmtx_lock(nchan_store_memory_shmem);
17
18 if(shared)
19 rsv=shm_locked_calloc(nchan_store_memory_shmem, sizeof(*rsv) + ngx_strlen(lbl) + 1, "msgdebug");
20 else
21 rsv=ngx_calloc(sizeof(*rsv) + ngx_strlen(lbl) + 1, ngx_cycle->log);
22
23 rsv->lbl = (char *)(&rsv[1]);
24 ngx_memcpy(rsv->lbl, lbl, ngx_strlen(lbl));
25 if(msg->rsv == NULL) {
26 msg->rsv = rsv;
27 rsv->prev = NULL;
28 rsv->next = NULL;
29 }
30 else {
31 msg->rsv->prev = rsv;
32 rsv->next = msg->rsv;
33 rsv->prev = NULL;
34 msg->rsv = rsv;
35 }
36
37 if(shared)
38 shmtx_unlock(nchan_store_memory_shmem);
39 }
40
nchan_msg_release_debug(nchan_msg_t * msg,char * lbl)41 static void nchan_msg_release_debug(nchan_msg_t *msg, char *lbl) {
42 msg_rsv_dbg_t *cur, *prev, *next;
43 size_t sz = ngx_strlen(lbl);
44 ngx_int_t rsv_found=0;
45 int shared = msg->storage == NCHAN_MSG_SHARED;
46
47 if(shared)
48 shmtx_lock(nchan_store_memory_shmem);
49
50 assert(msg->refcount > 0);
51 for(cur = msg->rsv; cur != NULL; cur = cur->next) {
52 if(ngx_memcmp(lbl, cur->lbl, sz) == 0) {
53 prev = cur->prev;
54 next = cur->next;
55 if(prev) {
56 prev->next = next;
57 }
58 if(next) {
59 next->prev = prev;
60 }
61 if(cur == msg->rsv) {
62 msg->rsv = next;
63 }
64
65 if(shared)
66 shm_locked_free(nchan_store_memory_shmem, cur);
67 else
68 ngx_free(cur);
69
70 rsv_found = 1;
71 break;
72 }
73 }
74 assert(rsv_found);
75 if(shared)
76 shmtx_unlock(nchan_store_memory_shmem);
77 }
78 #endif
79
msg_refcount_valid(nchan_msg_t * msg)80 int msg_refcount_valid(nchan_msg_t *msg) {
81 return msg->refcount >= 0;
82 }
83
msg_refcount_invalidate_if_zero(nchan_msg_t * msg)84 int msg_refcount_invalidate_if_zero(nchan_msg_t *msg) {
85 return ngx_atomic_cmp_set((ngx_atomic_uint_t *)&msg->refcount, 0, MSG_REFCOUNT_INVALID);
86 }
msg_refcount_invalidate(nchan_msg_t * msg)87 void msg_refcount_invalidate(nchan_msg_t *msg) {
88 msg->refcount = MSG_REFCOUNT_INVALID;
89 }
90
91
msg_reserve(nchan_msg_t * msg,char * lbl)92 ngx_int_t msg_reserve(nchan_msg_t *msg, char *lbl) {
93 if(msg->parent) {
94 assert(msg->storage != NCHAN_MSG_SHARED);
95 msg->refcount++;
96 #if NCHAN_MSG_RESERVE_DEBUG
97 nchan_msg_reserve_debug(msg, lbl);
98 #endif
99 return msg_reserve(msg->parent, lbl);
100 }
101 assert(!msg->parent);
102
103 ngx_atomic_fetch_add((ngx_atomic_uint_t *)&msg->refcount, 1);
104 assert(msg->refcount >= 0);
105 if(msg->refcount < 0) {
106 msg->refcount = MSG_REFCOUNT_INVALID;
107 return NGX_ERROR;
108 }
109 #if NCHAN_MSG_RESERVE_DEBUG
110 nchan_msg_reserve_debug(msg, lbl);
111 #endif
112
113 //DBG("msg %p reserved (%i) %s", msg, msg->refcount, lbl);
114 return NGX_OK;
115 }
116
msg_release(nchan_msg_t * msg,char * lbl)117 ngx_int_t msg_release(nchan_msg_t *msg, char *lbl) {
118 nchan_msg_t *parent = msg->parent;
119 if(parent) {
120 assert(msg->storage != NCHAN_MSG_SHARED);
121 #if NCHAN_MSG_RESERVE_DEBUG
122 nchan_msg_release_debug(msg, lbl);
123 #endif
124 msg->refcount--;
125 assert(msg->refcount >= 0);
126
127 if(msg->refcount == 0) {
128 switch(msg->storage) {
129 case NCHAN_MSG_POOL:
130 //free the id, the rest of the msg will be cleaned with the pool
131 nchan_free_msg_id(&msg->id);
132 break;
133
134 case NCHAN_MSG_HEAP:
135 nchan_free_msg_id(&msg->id);
136 ngx_free(msg);
137 break;
138
139 default:
140 break;
141 //do nothing for NCHAN_MSG_STACK. NCHAN_MSG_SHARED should never be seen here.
142 }
143 }
144 return msg_release(parent, lbl);
145 }
146 assert(!parent);
147
148 #if NCHAN_MSG_RESERVE_DEBUG
149 nchan_msg_release_debug(msg, lbl);
150 #endif
151 assert(msg->refcount > 0);
152 ngx_atomic_fetch_add((ngx_atomic_uint_t *)&msg->refcount, -1);
153 //DBG("msg %p released (%i) %s", msg, msg->refcount, lbl);
154 return NGX_OK;
155 }
156
157
nchan_msgid_tagcount_match(nchan_msg_id_t * id,int count)158 int nchan_msgid_tagcount_match(nchan_msg_id_t *id, int count) {
159 switch(id->time) {
160 case NCHAN_OLDEST_MSGID_TIME:
161 case NCHAN_NEWEST_MSGID_TIME:
162 case NCHAN_NTH_MSGID_TIME:
163 if(id->tagcount == 1 && id->tagactive == 0)
164 return 1;
165 break;
166 default:
167 if(id->tagcount == count && id->tagactive >= 0 && id->tagactive < count)
168 return 1;
169 break;
170 }
171 return 0;
172 }
173
nchan_expand_msg_id_multi_tag(nchan_msg_id_t * id,uint8_t in_n,uint8_t out_n,int16_t fill)174 void nchan_expand_msg_id_multi_tag(nchan_msg_id_t *id, uint8_t in_n, uint8_t out_n, int16_t fill) {
175 int16_t v, n = id->tagcount;
176 int16_t *tags = n <= NCHAN_FIXED_MULTITAG_MAX ? id->tag.fixed : id->tag.allocd;
177 uint8_t i;
178 assert(n > in_n && n > out_n);
179 v = tags[in_n];
180
181 for(i=0; i < n; i++) {
182 tags[i] = (i == out_n) ? v : fill;
183 }
184 }
185
nchan_expand_tiny_msgid(nchan_msg_tiny_id_t * tinyid,nchan_msg_id_t * id)186 void nchan_expand_tiny_msgid(nchan_msg_tiny_id_t *tinyid, nchan_msg_id_t *id) {
187 id->time = tinyid->time;
188 id->tag.fixed[0]=tinyid->tag;
189 id->tagcount = 1;
190 id->tagactive = 1;
191 }
192
nchan_shrink_normal_msgid(nchan_msg_id_t * id,nchan_msg_tiny_id_t * tinyid)193 void nchan_shrink_normal_msgid(nchan_msg_id_t *id, nchan_msg_tiny_id_t *tinyid) {
194 assert(id->tagcount <= 1);
195 tinyid->time = id->time;
196 tinyid->tag = id->tag.fixed[0];
197 }
198
nchan_copy_new_msg_id(nchan_msg_id_t * dst,nchan_msg_id_t * src)199 ngx_int_t nchan_copy_new_msg_id(nchan_msg_id_t *dst, nchan_msg_id_t *src) {
200 ngx_memcpy(dst, src, sizeof(*src));
201 if(src->tagcount > NCHAN_FIXED_MULTITAG_MAX) {
202 size_t sz = sizeof(*src->tag.allocd) * src->tagcount;
203 if((dst->tag.allocd = ngx_alloc(sz, ngx_cycle->log)) == NULL) {
204 return NGX_ERROR;
205 }
206 ngx_memcpy(dst->tag.allocd, src->tag.allocd, sz);
207 }
208 return NGX_OK;
209 }
nchan_copy_msg_id(nchan_msg_id_t * dst,nchan_msg_id_t * src,int16_t * largetags)210 ngx_int_t nchan_copy_msg_id(nchan_msg_id_t *dst, nchan_msg_id_t *src, int16_t *largetags) {
211 uint16_t dst_n = dst->tagcount, src_n = src->tagcount;
212 dst->time = src->time;
213
214 if(dst_n > NCHAN_FIXED_MULTITAG_MAX && dst_n != src_n) {
215 ngx_free(dst->tag.allocd);
216 dst_n = NCHAN_FIXED_MULTITAG_MAX;
217 }
218
219 dst->tagcount = src->tagcount;
220 dst->tagactive = src->tagactive;
221
222 if(src_n <= NCHAN_FIXED_MULTITAG_MAX) {
223 dst->tag = src->tag;
224 }
225 else {
226 if(dst_n != src_n) {
227 if(!largetags) {
228 if((largetags = ngx_alloc(sizeof(*largetags) * src_n, ngx_cycle->log)) == NULL) {
229 return NGX_ERROR;
230 }
231 }
232 dst->tag.allocd = largetags;
233 }
234
235 ngx_memcpy(dst->tag.allocd, src->tag.allocd, sizeof(*src->tag.allocd) * src_n);
236 }
237 return NGX_OK;
238 }
239
nchan_free_msg_id(nchan_msg_id_t * id)240 ngx_int_t nchan_free_msg_id(nchan_msg_id_t *id) {
241 if(id->tagcount > NCHAN_FIXED_MULTITAG_MAX) {
242 ngx_free(id->tag.allocd);
243 id->tag.allocd = NULL;
244 }
245 return NGX_OK;
246 }
247
verify_msg_id(nchan_msg_id_t * id1,nchan_msg_id_t * id2,nchan_msg_id_t * msgid,char ** err)248 static ngx_int_t verify_msg_id(nchan_msg_id_t *id1, nchan_msg_id_t *id2, nchan_msg_id_t *msgid, char **err) {
249 int16_t *tags1 = id1->tagcount <= NCHAN_FIXED_MULTITAG_MAX ? id1->tag.fixed : id1->tag.allocd;
250 int16_t *tags2 = id2->tagcount <= NCHAN_FIXED_MULTITAG_MAX ? id2->tag.fixed : id2->tag.allocd;
251 if(id1->time > 0 && id2->time > 0) {
252 if(id1->time != id2->time) {
253 //is this a missed message, or just a multi msg?
254
255 if(id2->tagcount > 1) {
256 int i = -1, j, max = id2->tagcount;
257 int16_t *msgidtags = msgid->tagcount <= NCHAN_FIXED_MULTITAG_MAX ? msgid->tag.fixed : msgid->tag.allocd;
258
259 for(j=0; j < max; j++) {
260 if(tags2[j] != -1) {
261 if( i != -1) {
262 *err = "more than one tag set to something besides -1. that means this isn't a single channel's forwarded multi msg";
263 return NGX_ERROR;
264 }
265 else {
266 i = j;
267 }
268 }
269 }
270 if(msgidtags[i] != 0) {
271 *err = "only the first message in a given second is ok. anything else means a missed message.";
272 return NGX_ERROR;
273 }
274 //ok, it's just the first-per-second message of a channel from a multi-channel
275 //this is a rather convoluted description... but basically this is ok.
276 return NGX_OK;
277 }
278 else {
279 *err = "previous message id times don't match";
280 return NGX_ERROR;
281 }
282 }
283
284 if(id1->tagcount == 1) {
285 if(tags1[0] != tags2[0]){
286 *err = "previous message id tags don't match";
287 return NGX_ERROR;
288 }
289 }
290 else {
291 int i, max = id1->tagcount;
292 for(i=0; i < max; i++) {
293 if(tags2[i] != -1 && tags1[i] != tags2[i]) {
294 *err = "previous message multitag mismatch";
295 return NGX_ERROR;
296 }
297 }
298 }
299 }
300 return NGX_OK;
301 }
302
nchan_update_multi_msgid(nchan_msg_id_t * oldid,nchan_msg_id_t * newid,int16_t * largetags)303 void nchan_update_multi_msgid(nchan_msg_id_t *oldid, nchan_msg_id_t *newid, int16_t *largetags) {
304 if(newid->tagcount == 1) {
305 //nice and simple
306 *oldid = *newid;
307 }
308 else {
309 //DBG("======= updating multi_msgid ======");
310 //DBG("======= old: %V", msgid_to_str(oldid));
311 //DBG("======= new: %V", msgid_to_str(newid));
312 uint16_t newcount = newid->tagcount, oldcount = oldid->tagcount;
313 if(newcount > NCHAN_FIXED_MULTITAG_MAX && oldcount < newcount) {
314 int16_t *oldtags, *old_largetags = NULL;
315 int i;
316 size_t sz = sizeof(*oldid->tag.allocd) * newcount;
317 if(oldcount > NCHAN_FIXED_MULTITAG_MAX) {
318 old_largetags = oldid->tag.allocd;
319 oldtags = old_largetags;
320 }
321 else {
322 oldtags = oldid->tag.fixed;
323 }
324 if(largetags == NULL) {
325 largetags = ngx_alloc(sz, ngx_cycle->log);
326 }
327 oldid->tag.allocd = largetags;
328 for(i=0; i < newcount; i++) {
329 oldid->tag.allocd[i] = (i < oldcount) ? oldtags[i] : -1;
330 }
331 if(old_largetags) {
332 ngx_free(old_largetags);
333 }
334 oldid->tagcount = newcount;
335 }
336
337 if(oldid->time != newid->time) {
338 nchan_copy_msg_id(oldid, newid, NULL);
339 }
340 else {
341 int i, max = newcount;
342 int16_t *oldtags = oldcount <= NCHAN_FIXED_MULTITAG_MAX ? oldid->tag.fixed : oldid->tag.allocd;
343 int16_t *newtags = oldcount <= NCHAN_FIXED_MULTITAG_MAX ? newid->tag.fixed : newid->tag.allocd;
344
345 assert(max == oldcount);
346
347 for(i=0; i< max; i++) {
348
349 //DEBUG CHECK -- REMOVE BEFORE RELEASE
350 if(newid->tagactive == i && newtags[i] != -1 && oldtags[i] != -1) {
351 assert(newtags[i] > oldtags[i]);
352 }
353
354
355 if (newtags[i] != -1) {
356 oldtags[i] = newtags[i];
357 }
358 }
359 oldid->tagactive = newid->tagactive;
360 }
361 //DBG("=== updated: %V", msgid_to_str(oldid));
362 }
363 }
364
update_subscriber_last_msg_id(subscriber_t * sub,nchan_msg_t * msg)365 ngx_int_t update_subscriber_last_msg_id(subscriber_t *sub, nchan_msg_t *msg) {
366 if(msg) {
367 char *err, *huh;
368 if(verify_msg_id(&sub->last_msgid, &msg->prev_id, &msg->id, &err) == NGX_ERROR) {
369 struct timeval tv;
370 time_t time;
371 int ttl = msg->expires - msg->id.time;
372 ngx_gettimeofday(&tv);
373 time = tv.tv_sec;
374
375 if(sub->last_msgid.time + ttl <= time) {
376 huh = "The message probably expired.";
377 }
378 else {
379 huh = "Try increasing the message buffer length.";
380 }
381
382 if(sub->type == INTERNAL) {
383 nchan_log_warning("Missed message for internal %V subscriber: %s. %s", sub->name, err, huh);
384 }
385 else {
386 nchan_log_request_warning(sub->request, "Missed message for %V subscriber: %s. %s", sub->name, err, huh);
387 }
388 }
389
390 nchan_update_multi_msgid(&sub->last_msgid, &msg->id, NULL);
391 }
392
393 return NGX_OK;
394 }
395
396
397
398
399
400
nchan_parse_msg_tag(u_char * first,u_char * last,nchan_msg_id_t * mid,ngx_int_t expected_tag_count)401 static ngx_int_t nchan_parse_msg_tag(u_char *first, u_char *last, nchan_msg_id_t *mid, ngx_int_t expected_tag_count) {
402 u_char *cur = first;
403 u_char c;
404 int16_t i = 0;
405 int8_t sign = 1;
406 int16_t val = 0;
407 static int16_t tags[NCHAN_MULTITAG_MAX];
408 int brace_open_total = 0;
409 int brace_close_total = 0;
410 int tagactive = NGX_ERROR;
411
412 while(first != NULL && last != NULL && cur <= last && i < NCHAN_MULTITAG_MAX) {
413 if(cur == last) {
414 tags[i]=(val == 0 && sign == -1) ? -1 : val * sign; //shorthand "-" for "-1";
415 i++;
416 break;
417 }
418
419 c = *cur;
420 if(c == '-') {
421 sign = -1;
422 }
423 else if (c >= '0' && c <= '9') {
424 val = 10 * val + (c - '0');
425 }
426 else if (c == '[') {
427 if(++brace_open_total > 1) {
428 return NGX_ERROR;
429 }
430 tagactive = i;
431 }
432 else if (c == ']') {
433 if(++brace_close_total > 1 || brace_open_total - brace_close_total != 0) {
434 return NGX_ERROR;
435 }
436 }
437 else if (c == ',') {
438 tags[i]=(val == 0 && sign == -1) ? -1 : val * sign; //shorthand "-" for "-1"
439 sign=1;
440 val=0;
441 i++;
442 }
443 cur++;
444 }
445
446 if (i == 0) {
447 mid->tagactive = -1; // No tags, so no active tag
448 }
449 if(tagactive == NGX_ERROR) {
450 if(i==1) {
451 //single message tag, so it's active by defaiult
452 mid->tagactive = 0;
453 }
454 else {
455 //got a multitag with no tag braced as [active]. That's invalid.
456 return NGX_ERROR;
457 }
458 }
459 else {
460 mid->tagactive = tagactive;
461 }
462
463 // We fill the rest of the tag needed with value '-1'
464 while (i < expected_tag_count) {
465 tags[i] = -1;
466 i++;
467 }
468
469 mid->tagcount = i;
470
471 if(i <= NCHAN_FIXED_MULTITAG_MAX) {
472 ngx_memcpy(mid->tag.fixed, tags, sizeof(mid->tag.fixed));
473 }
474 else {
475 mid->tag.allocd = tags;
476 }
477 return NGX_OK;
478 }
479
nchan_extract_from_multi_msgid(nchan_msg_id_t * src,uint16_t n,nchan_msg_id_t * dst)480 ngx_int_t nchan_extract_from_multi_msgid(nchan_msg_id_t *src, uint16_t n, nchan_msg_id_t *dst) {
481 uint8_t count = src->tagcount;
482 int16_t *tags;
483
484 if(src->time == NCHAN_OLDEST_MSGID_TIME || src->time == NCHAN_NEWEST_MSGID_TIME) {
485 dst->time = src->time;
486 dst->tag.fixed[0] = 0;
487 dst->tagcount = 1;
488 dst->tagactive = 0;
489 return NGX_OK;
490 }
491 else if(src->time == NCHAN_NTH_MSGID_TIME) {
492 dst->time = src->time;
493 dst->tag.fixed[0] = src->tag.fixed[0];
494 dst->tagcount = 1;
495 dst->tagactive = 0;
496 return NGX_OK;
497 }
498
499 if(n >= count) {
500 ERR("can't extract msgid %i from multi-msg of count %i", n, count);
501 return NGX_ERROR;
502 }
503 tags = (count <= NCHAN_FIXED_MULTITAG_MAX) ? src->tag.fixed : src->tag.allocd;
504
505 dst->time = src->time;
506 if(tags[n] == -1) {
507 dst->time --;
508 dst->tag.fixed[0] = 32767; //eeeeeh this is bad. but it's good enough.
509 }
510 else {
511 dst->tag.fixed[0] = tags[n];
512 }
513 dst->tagcount = 1;
514 dst->tagactive = 0;
515
516 return NGX_OK;
517 }
518
nchan_subscriber_get_etag(ngx_http_request_t * r)519 static ngx_str_t *nchan_subscriber_get_etag(ngx_http_request_t * r) {
520 #if nginx_version >= 1008000
521 return r->headers_in.if_none_match ? &r->headers_in.if_none_match->value : NULL;
522 #else
523 ngx_uint_t i;
524 ngx_list_part_t *part = &r->headers_in.headers.part;
525 ngx_table_elt_t *header= part->elts;
526 for (i = 0; ; i++) {
527 if (i >= part->nelts) {
528 if (part->next == NULL) {
529 break;
530 }
531 part = part->next;
532 header = part->elts;
533 i = 0;
534 }
535 if (header[i].key.len == NCHAN_HEADER_IF_NONE_MATCH.len
536 && ngx_strncasecmp(header[i].key.data, NCHAN_HEADER_IF_NONE_MATCH.data, header[i].key.len) == 0) {
537 return &header[i].value;
538 }
539 }
540 return NULL;
541 #endif
542 }
543
nchan_parse_compound_msgid(nchan_msg_id_t * id,ngx_str_t * str,ngx_int_t expected_tag_count)544 ngx_int_t nchan_parse_compound_msgid(nchan_msg_id_t *id, ngx_str_t *str, ngx_int_t expected_tag_count) {
545 //parse url-unescaped compound msgid
546 u_char *split, *last;
547 ngx_int_t time;
548 uint8_t len;
549 //"<msg_time>:<msg_tag>"
550 last = str->data + str->len;
551 if((split = ngx_strlchr(str->data, last, ':')) != NULL) {
552 len = 1;
553 }
554 else {
555 len = 0; //placate dumb GCC warning
556 }
557 if(split) {
558 time = ngx_atoi(str->data, split - str->data);
559 split += len;
560 if(time != NGX_ERROR) {
561 id->time = time;
562 return nchan_parse_msg_tag(split, last, id, expected_tag_count);
563 }
564 else {
565 return NGX_ERROR;
566 }
567 }
568 return NGX_DECLINED;
569 }
570
571
572
set_default_id(nchan_loc_conf_t * cf,nchan_msg_id_t * id)573 static ngx_int_t set_default_id(nchan_loc_conf_t *cf, nchan_msg_id_t *id) {
574 static nchan_msg_id_t nth_msg_id = NCHAN_NTH_MSGID;
575 static nchan_msg_id_t oldest_msg_id = NCHAN_OLDEST_MSGID;
576 static nchan_msg_id_t newest_msg_id = NCHAN_NEWEST_MSGID;
577 switch(cf->subscriber_first_message) {
578 case 1:
579 *id = oldest_msg_id;
580 break;
581 case 0:
582 *id = newest_msg_id;
583 break;
584 default:
585 *id = nth_msg_id;
586 id->tag.fixed[0] = cf->subscriber_first_message;
587 break;
588 }
589 return NGX_OK;
590 }
591
nchan_subscriber_get_msg_id(ngx_http_request_t * r)592 nchan_msg_id_t *nchan_subscriber_get_msg_id(ngx_http_request_t *r) {
593 static nchan_msg_id_t id = NCHAN_ZERO_MSGID;
594
595 ngx_str_t *if_none_match;
596 nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
597 nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
598 int i;
599 ngx_int_t rc;
600
601 if_none_match = nchan_subscriber_get_etag(r);
602
603 if(!cf->msg_in_etag_only && r->headers_in.if_modified_since != NULL) {
604 id.time=ngx_http_parse_time(r->headers_in.if_modified_since->value.data, r->headers_in.if_modified_since->value.len);
605
606 if(id.time <= 0) { //anything before 1-1-1970 is reserved and treated as no msgid provided
607 set_default_id(cf, &id);
608 return &id;
609 }
610
611 u_char *first = NULL, *last = NULL;
612 if(if_none_match != NULL) {
613 first = if_none_match->data;
614 last = if_none_match->data + if_none_match->len;
615 }
616
617 if(nchan_parse_msg_tag(first, last, &id, ctx->channel_id_count) == NGX_ERROR) {
618 return NULL;
619 }
620
621 return &id;
622 }
623 else if((cf->msg_in_etag_only || r->headers_in.if_modified_since == NULL) && if_none_match) {
624 rc = nchan_parse_compound_msgid(&id, if_none_match, ctx->channel_id_count);
625 if(rc == NGX_OK) {
626 return &id;
627 }
628 else if(rc == NGX_ERROR) {
629 return NULL;
630 }
631 }
632 else {
633 nchan_complex_value_arr_t *alt_msgid_cv_arr = &cf->last_message_id;
634 u_char buf[128];
635 ngx_str_t str;
636 int n = alt_msgid_cv_arr->n;
637 ngx_int_t rc2;
638
639 str.len = 0;
640 str.data = buf;
641
642 for(i=0; i < n; i++) {
643 rc = ngx_http_complex_value_noalloc(r, alt_msgid_cv_arr->cv[i], &str, 128);
644 if(str.len > 0 && rc == NGX_OK) {
645 rc2 = nchan_parse_compound_msgid(&id, nchan_urldecode_str(r, &str), ctx->channel_id_count);
646 if(rc2 == NGX_OK) {
647 return &id;
648 }
649 else if(rc2 == NGX_ERROR) {
650 return NULL;
651 }
652 }
653 }
654 }
655
656 set_default_id(cf, &id);
657 return &id;
658 }
659
660
nchan_compare_msgid_tags(nchan_msg_id_t * id1,nchan_msg_id_t * id2)661 int8_t nchan_compare_msgid_tags(nchan_msg_id_t *id1, nchan_msg_id_t *id2) {
662 uint8_t active = id2->tagactive;
663 int16_t *tags1, *tags2;
664 int16_t t1, t2;
665
666 tags1 = (id1->tagcount <= NCHAN_FIXED_MULTITAG_MAX) ? id1->tag.fixed : id1->tag.allocd;
667 tags2 = (id2->tagcount <= NCHAN_FIXED_MULTITAG_MAX) ? id2->tag.fixed : id2->tag.allocd;
668
669 //debugstuff that prevents this function from getting inlined
670 assert(id1->time == id2->time);
671 int i, nonnegs = 0;
672 for (i=0; i < id2->tagcount; i++) {
673 if(tags2[i] >= 0) nonnegs++;
674 }
675 assert(nonnegs == 1);
676
677 if(id1->time == 0 && id2->time == 0) return 0; //always equal on zero-time
678
679 t1 = (active < id1->tagcount) ? tags1[active] : -1;
680 t2 = tags2[active];
681
682 //ERR("Comparing msgids: id1: %V --", msgid_to_str(id1));
683 //ERR(" --- id2: %V --", msgid_to_str(id2));
684
685 if(t1 < t2){
686 //ERR("id1 is smaller. -1");
687 return -1;
688 }
689 if(t1 > t2){
690 //ERR("id1 is larger. 1");
691 return 1;
692 }
693 //ERR("id1 equals id2. 0");
694 return 0;
695 }
696
nchan_compare_msgids(nchan_msg_id_t * id1,nchan_msg_id_t * id2)697 int8_t nchan_compare_msgids(nchan_msg_id_t *id1, nchan_msg_id_t *id2) {
698 assert(id1->tagcount == id2->tagcount);
699 if(id1->time < id2->time) {
700 return -1;
701 }
702 else if(id1->time > id2->time) {
703 return 1;
704 }
705 else {
706 assert(id1->tagcount == id2->tagcount);
707 if(id1->tagcount == 1) {
708 if(id1->tag.fixed[0] < id2->tag.fixed[0]) {
709 return -1;
710 }
711 else if(id1->tag.fixed[0] > id2->tag.fixed[0]) {
712 return 1;
713 }
714 else {
715 return 0;
716 }
717 }
718 else {
719 return nchan_compare_msgid_tags(id1, id2);
720 }
721 }
722 }
723
724
get_shared_msg(nchan_msg_t * msg)725 static nchan_msg_t *get_shared_msg(nchan_msg_t *msg) {
726 if(msg->storage == NCHAN_MSG_SHARED) {
727 assert(msg->parent == NULL);
728 return msg;
729 }
730 else {
731 assert(msg->parent);
732 assert(msg->parent->storage == NCHAN_MSG_SHARED);
733 return msg->parent;
734 }
735 }
736
msg_derive_init(nchan_msg_t * parent,nchan_msg_t * msg,nchan_msg_storage_t storage_type)737 static ngx_inline nchan_msg_t *msg_derive_init(nchan_msg_t *parent, nchan_msg_t *msg, nchan_msg_storage_t storage_type) {
738 nchan_msg_t *shared = get_shared_msg(parent);
739 if(!msg) { return NULL; }
740 *msg = *shared;
741 msg->id.tagcount=1;
742 msg->parent = shared;
743 msg->storage = storage_type;
744 #if NCHAN_MSG_RESERVE_DEBUG
745 msg->rsv = NULL;
746 #endif
747 msg->refcount = 0;
748 return msg;
749 }
750
nchan_msg_derive_alloc(nchan_msg_t * parent)751 nchan_msg_t *nchan_msg_derive_alloc(nchan_msg_t *parent) {
752 nchan_msg_t *msg = msg_derive_init(parent, ngx_alloc(sizeof(nchan_msg_t), ngx_cycle->log), NCHAN_MSG_HEAP);
753 if(!msg || nchan_copy_new_msg_id(&msg->id, &parent->id) != NGX_OK) {
754 ngx_free(msg);
755 return NULL;
756 }
757 return msg;
758 }
nchan_msg_derive_palloc(nchan_msg_t * parent,ngx_pool_t * pool)759 nchan_msg_t *nchan_msg_derive_palloc(nchan_msg_t *parent, ngx_pool_t *pool) {
760 nchan_msg_t *msg = msg_derive_init(parent, ngx_palloc(pool, sizeof(nchan_msg_t)), NCHAN_MSG_POOL);
761 if(!msg || nchan_copy_new_msg_id(&msg->id, &parent->id) != NGX_OK) {
762 return NULL;
763 }
764 return msg;
765 }
nchan_msg_derive_stack(nchan_msg_t * parent,nchan_msg_t * child,int16_t * largetags)766 nchan_msg_t *nchan_msg_derive_stack(nchan_msg_t *parent, nchan_msg_t *child, int16_t *largetags) {
767 nchan_msg_t *msg = msg_derive_init(parent, child, NCHAN_MSG_STACK);
768 if(!msg || nchan_copy_msg_id(&msg->id, &parent->id, largetags) != NGX_OK) {
769 return NULL;
770 }
771 return msg;
772 }
773
774