1 
2 /*
3  * Copyright (C) Roman Arutyunyan
4  */
5 
6 
7 #include <ngx_config.h>
8 #include <ngx_core.h>
9 #include <ngx_event.h>
10 #include <nginx.h>
11 #include "ngx_rtmp.h"
12 
13 
14 static char *ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
15 static ngx_int_t ngx_rtmp_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
16     ngx_rtmp_listen_t *listen);
17 static char *ngx_rtmp_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports);
18 static ngx_int_t ngx_rtmp_add_addrs(ngx_conf_t *cf, ngx_rtmp_port_t *mport,
19     ngx_rtmp_conf_addr_t *addr);
20 #if (NGX_HAVE_INET6)
21 static ngx_int_t ngx_rtmp_add_addrs6(ngx_conf_t *cf, ngx_rtmp_port_t *mport,
22     ngx_rtmp_conf_addr_t *addr);
23 #endif
24 static ngx_int_t ngx_rtmp_cmp_conf_addrs(const void *one, const void *two);
25 static ngx_int_t ngx_rtmp_init_events(ngx_conf_t *cf,
26         ngx_rtmp_core_main_conf_t *cmcf);
27 static ngx_int_t ngx_rtmp_init_event_handlers(ngx_conf_t *cf,
28         ngx_rtmp_core_main_conf_t *cmcf);
29 static char * ngx_rtmp_merge_applications(ngx_conf_t *cf,
30         ngx_array_t *applications, void **app_conf, ngx_rtmp_module_t *module,
31         ngx_uint_t ctx_index);
32 static ngx_int_t ngx_rtmp_init_process(ngx_cycle_t *cycle);
33 
34 
35 #if (nginx_version >= 1007011)
36 ngx_queue_t                         ngx_rtmp_init_queue;
37 #elif (nginx_version >= 1007005)
38 ngx_thread_volatile ngx_queue_t     ngx_rtmp_init_queue;
39 #else
40 ngx_thread_volatile ngx_event_t    *ngx_rtmp_init_queue;
41 #endif
42 
43 
44 ngx_uint_t  ngx_rtmp_max_module;
45 
46 
47 static ngx_command_t  ngx_rtmp_commands[] = {
48 
49     { ngx_string("rtmp"),
50       NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
51       ngx_rtmp_block,
52       0,
53       0,
54       NULL },
55 
56       ngx_null_command
57 };
58 
59 
60 static ngx_core_module_t  ngx_rtmp_module_ctx = {
61     ngx_string("rtmp"),
62     NULL,
63     NULL
64 };
65 
66 
67 ngx_module_t  ngx_rtmp_module = {
68     NGX_MODULE_V1,
69     &ngx_rtmp_module_ctx,                  /* module context */
70     ngx_rtmp_commands,                     /* module directives */
71     NGX_CORE_MODULE,                       /* module type */
72     NULL,                                  /* init master */
73     NULL,                                  /* init module */
74     ngx_rtmp_init_process,                 /* init process */
75     NULL,                                  /* init thread */
76     NULL,                                  /* exit thread */
77     NULL,                                  /* exit process */
78     NULL,                                  /* exit master */
79     NGX_MODULE_V1_PADDING
80 };
81 
82 
83 static char *
ngx_rtmp_block(ngx_conf_t * cf,ngx_command_t * cmd,void * conf)84 ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
85 {
86     char                        *rv;
87     ngx_uint_t                   i, m, mi, s;
88     ngx_conf_t                   pcf;
89     ngx_array_t                  ports;
90     ngx_module_t               **modules;
91     ngx_rtmp_listen_t           *listen;
92     ngx_rtmp_module_t           *module;
93     ngx_rtmp_conf_ctx_t         *ctx;
94     ngx_rtmp_core_srv_conf_t    *cscf, **cscfp;
95     ngx_rtmp_core_main_conf_t   *cmcf;
96 
97     ctx = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_conf_ctx_t));
98     if (ctx == NULL) {
99         return NGX_CONF_ERROR;
100     }
101 
102     *(ngx_rtmp_conf_ctx_t **) conf = ctx;
103 
104     /* count the number of the rtmp modules and set up their indices */
105 #if defined(nginx_version) && nginx_version >= 1009011
106     modules = cf->cycle->modules;
107 #else
108     modules = ngx_modules;
109 #endif
110     ngx_rtmp_max_module = 0;
111     for (m = 0; modules[m]; m++) {
112         if (modules[m]->type != NGX_RTMP_MODULE) {
113             continue;
114         }
115 
116         modules[m]->ctx_index = ngx_rtmp_max_module++;
117     }
118 
119 
120     /* the rtmp main_conf context, it is the same in the all rtmp contexts */
121 
122     ctx->main_conf = ngx_pcalloc(cf->pool,
123                                  sizeof(void *) * ngx_rtmp_max_module);
124     if (ctx->main_conf == NULL) {
125         return NGX_CONF_ERROR;
126     }
127 
128 
129     /*
130      * the rtmp null srv_conf context, it is used to merge
131      * the server{}s' srv_conf's
132      */
133 
134     ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_rtmp_max_module);
135     if (ctx->srv_conf == NULL) {
136         return NGX_CONF_ERROR;
137     }
138 
139 
140     /*
141      * the rtmp null app_conf context, it is used to merge
142      * the server{}s' app_conf's
143      */
144 
145     ctx->app_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_rtmp_max_module);
146     if (ctx->app_conf == NULL) {
147         return NGX_CONF_ERROR;
148     }
149 
150 
151     /*
152      * create the main_conf's, the null srv_conf's, and the null app_conf's
153      * of the all rtmp modules
154      */
155 
156     for (m = 0; modules[m]; m++) {
157         if (modules[m]->type != NGX_RTMP_MODULE) {
158             continue;
159         }
160 
161         module = modules[m]->ctx;
162         mi = modules[m]->ctx_index;
163 
164         if (module->create_main_conf) {
165             ctx->main_conf[mi] = module->create_main_conf(cf);
166             if (ctx->main_conf[mi] == NULL) {
167                 return NGX_CONF_ERROR;
168             }
169         }
170 
171         if (module->create_srv_conf) {
172             ctx->srv_conf[mi] = module->create_srv_conf(cf);
173             if (ctx->srv_conf[mi] == NULL) {
174                 return NGX_CONF_ERROR;
175             }
176         }
177 
178         if (module->create_app_conf) {
179             ctx->app_conf[mi] = module->create_app_conf(cf);
180             if (ctx->app_conf[mi] == NULL) {
181                 return NGX_CONF_ERROR;
182             }
183         }
184     }
185 
186     pcf = *cf;
187     cf->ctx = ctx;
188 
189     for (m = 0; modules[m]; m++) {
190         if (modules[m]->type != NGX_RTMP_MODULE) {
191             continue;
192         }
193 
194         module = modules[m]->ctx;
195 
196         if (module->preconfiguration) {
197             if (module->preconfiguration(cf) != NGX_OK) {
198                 return NGX_CONF_ERROR;
199             }
200         }
201     }
202 
203     /* parse inside the rtmp{} block */
204 
205     cf->module_type = NGX_RTMP_MODULE;
206     cf->cmd_type = NGX_RTMP_MAIN_CONF;
207     rv = ngx_conf_parse(cf, NULL);
208 
209     if (rv != NGX_CONF_OK) {
210         *cf = pcf;
211         return rv;
212     }
213 
214 
215     /* init rtmp{} main_conf's, merge the server{}s' srv_conf's */
216 
217     cmcf = ctx->main_conf[ngx_rtmp_core_module.ctx_index];
218     cscfp = cmcf->servers.elts;
219 
220     for (m = 0; modules[m]; m++) {
221         if (modules[m]->type != NGX_RTMP_MODULE) {
222             continue;
223         }
224 
225         module = modules[m]->ctx;
226         mi = modules[m]->ctx_index;
227 
228         /* init rtmp{} main_conf's */
229 
230         cf->ctx = ctx;
231 
232         if (module->init_main_conf) {
233             rv = module->init_main_conf(cf, ctx->main_conf[mi]);
234             if (rv != NGX_CONF_OK) {
235                 *cf = pcf;
236                 return rv;
237             }
238         }
239 
240         for (s = 0; s < cmcf->servers.nelts; s++) {
241 
242             /* merge the server{}s' srv_conf's */
243 
244             cf->ctx = cscfp[s]->ctx;
245 
246             if (module->merge_srv_conf) {
247                 rv = module->merge_srv_conf(cf,
248                                             ctx->srv_conf[mi],
249                                             cscfp[s]->ctx->srv_conf[mi]);
250                 if (rv != NGX_CONF_OK) {
251                     *cf = pcf;
252                     return rv;
253                 }
254             }
255 
256             if (module->merge_app_conf) {
257 
258                 /* merge the server{}'s app_conf */
259 
260                 /*ctx->app_conf = cscfp[s]->ctx->loc_conf;*/
261 
262                 rv = module->merge_app_conf(cf,
263                                             ctx->app_conf[mi],
264                                             cscfp[s]->ctx->app_conf[mi]);
265                 if (rv != NGX_CONF_OK) {
266                     *cf = pcf;
267                     return rv;
268                 }
269 
270                 /* merge the applications{}' app_conf's */
271 
272                 cscf = cscfp[s]->ctx->srv_conf[ngx_rtmp_core_module.ctx_index];
273 
274                 rv = ngx_rtmp_merge_applications(cf, &cscf->applications,
275                                             cscfp[s]->ctx->app_conf,
276                                             module, mi);
277                 if (rv != NGX_CONF_OK) {
278                     *cf = pcf;
279                     return rv;
280                 }
281             }
282 
283         }
284     }
285 
286 
287     if (ngx_rtmp_init_events(cf, cmcf) != NGX_OK) {
288         return NGX_CONF_ERROR;
289     }
290 
291     for (m = 0; modules[m]; m++) {
292         if (modules[m]->type != NGX_RTMP_MODULE) {
293             continue;
294         }
295 
296         module = modules[m]->ctx;
297 
298         if (module->postconfiguration) {
299             if (module->postconfiguration(cf) != NGX_OK) {
300                 return NGX_CONF_ERROR;
301             }
302         }
303     }
304 
305     *cf = pcf;
306 
307     if (ngx_rtmp_init_event_handlers(cf, cmcf) != NGX_OK) {
308         return NGX_CONF_ERROR;
309     }
310 
311     if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_rtmp_conf_port_t))
312         != NGX_OK)
313     {
314         return NGX_CONF_ERROR;
315     }
316 
317     listen = cmcf->listen.elts;
318 
319     for (i = 0; i < cmcf->listen.nelts; i++) {
320         if (ngx_rtmp_add_ports(cf, &ports, &listen[i]) != NGX_OK) {
321             return NGX_CONF_ERROR;
322         }
323     }
324 
325     return ngx_rtmp_optimize_servers(cf, &ports);
326 }
327 
328 
329 static char *
ngx_rtmp_merge_applications(ngx_conf_t * cf,ngx_array_t * applications,void ** app_conf,ngx_rtmp_module_t * module,ngx_uint_t ctx_index)330 ngx_rtmp_merge_applications(ngx_conf_t *cf, ngx_array_t *applications,
331             void **app_conf, ngx_rtmp_module_t *module, ngx_uint_t ctx_index)
332 {
333     char                           *rv;
334     ngx_rtmp_conf_ctx_t            *ctx, saved;
335     ngx_rtmp_core_app_conf_t      **cacfp;
336     ngx_uint_t                      n;
337     ngx_rtmp_core_app_conf_t       *cacf;
338 
339     if (applications == NULL) {
340         return NGX_CONF_OK;
341     }
342 
343     ctx = (ngx_rtmp_conf_ctx_t *) cf->ctx;
344     saved = *ctx;
345 
346     cacfp = applications->elts;
347     for (n = 0; n < applications->nelts; ++n, ++cacfp) {
348 
349         ctx->app_conf = (*cacfp)->app_conf;
350 
351         rv = module->merge_app_conf(cf, app_conf[ctx_index],
352                 (*cacfp)->app_conf[ctx_index]);
353         if (rv != NGX_CONF_OK) {
354             return rv;
355         }
356 
357         cacf = (*cacfp)->app_conf[ngx_rtmp_core_module.ctx_index];
358         rv = ngx_rtmp_merge_applications(cf, &cacf->applications,
359                                          (*cacfp)->app_conf,
360                                          module, ctx_index);
361         if (rv != NGX_CONF_OK) {
362             return rv;
363         }
364     }
365 
366     *ctx = saved;
367 
368     return NGX_CONF_OK;
369 }
370 
371 
372 static ngx_int_t
ngx_rtmp_init_events(ngx_conf_t * cf,ngx_rtmp_core_main_conf_t * cmcf)373 ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
374 {
375     size_t                      n;
376 
377     for(n = 0; n < NGX_RTMP_MAX_EVENT; ++n) {
378         if (ngx_array_init(&cmcf->events[n], cf->pool, 1,
379                 sizeof(ngx_rtmp_handler_pt)) != NGX_OK)
380         {
381             return NGX_ERROR;
382         }
383     }
384 
385     if (ngx_array_init(&cmcf->amf, cf->pool, 1,
386                 sizeof(ngx_rtmp_amf_handler_t)) != NGX_OK)
387     {
388         return NGX_ERROR;
389     }
390 
391     return NGX_OK;
392 }
393 
394 
395 static ngx_int_t
ngx_rtmp_init_event_handlers(ngx_conf_t * cf,ngx_rtmp_core_main_conf_t * cmcf)396 ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
397 {
398     ngx_hash_init_t             calls_hash;
399     ngx_rtmp_handler_pt        *eh;
400     ngx_rtmp_amf_handler_t     *h;
401     ngx_hash_key_t             *ha;
402     size_t                      n, m;
403 
404     static size_t               pm_events[] = {
405         NGX_RTMP_MSG_CHUNK_SIZE,
406         NGX_RTMP_MSG_ABORT,
407         NGX_RTMP_MSG_ACK,
408         NGX_RTMP_MSG_ACK_SIZE,
409         NGX_RTMP_MSG_BANDWIDTH
410     };
411 
412     static size_t               amf_events[] = {
413         NGX_RTMP_MSG_AMF_CMD,
414         NGX_RTMP_MSG_AMF_META,
415         NGX_RTMP_MSG_AMF_SHARED,
416         NGX_RTMP_MSG_AMF3_CMD,
417         NGX_RTMP_MSG_AMF3_META,
418         NGX_RTMP_MSG_AMF3_SHARED
419     };
420 
421     /* init standard protocol events */
422     for(n = 0; n < sizeof(pm_events) / sizeof(pm_events[0]); ++n) {
423         eh = ngx_array_push(&cmcf->events[pm_events[n]]);
424         *eh = ngx_rtmp_protocol_message_handler;
425     }
426 
427     /* init amf events */
428     for(n = 0; n < sizeof(amf_events) / sizeof(amf_events[0]); ++n) {
429         eh = ngx_array_push(&cmcf->events[amf_events[n]]);
430         *eh = ngx_rtmp_amf_message_handler;
431     }
432 
433     /* init user protocol events */
434     eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_USER]);
435     *eh = ngx_rtmp_user_message_handler;
436 
437     /* aggregate to audio/video map */
438     eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AGGREGATE]);
439     *eh = ngx_rtmp_aggregate_message_handler;
440 
441     /* init amf callbacks */
442     ngx_array_init(&cmcf->amf_arrays, cf->pool, 1, sizeof(ngx_hash_key_t));
443 
444     h = cmcf->amf.elts;
445     for(n = 0; n < cmcf->amf.nelts; ++n, ++h) {
446         ha = cmcf->amf_arrays.elts;
447         for(m = 0; m < cmcf->amf_arrays.nelts; ++m, ++ha) {
448             if (h->name.len == ha->key.len
449                     && !ngx_strncmp(h->name.data, ha->key.data, ha->key.len))
450             {
451                 break;
452             }
453         }
454         if (m == cmcf->amf_arrays.nelts) {
455             ha = ngx_array_push(&cmcf->amf_arrays);
456             ha->key = h->name;
457             ha->key_hash = ngx_hash_key_lc(ha->key.data, ha->key.len);
458             ha->value = ngx_array_create(cf->pool, 1,
459                     sizeof(ngx_rtmp_handler_pt));
460             if (ha->value == NULL) {
461                 return NGX_ERROR;
462             }
463         }
464 
465         eh = ngx_array_push((ngx_array_t*)ha->value);
466         *eh = h->handler;
467     }
468 
469     calls_hash.hash = &cmcf->amf_hash;
470     calls_hash.key = ngx_hash_key_lc;
471     calls_hash.max_size = 512;
472     calls_hash.bucket_size = ngx_cacheline_size;
473     calls_hash.name = "amf_hash";
474     calls_hash.pool = cf->pool;
475     calls_hash.temp_pool = NULL;
476 
477     if (ngx_hash_init(&calls_hash, cmcf->amf_arrays.elts, cmcf->amf_arrays.nelts)
478             != NGX_OK)
479     {
480         return NGX_ERROR;
481     }
482 
483     return NGX_OK;
484 }
485 
486 
487 static ngx_int_t
ngx_rtmp_add_ports(ngx_conf_t * cf,ngx_array_t * ports,ngx_rtmp_listen_t * listen)488 ngx_rtmp_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
489     ngx_rtmp_listen_t *listen)
490 {
491     in_port_t              p;
492     ngx_uint_t             i;
493     struct sockaddr       *sa;
494     struct sockaddr_in    *sin;
495     ngx_rtmp_conf_port_t  *port;
496     ngx_rtmp_conf_addr_t  *addr;
497 #if (NGX_HAVE_INET6)
498     struct sockaddr_in6   *sin6;
499 #endif
500 
501     sa = (struct sockaddr *) &listen->sockaddr;
502 
503     switch (sa->sa_family) {
504 
505 #if (NGX_HAVE_INET6)
506     case AF_INET6:
507         sin6 = (struct sockaddr_in6 *) sa;
508         p = sin6->sin6_port;
509         break;
510 #endif
511 
512     default: /* AF_INET */
513         sin = (struct sockaddr_in *) sa;
514         p = sin->sin_port;
515         break;
516     }
517 
518     port = ports->elts;
519     for (i = 0; i < ports->nelts; i++) {
520         if (p == port[i].port && sa->sa_family == port[i].family) {
521 
522             /* a port is already in the port list */
523 
524             port = &port[i];
525             goto found;
526         }
527     }
528 
529     /* add a port to the port list */
530 
531     port = ngx_array_push(ports);
532     if (port == NULL) {
533         return NGX_ERROR;
534     }
535 
536     port->family = sa->sa_family;
537     port->port = p;
538 
539     if (ngx_array_init(&port->addrs, cf->temp_pool, 2,
540                        sizeof(ngx_rtmp_conf_addr_t))
541         != NGX_OK)
542     {
543         return NGX_ERROR;
544     }
545 
546 found:
547 
548     addr = ngx_array_push(&port->addrs);
549     if (addr == NULL) {
550         return NGX_ERROR;
551     }
552 
553     addr->sockaddr = (struct sockaddr *) &listen->sockaddr;
554     addr->socklen = listen->socklen;
555     addr->ctx = listen->ctx;
556     addr->bind = listen->bind;
557     addr->wildcard = listen->wildcard;
558     addr->so_keepalive = listen->so_keepalive;
559     addr->proxy_protocol = listen->proxy_protocol;
560 #if (NGX_HAVE_KEEPALIVE_TUNABLE)
561     addr->tcp_keepidle = listen->tcp_keepidle;
562     addr->tcp_keepintvl = listen->tcp_keepintvl;
563     addr->tcp_keepcnt = listen->tcp_keepcnt;
564 #endif
565 #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
566     addr->ipv6only = listen->ipv6only;
567 #endif
568 
569     return NGX_OK;
570 }
571 
572 
573 static char *
ngx_rtmp_optimize_servers(ngx_conf_t * cf,ngx_array_t * ports)574 ngx_rtmp_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports)
575 {
576     ngx_uint_t             i, p, last, bind_wildcard;
577     ngx_listening_t       *ls;
578     ngx_rtmp_port_t       *mport;
579     ngx_rtmp_conf_port_t  *port;
580     ngx_rtmp_conf_addr_t  *addr;
581 
582     port = ports->elts;
583     for (p = 0; p < ports->nelts; p++) {
584 
585         ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
586                  sizeof(ngx_rtmp_conf_addr_t), ngx_rtmp_cmp_conf_addrs);
587 
588         addr = port[p].addrs.elts;
589         last = port[p].addrs.nelts;
590 
591         /*
592          * if there is the binding to the "*:port" then we need to bind()
593          * to the "*:port" only and ignore the other bindings
594          */
595 
596         if (addr[last - 1].wildcard) {
597             addr[last - 1].bind = 1;
598             bind_wildcard = 1;
599 
600         } else {
601             bind_wildcard = 0;
602         }
603 
604         i = 0;
605 
606         while (i < last) {
607 
608             if (bind_wildcard && !addr[i].bind) {
609                 i++;
610                 continue;
611             }
612 
613             ls = ngx_create_listening(cf, addr[i].sockaddr, addr[i].socklen);
614             if (ls == NULL) {
615                 return NGX_CONF_ERROR;
616             }
617 
618             ls->addr_ntop = 1;
619             ls->handler = ngx_rtmp_init_connection;
620             ls->pool_size = 4096;
621 
622             /* TODO: error_log directive */
623             ls->logp = &cf->cycle->new_log;
624             ls->log.data = &ls->addr_text;
625             ls->log.handler = ngx_accept_log_error;
626 
627             ls->keepalive = addr[i].so_keepalive;
628 #if (NGX_HAVE_KEEPALIVE_TUNABLE)
629             ls->keepidle = addr[i].tcp_keepidle;
630             ls->keepintvl = addr[i].tcp_keepintvl;
631             ls->keepcnt = addr[i].tcp_keepcnt;
632 #endif
633 
634 #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
635             ls->ipv6only = addr[i].ipv6only;
636 #endif
637 
638             mport = ngx_palloc(cf->pool, sizeof(ngx_rtmp_port_t));
639             if (mport == NULL) {
640                 return NGX_CONF_ERROR;
641             }
642 
643             ls->servers = mport;
644 
645             if (i == last - 1) {
646                 mport->naddrs = last;
647 
648             } else {
649                 mport->naddrs = 1;
650                 i = 0;
651             }
652 
653             switch (ls->sockaddr->sa_family) {
654 #if (NGX_HAVE_INET6)
655             case AF_INET6:
656                 if (ngx_rtmp_add_addrs6(cf, mport, addr) != NGX_OK) {
657                     return NGX_CONF_ERROR;
658                 }
659                 break;
660 #endif
661             default: /* AF_INET */
662                 if (ngx_rtmp_add_addrs(cf, mport, addr) != NGX_OK) {
663                     return NGX_CONF_ERROR;
664                 }
665                 break;
666             }
667 
668             addr++;
669             last--;
670         }
671     }
672 
673     return NGX_CONF_OK;
674 }
675 
676 
677 static ngx_int_t
ngx_rtmp_add_addrs(ngx_conf_t * cf,ngx_rtmp_port_t * mport,ngx_rtmp_conf_addr_t * addr)678 ngx_rtmp_add_addrs(ngx_conf_t *cf, ngx_rtmp_port_t *mport,
679     ngx_rtmp_conf_addr_t *addr)
680 {
681     u_char              *p;
682     size_t               len;
683     ngx_uint_t           i;
684     ngx_rtmp_in_addr_t  *addrs;
685     struct sockaddr_in  *sin;
686     u_char               buf[NGX_SOCKADDR_STRLEN];
687 
688     mport->addrs = ngx_pcalloc(cf->pool,
689                                mport->naddrs * sizeof(ngx_rtmp_in_addr_t));
690     if (mport->addrs == NULL) {
691         return NGX_ERROR;
692     }
693 
694     addrs = mport->addrs;
695 
696     for (i = 0; i < mport->naddrs; i++) {
697 
698         sin = (struct sockaddr_in *) addr[i].sockaddr;
699         addrs[i].addr = sin->sin_addr.s_addr;
700 
701         addrs[i].conf.ctx = addr[i].ctx;
702 
703         len = ngx_sock_ntop(addr[i].sockaddr,
704 #if (nginx_version >= 1005003)
705                             addr[i].socklen,
706 #endif
707                             buf, NGX_SOCKADDR_STRLEN, 1);
708 
709         p = ngx_pnalloc(cf->pool, len);
710         if (p == NULL) {
711             return NGX_ERROR;
712         }
713 
714         ngx_memcpy(p, buf, len);
715 
716         addrs[i].conf.addr_text.len = len;
717         addrs[i].conf.addr_text.data = p;
718         addrs[i].conf.proxy_protocol = addr->proxy_protocol;
719     }
720 
721     return NGX_OK;
722 }
723 
724 
725 #if (NGX_HAVE_INET6)
726 
727 static ngx_int_t
ngx_rtmp_add_addrs6(ngx_conf_t * cf,ngx_rtmp_port_t * mport,ngx_rtmp_conf_addr_t * addr)728 ngx_rtmp_add_addrs6(ngx_conf_t *cf, ngx_rtmp_port_t *mport,
729     ngx_rtmp_conf_addr_t *addr)
730 {
731     u_char               *p;
732     size_t                len;
733     ngx_uint_t            i;
734     ngx_rtmp_in6_addr_t  *addrs6;
735     struct sockaddr_in6  *sin6;
736     u_char                buf[NGX_SOCKADDR_STRLEN];
737 
738     mport->addrs = ngx_pcalloc(cf->pool,
739                                mport->naddrs * sizeof(ngx_rtmp_in6_addr_t));
740     if (mport->addrs == NULL) {
741         return NGX_ERROR;
742     }
743 
744     addrs6 = mport->addrs;
745 
746     for (i = 0; i < mport->naddrs; i++) {
747 
748         sin6 = (struct sockaddr_in6 *) addr[i].sockaddr;
749         addrs6[i].addr6 = sin6->sin6_addr;
750 
751         addrs6[i].conf.ctx = addr[i].ctx;
752 
753         len = ngx_sock_ntop(addr[i].sockaddr,
754 #if (nginx_version >= 1005003)
755                             addr[i].socklen,
756 #endif
757                             buf, NGX_SOCKADDR_STRLEN, 1);
758 
759         p = ngx_pnalloc(cf->pool, len);
760         if (p == NULL) {
761             return NGX_ERROR;
762         }
763 
764         ngx_memcpy(p, buf, len);
765 
766         addrs6[i].conf.addr_text.len = len;
767         addrs6[i].conf.addr_text.data = p;
768         addrs6[i].conf.proxy_protocol = addr->proxy_protocol;
769     }
770 
771     return NGX_OK;
772 }
773 
774 #endif
775 
776 
777 static ngx_int_t
ngx_rtmp_cmp_conf_addrs(const void * one,const void * two)778 ngx_rtmp_cmp_conf_addrs(const void *one, const void *two)
779 {
780     ngx_rtmp_conf_addr_t  *first, *second;
781 
782     first = (ngx_rtmp_conf_addr_t *) one;
783     second = (ngx_rtmp_conf_addr_t *) two;
784 
785     if (first->wildcard) {
786         /* a wildcard must be the last resort, shift it to the end */
787         return 1;
788     }
789 
790     if (first->bind && !second->bind) {
791         /* shift explicit bind()ed addresses to the start */
792         return -1;
793     }
794 
795     if (!first->bind && second->bind) {
796         /* shift explicit bind()ed addresses to the start */
797         return 1;
798     }
799 
800     /* do not sort by default */
801 
802     return 0;
803 }
804 
805 
806 ngx_int_t
ngx_rtmp_fire_event(ngx_rtmp_session_t * s,ngx_uint_t evt,ngx_rtmp_header_t * h,ngx_chain_t * in)807 ngx_rtmp_fire_event(ngx_rtmp_session_t *s, ngx_uint_t evt,
808         ngx_rtmp_header_t *h, ngx_chain_t *in)
809 {
810     ngx_rtmp_core_main_conf_t      *cmcf;
811     ngx_array_t                    *ch;
812     ngx_rtmp_handler_pt            *hh;
813     size_t                          n;
814 
815     cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
816 
817     ch = &cmcf->events[evt];
818     hh = ch->elts;
819     for(n = 0; n < ch->nelts; ++n, ++hh) {
820         if (*hh && (*hh)(s, h, in) != NGX_OK) {
821             return NGX_ERROR;
822         }
823     }
824     return NGX_OK;
825 }
826 
827 
/*
 * Copies n bytes from src to dst in reverse order: dst[0] receives
 * src[n - 1], dst[n - 1] receives src[0].  The regions must not overlap.
 *
 * The loop is driven by the byte count, so no pointer below src is ever
 * formed and n == 0 is a no-op.
 *
 * Returns dst.
 */
void *
ngx_rtmp_rmemcpy(void *dst, const void* src, size_t n)
{
    unsigned char        *d;
    const unsigned char  *s;

    if (n == 0) {
        return dst;
    }

    d = dst;
    s = (const unsigned char *) src + n;    /* one past the last byte */

    while (n--) {
        *d++ = *--s;
    }

    return dst;
}
842 
843 
844 static ngx_int_t
ngx_rtmp_init_process(ngx_cycle_t * cycle)845 ngx_rtmp_init_process(ngx_cycle_t *cycle)
846 {
847 #if (nginx_version >= 1007005)
848     ngx_queue_init((ngx_queue_t*) &ngx_rtmp_init_queue);
849 #endif
850     return NGX_OK;
851 }
852