1
2 /*
3 * Copyright (C) Roman Arutyunyan
4 */
5
6
7 #include <ngx_config.h>
8 #include <ngx_core.h>
9 #include <ngx_event.h>
10 #include <nginx.h>
11 #include "ngx_rtmp.h"
12
13
/*
 * Forward declarations of the file-local helpers defined below:
 * the "rtmp" block handler, the listen port/address bookkeeping,
 * the event table setup, the recursive application merge and the
 * per-process init hook.
 */
static char *ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static ngx_int_t ngx_rtmp_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
    ngx_rtmp_listen_t *listen);
static char *ngx_rtmp_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports);
static ngx_int_t ngx_rtmp_add_addrs(ngx_conf_t *cf, ngx_rtmp_port_t *mport,
    ngx_rtmp_conf_addr_t *addr);
#if (NGX_HAVE_INET6)
static ngx_int_t ngx_rtmp_add_addrs6(ngx_conf_t *cf, ngx_rtmp_port_t *mport,
    ngx_rtmp_conf_addr_t *addr);
#endif
static ngx_int_t ngx_rtmp_cmp_conf_addrs(const void *one, const void *two);
static ngx_int_t ngx_rtmp_init_events(ngx_conf_t *cf,
    ngx_rtmp_core_main_conf_t *cmcf);
static ngx_int_t ngx_rtmp_init_event_handlers(ngx_conf_t *cf,
    ngx_rtmp_core_main_conf_t *cmcf);
static char * ngx_rtmp_merge_applications(ngx_conf_t *cf,
    ngx_array_t *applications, void **app_conf, ngx_rtmp_module_t *module,
    ngx_uint_t ctx_index);
static ngx_int_t ngx_rtmp_init_process(ngx_cycle_t *cycle);
33
34
35 #if (nginx_version >= 1007011)
36 ngx_queue_t ngx_rtmp_init_queue;
37 #elif (nginx_version >= 1007005)
38 ngx_thread_volatile ngx_queue_t ngx_rtmp_init_queue;
39 #else
40 ngx_thread_volatile ngx_event_t *ngx_rtmp_init_queue;
41 #endif
42
43
/*
 * Number of modules of type NGX_RTMP_MODULE; recomputed by ngx_rtmp_block()
 * while it assigns each such module its ctx_index.
 */
ngx_uint_t  ngx_rtmp_max_module;
45
46
/*
 * Directives of this module: a single "rtmp" entry flagged
 * NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS and handled by
 * ngx_rtmp_block().
 */
static ngx_command_t  ngx_rtmp_commands[] = {

    { ngx_string("rtmp"),
      NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_rtmp_block,
      0,
      0,
      NULL },

      ngx_null_command
};
58
59
/* core module context: only the name is set, both callback slots are NULL */
static ngx_core_module_t  ngx_rtmp_module_ctx = {
    ngx_string("rtmp"),
    NULL,
    NULL
};
65
66
/*
 * Module descriptor: an NGX_CORE_MODULE exposing ngx_rtmp_commands whose
 * only lifecycle hook is ngx_rtmp_init_process().
 */
ngx_module_t  ngx_rtmp_module = {
    NGX_MODULE_V1,
    &ngx_rtmp_module_ctx,                  /* module context */
    ngx_rtmp_commands,                     /* module directives */
    NGX_CORE_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    ngx_rtmp_init_process,                 /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};
81
82
83 static char *
ngx_rtmp_block(ngx_conf_t * cf,ngx_command_t * cmd,void * conf)84 ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
85 {
86 char *rv;
87 ngx_uint_t i, m, mi, s;
88 ngx_conf_t pcf;
89 ngx_array_t ports;
90 ngx_module_t **modules;
91 ngx_rtmp_listen_t *listen;
92 ngx_rtmp_module_t *module;
93 ngx_rtmp_conf_ctx_t *ctx;
94 ngx_rtmp_core_srv_conf_t *cscf, **cscfp;
95 ngx_rtmp_core_main_conf_t *cmcf;
96
97 ctx = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_conf_ctx_t));
98 if (ctx == NULL) {
99 return NGX_CONF_ERROR;
100 }
101
102 *(ngx_rtmp_conf_ctx_t **) conf = ctx;
103
104 /* count the number of the rtmp modules and set up their indices */
105 #if defined(nginx_version) && nginx_version >= 1009011
106 modules = cf->cycle->modules;
107 #else
108 modules = ngx_modules;
109 #endif
110 ngx_rtmp_max_module = 0;
111 for (m = 0; modules[m]; m++) {
112 if (modules[m]->type != NGX_RTMP_MODULE) {
113 continue;
114 }
115
116 modules[m]->ctx_index = ngx_rtmp_max_module++;
117 }
118
119
120 /* the rtmp main_conf context, it is the same in the all rtmp contexts */
121
122 ctx->main_conf = ngx_pcalloc(cf->pool,
123 sizeof(void *) * ngx_rtmp_max_module);
124 if (ctx->main_conf == NULL) {
125 return NGX_CONF_ERROR;
126 }
127
128
129 /*
130 * the rtmp null srv_conf context, it is used to merge
131 * the server{}s' srv_conf's
132 */
133
134 ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_rtmp_max_module);
135 if (ctx->srv_conf == NULL) {
136 return NGX_CONF_ERROR;
137 }
138
139
140 /*
141 * the rtmp null app_conf context, it is used to merge
142 * the server{}s' app_conf's
143 */
144
145 ctx->app_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_rtmp_max_module);
146 if (ctx->app_conf == NULL) {
147 return NGX_CONF_ERROR;
148 }
149
150
151 /*
152 * create the main_conf's, the null srv_conf's, and the null app_conf's
153 * of the all rtmp modules
154 */
155
156 for (m = 0; modules[m]; m++) {
157 if (modules[m]->type != NGX_RTMP_MODULE) {
158 continue;
159 }
160
161 module = modules[m]->ctx;
162 mi = modules[m]->ctx_index;
163
164 if (module->create_main_conf) {
165 ctx->main_conf[mi] = module->create_main_conf(cf);
166 if (ctx->main_conf[mi] == NULL) {
167 return NGX_CONF_ERROR;
168 }
169 }
170
171 if (module->create_srv_conf) {
172 ctx->srv_conf[mi] = module->create_srv_conf(cf);
173 if (ctx->srv_conf[mi] == NULL) {
174 return NGX_CONF_ERROR;
175 }
176 }
177
178 if (module->create_app_conf) {
179 ctx->app_conf[mi] = module->create_app_conf(cf);
180 if (ctx->app_conf[mi] == NULL) {
181 return NGX_CONF_ERROR;
182 }
183 }
184 }
185
186 pcf = *cf;
187 cf->ctx = ctx;
188
189 for (m = 0; modules[m]; m++) {
190 if (modules[m]->type != NGX_RTMP_MODULE) {
191 continue;
192 }
193
194 module = modules[m]->ctx;
195
196 if (module->preconfiguration) {
197 if (module->preconfiguration(cf) != NGX_OK) {
198 return NGX_CONF_ERROR;
199 }
200 }
201 }
202
203 /* parse inside the rtmp{} block */
204
205 cf->module_type = NGX_RTMP_MODULE;
206 cf->cmd_type = NGX_RTMP_MAIN_CONF;
207 rv = ngx_conf_parse(cf, NULL);
208
209 if (rv != NGX_CONF_OK) {
210 *cf = pcf;
211 return rv;
212 }
213
214
215 /* init rtmp{} main_conf's, merge the server{}s' srv_conf's */
216
217 cmcf = ctx->main_conf[ngx_rtmp_core_module.ctx_index];
218 cscfp = cmcf->servers.elts;
219
220 for (m = 0; modules[m]; m++) {
221 if (modules[m]->type != NGX_RTMP_MODULE) {
222 continue;
223 }
224
225 module = modules[m]->ctx;
226 mi = modules[m]->ctx_index;
227
228 /* init rtmp{} main_conf's */
229
230 cf->ctx = ctx;
231
232 if (module->init_main_conf) {
233 rv = module->init_main_conf(cf, ctx->main_conf[mi]);
234 if (rv != NGX_CONF_OK) {
235 *cf = pcf;
236 return rv;
237 }
238 }
239
240 for (s = 0; s < cmcf->servers.nelts; s++) {
241
242 /* merge the server{}s' srv_conf's */
243
244 cf->ctx = cscfp[s]->ctx;
245
246 if (module->merge_srv_conf) {
247 rv = module->merge_srv_conf(cf,
248 ctx->srv_conf[mi],
249 cscfp[s]->ctx->srv_conf[mi]);
250 if (rv != NGX_CONF_OK) {
251 *cf = pcf;
252 return rv;
253 }
254 }
255
256 if (module->merge_app_conf) {
257
258 /* merge the server{}'s app_conf */
259
260 /*ctx->app_conf = cscfp[s]->ctx->loc_conf;*/
261
262 rv = module->merge_app_conf(cf,
263 ctx->app_conf[mi],
264 cscfp[s]->ctx->app_conf[mi]);
265 if (rv != NGX_CONF_OK) {
266 *cf = pcf;
267 return rv;
268 }
269
270 /* merge the applications{}' app_conf's */
271
272 cscf = cscfp[s]->ctx->srv_conf[ngx_rtmp_core_module.ctx_index];
273
274 rv = ngx_rtmp_merge_applications(cf, &cscf->applications,
275 cscfp[s]->ctx->app_conf,
276 module, mi);
277 if (rv != NGX_CONF_OK) {
278 *cf = pcf;
279 return rv;
280 }
281 }
282
283 }
284 }
285
286
287 if (ngx_rtmp_init_events(cf, cmcf) != NGX_OK) {
288 return NGX_CONF_ERROR;
289 }
290
291 for (m = 0; modules[m]; m++) {
292 if (modules[m]->type != NGX_RTMP_MODULE) {
293 continue;
294 }
295
296 module = modules[m]->ctx;
297
298 if (module->postconfiguration) {
299 if (module->postconfiguration(cf) != NGX_OK) {
300 return NGX_CONF_ERROR;
301 }
302 }
303 }
304
305 *cf = pcf;
306
307 if (ngx_rtmp_init_event_handlers(cf, cmcf) != NGX_OK) {
308 return NGX_CONF_ERROR;
309 }
310
311 if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_rtmp_conf_port_t))
312 != NGX_OK)
313 {
314 return NGX_CONF_ERROR;
315 }
316
317 listen = cmcf->listen.elts;
318
319 for (i = 0; i < cmcf->listen.nelts; i++) {
320 if (ngx_rtmp_add_ports(cf, &ports, &listen[i]) != NGX_OK) {
321 return NGX_CONF_ERROR;
322 }
323 }
324
325 return ngx_rtmp_optimize_servers(cf, &ports);
326 }
327
328
329 static char *
ngx_rtmp_merge_applications(ngx_conf_t * cf,ngx_array_t * applications,void ** app_conf,ngx_rtmp_module_t * module,ngx_uint_t ctx_index)330 ngx_rtmp_merge_applications(ngx_conf_t *cf, ngx_array_t *applications,
331 void **app_conf, ngx_rtmp_module_t *module, ngx_uint_t ctx_index)
332 {
333 char *rv;
334 ngx_rtmp_conf_ctx_t *ctx, saved;
335 ngx_rtmp_core_app_conf_t **cacfp;
336 ngx_uint_t n;
337 ngx_rtmp_core_app_conf_t *cacf;
338
339 if (applications == NULL) {
340 return NGX_CONF_OK;
341 }
342
343 ctx = (ngx_rtmp_conf_ctx_t *) cf->ctx;
344 saved = *ctx;
345
346 cacfp = applications->elts;
347 for (n = 0; n < applications->nelts; ++n, ++cacfp) {
348
349 ctx->app_conf = (*cacfp)->app_conf;
350
351 rv = module->merge_app_conf(cf, app_conf[ctx_index],
352 (*cacfp)->app_conf[ctx_index]);
353 if (rv != NGX_CONF_OK) {
354 return rv;
355 }
356
357 cacf = (*cacfp)->app_conf[ngx_rtmp_core_module.ctx_index];
358 rv = ngx_rtmp_merge_applications(cf, &cacf->applications,
359 (*cacfp)->app_conf,
360 module, ctx_index);
361 if (rv != NGX_CONF_OK) {
362 return rv;
363 }
364 }
365
366 *ctx = saved;
367
368 return NGX_CONF_OK;
369 }
370
371
372 static ngx_int_t
ngx_rtmp_init_events(ngx_conf_t * cf,ngx_rtmp_core_main_conf_t * cmcf)373 ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
374 {
375 size_t n;
376
377 for(n = 0; n < NGX_RTMP_MAX_EVENT; ++n) {
378 if (ngx_array_init(&cmcf->events[n], cf->pool, 1,
379 sizeof(ngx_rtmp_handler_pt)) != NGX_OK)
380 {
381 return NGX_ERROR;
382 }
383 }
384
385 if (ngx_array_init(&cmcf->amf, cf->pool, 1,
386 sizeof(ngx_rtmp_amf_handler_t)) != NGX_OK)
387 {
388 return NGX_ERROR;
389 }
390
391 return NGX_OK;
392 }
393
394
395 static ngx_int_t
ngx_rtmp_init_event_handlers(ngx_conf_t * cf,ngx_rtmp_core_main_conf_t * cmcf)396 ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
397 {
398 ngx_hash_init_t calls_hash;
399 ngx_rtmp_handler_pt *eh;
400 ngx_rtmp_amf_handler_t *h;
401 ngx_hash_key_t *ha;
402 size_t n, m;
403
404 static size_t pm_events[] = {
405 NGX_RTMP_MSG_CHUNK_SIZE,
406 NGX_RTMP_MSG_ABORT,
407 NGX_RTMP_MSG_ACK,
408 NGX_RTMP_MSG_ACK_SIZE,
409 NGX_RTMP_MSG_BANDWIDTH
410 };
411
412 static size_t amf_events[] = {
413 NGX_RTMP_MSG_AMF_CMD,
414 NGX_RTMP_MSG_AMF_META,
415 NGX_RTMP_MSG_AMF_SHARED,
416 NGX_RTMP_MSG_AMF3_CMD,
417 NGX_RTMP_MSG_AMF3_META,
418 NGX_RTMP_MSG_AMF3_SHARED
419 };
420
421 /* init standard protocol events */
422 for(n = 0; n < sizeof(pm_events) / sizeof(pm_events[0]); ++n) {
423 eh = ngx_array_push(&cmcf->events[pm_events[n]]);
424 *eh = ngx_rtmp_protocol_message_handler;
425 }
426
427 /* init amf events */
428 for(n = 0; n < sizeof(amf_events) / sizeof(amf_events[0]); ++n) {
429 eh = ngx_array_push(&cmcf->events[amf_events[n]]);
430 *eh = ngx_rtmp_amf_message_handler;
431 }
432
433 /* init user protocol events */
434 eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_USER]);
435 *eh = ngx_rtmp_user_message_handler;
436
437 /* aggregate to audio/video map */
438 eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AGGREGATE]);
439 *eh = ngx_rtmp_aggregate_message_handler;
440
441 /* init amf callbacks */
442 ngx_array_init(&cmcf->amf_arrays, cf->pool, 1, sizeof(ngx_hash_key_t));
443
444 h = cmcf->amf.elts;
445 for(n = 0; n < cmcf->amf.nelts; ++n, ++h) {
446 ha = cmcf->amf_arrays.elts;
447 for(m = 0; m < cmcf->amf_arrays.nelts; ++m, ++ha) {
448 if (h->name.len == ha->key.len
449 && !ngx_strncmp(h->name.data, ha->key.data, ha->key.len))
450 {
451 break;
452 }
453 }
454 if (m == cmcf->amf_arrays.nelts) {
455 ha = ngx_array_push(&cmcf->amf_arrays);
456 ha->key = h->name;
457 ha->key_hash = ngx_hash_key_lc(ha->key.data, ha->key.len);
458 ha->value = ngx_array_create(cf->pool, 1,
459 sizeof(ngx_rtmp_handler_pt));
460 if (ha->value == NULL) {
461 return NGX_ERROR;
462 }
463 }
464
465 eh = ngx_array_push((ngx_array_t*)ha->value);
466 *eh = h->handler;
467 }
468
469 calls_hash.hash = &cmcf->amf_hash;
470 calls_hash.key = ngx_hash_key_lc;
471 calls_hash.max_size = 512;
472 calls_hash.bucket_size = ngx_cacheline_size;
473 calls_hash.name = "amf_hash";
474 calls_hash.pool = cf->pool;
475 calls_hash.temp_pool = NULL;
476
477 if (ngx_hash_init(&calls_hash, cmcf->amf_arrays.elts, cmcf->amf_arrays.nelts)
478 != NGX_OK)
479 {
480 return NGX_ERROR;
481 }
482
483 return NGX_OK;
484 }
485
486
487 static ngx_int_t
ngx_rtmp_add_ports(ngx_conf_t * cf,ngx_array_t * ports,ngx_rtmp_listen_t * listen)488 ngx_rtmp_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
489 ngx_rtmp_listen_t *listen)
490 {
491 in_port_t p;
492 ngx_uint_t i;
493 struct sockaddr *sa;
494 struct sockaddr_in *sin;
495 ngx_rtmp_conf_port_t *port;
496 ngx_rtmp_conf_addr_t *addr;
497 #if (NGX_HAVE_INET6)
498 struct sockaddr_in6 *sin6;
499 #endif
500
501 sa = (struct sockaddr *) &listen->sockaddr;
502
503 switch (sa->sa_family) {
504
505 #if (NGX_HAVE_INET6)
506 case AF_INET6:
507 sin6 = (struct sockaddr_in6 *) sa;
508 p = sin6->sin6_port;
509 break;
510 #endif
511
512 default: /* AF_INET */
513 sin = (struct sockaddr_in *) sa;
514 p = sin->sin_port;
515 break;
516 }
517
518 port = ports->elts;
519 for (i = 0; i < ports->nelts; i++) {
520 if (p == port[i].port && sa->sa_family == port[i].family) {
521
522 /* a port is already in the port list */
523
524 port = &port[i];
525 goto found;
526 }
527 }
528
529 /* add a port to the port list */
530
531 port = ngx_array_push(ports);
532 if (port == NULL) {
533 return NGX_ERROR;
534 }
535
536 port->family = sa->sa_family;
537 port->port = p;
538
539 if (ngx_array_init(&port->addrs, cf->temp_pool, 2,
540 sizeof(ngx_rtmp_conf_addr_t))
541 != NGX_OK)
542 {
543 return NGX_ERROR;
544 }
545
546 found:
547
548 addr = ngx_array_push(&port->addrs);
549 if (addr == NULL) {
550 return NGX_ERROR;
551 }
552
553 addr->sockaddr = (struct sockaddr *) &listen->sockaddr;
554 addr->socklen = listen->socklen;
555 addr->ctx = listen->ctx;
556 addr->bind = listen->bind;
557 addr->wildcard = listen->wildcard;
558 addr->so_keepalive = listen->so_keepalive;
559 addr->proxy_protocol = listen->proxy_protocol;
560 #if (NGX_HAVE_KEEPALIVE_TUNABLE)
561 addr->tcp_keepidle = listen->tcp_keepidle;
562 addr->tcp_keepintvl = listen->tcp_keepintvl;
563 addr->tcp_keepcnt = listen->tcp_keepcnt;
564 #endif
565 #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
566 addr->ipv6only = listen->ipv6only;
567 #endif
568
569 return NGX_OK;
570 }
571
572
573 static char *
ngx_rtmp_optimize_servers(ngx_conf_t * cf,ngx_array_t * ports)574 ngx_rtmp_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports)
575 {
576 ngx_uint_t i, p, last, bind_wildcard;
577 ngx_listening_t *ls;
578 ngx_rtmp_port_t *mport;
579 ngx_rtmp_conf_port_t *port;
580 ngx_rtmp_conf_addr_t *addr;
581
582 port = ports->elts;
583 for (p = 0; p < ports->nelts; p++) {
584
585 ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
586 sizeof(ngx_rtmp_conf_addr_t), ngx_rtmp_cmp_conf_addrs);
587
588 addr = port[p].addrs.elts;
589 last = port[p].addrs.nelts;
590
591 /*
592 * if there is the binding to the "*:port" then we need to bind()
593 * to the "*:port" only and ignore the other bindings
594 */
595
596 if (addr[last - 1].wildcard) {
597 addr[last - 1].bind = 1;
598 bind_wildcard = 1;
599
600 } else {
601 bind_wildcard = 0;
602 }
603
604 i = 0;
605
606 while (i < last) {
607
608 if (bind_wildcard && !addr[i].bind) {
609 i++;
610 continue;
611 }
612
613 ls = ngx_create_listening(cf, addr[i].sockaddr, addr[i].socklen);
614 if (ls == NULL) {
615 return NGX_CONF_ERROR;
616 }
617
618 ls->addr_ntop = 1;
619 ls->handler = ngx_rtmp_init_connection;
620 ls->pool_size = 4096;
621
622 /* TODO: error_log directive */
623 ls->logp = &cf->cycle->new_log;
624 ls->log.data = &ls->addr_text;
625 ls->log.handler = ngx_accept_log_error;
626
627 ls->keepalive = addr[i].so_keepalive;
628 #if (NGX_HAVE_KEEPALIVE_TUNABLE)
629 ls->keepidle = addr[i].tcp_keepidle;
630 ls->keepintvl = addr[i].tcp_keepintvl;
631 ls->keepcnt = addr[i].tcp_keepcnt;
632 #endif
633
634 #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
635 ls->ipv6only = addr[i].ipv6only;
636 #endif
637
638 mport = ngx_palloc(cf->pool, sizeof(ngx_rtmp_port_t));
639 if (mport == NULL) {
640 return NGX_CONF_ERROR;
641 }
642
643 ls->servers = mport;
644
645 if (i == last - 1) {
646 mport->naddrs = last;
647
648 } else {
649 mport->naddrs = 1;
650 i = 0;
651 }
652
653 switch (ls->sockaddr->sa_family) {
654 #if (NGX_HAVE_INET6)
655 case AF_INET6:
656 if (ngx_rtmp_add_addrs6(cf, mport, addr) != NGX_OK) {
657 return NGX_CONF_ERROR;
658 }
659 break;
660 #endif
661 default: /* AF_INET */
662 if (ngx_rtmp_add_addrs(cf, mport, addr) != NGX_OK) {
663 return NGX_CONF_ERROR;
664 }
665 break;
666 }
667
668 addr++;
669 last--;
670 }
671 }
672
673 return NGX_CONF_OK;
674 }
675
676
677 static ngx_int_t
ngx_rtmp_add_addrs(ngx_conf_t * cf,ngx_rtmp_port_t * mport,ngx_rtmp_conf_addr_t * addr)678 ngx_rtmp_add_addrs(ngx_conf_t *cf, ngx_rtmp_port_t *mport,
679 ngx_rtmp_conf_addr_t *addr)
680 {
681 u_char *p;
682 size_t len;
683 ngx_uint_t i;
684 ngx_rtmp_in_addr_t *addrs;
685 struct sockaddr_in *sin;
686 u_char buf[NGX_SOCKADDR_STRLEN];
687
688 mport->addrs = ngx_pcalloc(cf->pool,
689 mport->naddrs * sizeof(ngx_rtmp_in_addr_t));
690 if (mport->addrs == NULL) {
691 return NGX_ERROR;
692 }
693
694 addrs = mport->addrs;
695
696 for (i = 0; i < mport->naddrs; i++) {
697
698 sin = (struct sockaddr_in *) addr[i].sockaddr;
699 addrs[i].addr = sin->sin_addr.s_addr;
700
701 addrs[i].conf.ctx = addr[i].ctx;
702
703 len = ngx_sock_ntop(addr[i].sockaddr,
704 #if (nginx_version >= 1005003)
705 addr[i].socklen,
706 #endif
707 buf, NGX_SOCKADDR_STRLEN, 1);
708
709 p = ngx_pnalloc(cf->pool, len);
710 if (p == NULL) {
711 return NGX_ERROR;
712 }
713
714 ngx_memcpy(p, buf, len);
715
716 addrs[i].conf.addr_text.len = len;
717 addrs[i].conf.addr_text.data = p;
718 addrs[i].conf.proxy_protocol = addr->proxy_protocol;
719 }
720
721 return NGX_OK;
722 }
723
724
725 #if (NGX_HAVE_INET6)
726
727 static ngx_int_t
ngx_rtmp_add_addrs6(ngx_conf_t * cf,ngx_rtmp_port_t * mport,ngx_rtmp_conf_addr_t * addr)728 ngx_rtmp_add_addrs6(ngx_conf_t *cf, ngx_rtmp_port_t *mport,
729 ngx_rtmp_conf_addr_t *addr)
730 {
731 u_char *p;
732 size_t len;
733 ngx_uint_t i;
734 ngx_rtmp_in6_addr_t *addrs6;
735 struct sockaddr_in6 *sin6;
736 u_char buf[NGX_SOCKADDR_STRLEN];
737
738 mport->addrs = ngx_pcalloc(cf->pool,
739 mport->naddrs * sizeof(ngx_rtmp_in6_addr_t));
740 if (mport->addrs == NULL) {
741 return NGX_ERROR;
742 }
743
744 addrs6 = mport->addrs;
745
746 for (i = 0; i < mport->naddrs; i++) {
747
748 sin6 = (struct sockaddr_in6 *) addr[i].sockaddr;
749 addrs6[i].addr6 = sin6->sin6_addr;
750
751 addrs6[i].conf.ctx = addr[i].ctx;
752
753 len = ngx_sock_ntop(addr[i].sockaddr,
754 #if (nginx_version >= 1005003)
755 addr[i].socklen,
756 #endif
757 buf, NGX_SOCKADDR_STRLEN, 1);
758
759 p = ngx_pnalloc(cf->pool, len);
760 if (p == NULL) {
761 return NGX_ERROR;
762 }
763
764 ngx_memcpy(p, buf, len);
765
766 addrs6[i].conf.addr_text.len = len;
767 addrs6[i].conf.addr_text.data = p;
768 addrs6[i].conf.proxy_protocol = addr->proxy_protocol;
769 }
770
771 return NGX_OK;
772 }
773
774 #endif
775
776
777 static ngx_int_t
ngx_rtmp_cmp_conf_addrs(const void * one,const void * two)778 ngx_rtmp_cmp_conf_addrs(const void *one, const void *two)
779 {
780 ngx_rtmp_conf_addr_t *first, *second;
781
782 first = (ngx_rtmp_conf_addr_t *) one;
783 second = (ngx_rtmp_conf_addr_t *) two;
784
785 if (first->wildcard) {
786 /* a wildcard must be the last resort, shift it to the end */
787 return 1;
788 }
789
790 if (first->bind && !second->bind) {
791 /* shift explicit bind()ed addresses to the start */
792 return -1;
793 }
794
795 if (!first->bind && second->bind) {
796 /* shift explicit bind()ed addresses to the start */
797 return 1;
798 }
799
800 /* do not sort by default */
801
802 return 0;
803 }
804
805
806 ngx_int_t
ngx_rtmp_fire_event(ngx_rtmp_session_t * s,ngx_uint_t evt,ngx_rtmp_header_t * h,ngx_chain_t * in)807 ngx_rtmp_fire_event(ngx_rtmp_session_t *s, ngx_uint_t evt,
808 ngx_rtmp_header_t *h, ngx_chain_t *in)
809 {
810 ngx_rtmp_core_main_conf_t *cmcf;
811 ngx_array_t *ch;
812 ngx_rtmp_handler_pt *hh;
813 size_t n;
814
815 cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
816
817 ch = &cmcf->events[evt];
818 hh = ch->elts;
819 for(n = 0; n < ch->nelts; ++n, ++hh) {
820 if (*hh && (*hh)(s, h, in) != NGX_OK) {
821 return NGX_ERROR;
822 }
823 }
824 return NGX_OK;
825 }
826
827
/*
 * Copy "n" bytes from "src" to "dst" in reverse byte order: dst[0]
 * receives src[n - 1], dst[n - 1] receives src[0].  Nothing is read or
 * written when n is 0.  The buffers are expected not to overlap.
 *
 * Returns dst.
 */
void *
ngx_rtmp_rmemcpy(void *dst, const void* src, size_t n)
{
    size_t                i;
    unsigned char        *d;
    const unsigned char  *s;

    d = dst;
    s = src;

    /* index-based so that no pointer is ever formed before src */
    for (i = 0; i < n; i++) {
        d[i] = s[n - 1 - i];
    }

    return dst;
}
842
843
844 static ngx_int_t
ngx_rtmp_init_process(ngx_cycle_t * cycle)845 ngx_rtmp_init_process(ngx_cycle_t *cycle)
846 {
847 #if (nginx_version >= 1007005)
848 ngx_queue_init((ngx_queue_t*) &ngx_rtmp_init_queue);
849 #endif
850 return NGX_OK;
851 }
852