From b960e68a0b6857b4b6bb90457bc25612ae111c06 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 19 Jul 2012 12:10:59 +0400 Subject: [PATCH] fully tunable auto-push feature --- config | 2 + ngx_rtmp_auto_push_module.c | 489 ++++++++++++++++++++++++++++++++++++ ngx_rtmp_relay_module.c | 39 +-- ngx_rtmp_relay_module.h | 34 ++- 4 files changed, 533 insertions(+), 31 deletions(-) create mode 100644 ngx_rtmp_auto_push_module.c diff --git a/config b/config index cd6f634..16e0bf9 100644 --- a/config +++ b/config @@ -13,6 +13,7 @@ CORE_MODULES="$CORE_MODULES ngx_rtmp_exec_module \ ngx_rtmp_codec_module \ ngx_rtmp_play_module \ + ngx_rtmp_auto_push_module \ " @@ -43,6 +44,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_rtmp_exec_module.c \ $ngx_addon_dir/ngx_rtmp_codec_module.c \ $ngx_addon_dir/ngx_rtmp_play_module.c \ + $ngx_addon_dir/ngx_rtmp_auto_push_module.c \ " CFLAGS="$CFLAGS -I$ngx_addon_dir" diff --git a/ngx_rtmp_auto_push_module.c b/ngx_rtmp_auto_push_module.c new file mode 100644 index 0000000..9bede59 --- /dev/null +++ b/ngx_rtmp_auto_push_module.c @@ -0,0 +1,489 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#include "ngx_rtmp_cmd_module.h" +#include "ngx_rtmp_relay_module.h" + + +static ngx_rtmp_publish_pt next_publish; +static ngx_rtmp_delete_stream_pt next_delete_stream; + + +static ngx_int_t ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle); +static void ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle); +static void * ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cf); +static char * ngx_rtmp_auto_push_init_conf(ngx_cycle_t *cycle, void *conf); +static ngx_int_t ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s, + ngx_rtmp_publish_t *v); +static ngx_int_t ngx_rtmp_auto_push_delete_stream(ngx_rtmp_session_t *s, + ngx_rtmp_delete_stream_t *v); + + +typedef struct ngx_rtmp_auto_push_ctx_s ngx_rtmp_auto_push_ctx_t; + +struct ngx_rtmp_auto_push_ctx_s { + ngx_int_t *slots; /* NGX_MAX_PROCESSES */ + ngx_str_t name; + ngx_event_t push_evt; +}; + + +typedef struct { + ngx_flag_t auto_push; + ngx_str_t socket_dir; + ngx_msec_t push_reconnect; +} ngx_rtmp_auto_push_conf_t; + + +static ngx_command_t ngx_rtmp_auto_push_commands[] = { + + { ngx_string("rtmp_auto_push"), + NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1, + ngx_conf_set_flag_slot, + 0, + offsetof(ngx_rtmp_auto_push_conf_t, auto_push), + NULL }, + + { ngx_string("rtmp_auto_push_reconnect"), + NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + 0, + offsetof(ngx_rtmp_auto_push_conf_t, push_reconnect), + NULL }, + + { ngx_string("rtmp_socket_dir"), + NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + 0, + offsetof(ngx_rtmp_auto_push_conf_t, socket_dir), + NULL }, + + ngx_null_command +}; + + +static ngx_core_module_t ngx_rtmp_auto_push_module_ctx = { + ngx_string("rtmp_auto_push"), + ngx_rtmp_auto_push_create_conf, /* create conf */ + ngx_rtmp_auto_push_init_conf /* init conf */ +}; + + +ngx_module_t ngx_rtmp_auto_push_module = { + NGX_MODULE_V1, + &ngx_rtmp_auto_push_module_ctx, /* module context */ + ngx_rtmp_auto_push_commands, /* module directives */ + NGX_CORE_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + ngx_rtmp_auto_push_init_process, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + ngx_rtmp_auto_push_exit_process, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +#define NGX_RTMP_AUTO_PUSH_SOCKNAME "nginx-rtmp" +#define NGX_RTMP_AUTO_PUSH_PAGEURL "nginx-auto-push" + + +static ngx_int_t +ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle) +{ +#if (NGX_HAVE_UNIX_DOMAIN) + ngx_rtmp_auto_push_conf_t *apcf; + ngx_listening_t *ls, *lss; + struct sockaddr_un *sun; + int reuseaddr; + ngx_socket_t s; + size_t n; + ngx_file_info_t fi; + + + apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx, + ngx_rtmp_auto_push_module); + if (apcf->auto_push == 0) { + return NGX_OK; + } + + + next_publish = ngx_rtmp_publish; + ngx_rtmp_publish = ngx_rtmp_auto_push_publish; + + next_delete_stream = ngx_rtmp_delete_stream; + ngx_rtmp_delete_stream = ngx_rtmp_auto_push_delete_stream; + + + reuseaddr = 1; + s = (ngx_socket_t) -1; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, cycle->log, 0, + "auto_push: creating sockets"); + + /*TODO: clone all RTMP listenings? */ + ls = cycle->listening.elts; + lss = NULL; + for (n = 0; n < cycle->listening.nelts; ++n, ++ls) { + if (ls->handler == ngx_rtmp_init_connection) { + lss = ls; + break; + } + } + + if (lss == NULL) { + return NGX_OK; + } + + ls = ngx_array_push(&cycle->listening); + if (ls == NULL) { + return NGX_ERROR; + } + + *ls = *lss; + + ls->socklen = sizeof(struct sockaddr_un); + sun = ngx_pcalloc(cycle->pool, ls->socklen); + ls->sockaddr = (struct sockaddr *) sun; + if (ls->sockaddr == NULL) { + return NGX_ERROR; + } + sun->sun_family = AF_UNIX; + *ngx_snprintf((u_char *) sun->sun_path, sizeof(sun->sun_path), + "%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%i", + &apcf->socket_dir, ngx_process_slot) + = 0; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0, + "auto_push: create socket '%s'", + sun->sun_path); + + if (ngx_file_info(sun->sun_path, &fi) != ENOENT) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0, + "auto_push: delete existing socket '%s'", + sun->sun_path); + ngx_delete_file(sun->sun_path); + } + + ngx_str_set(&ls->addr_text, "worker_socket"); + + s = ngx_socket(AF_UNIX, SOCK_STREAM, 0); + if (s == -1) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno, + ngx_socket_n " worker_socket failed"); + return NGX_ERROR; + } + + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, + (const void *) &reuseaddr, sizeof(int)) + == -1) + { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno, + "setsockopt(SO_REUSEADDR) worker_socket failed"); + goto sock_error; + } + + if (!(ngx_event_flags & NGX_USE_AIO_EVENT)) { + if (ngx_nonblocking(s) == -1) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno, + ngx_nonblocking_n " worker_socket failed"); + return NGX_ERROR; + } + } + + if (bind(s, sun, sizeof(*sun)) == -1) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno, + ngx_nonblocking_n " worker_socket bind failed"); + goto sock_error; + } + + if (listen(s, NGX_LISTEN_BACKLOG) == -1) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno, + "listen() to worker_socket, backlog %d failed", + NGX_LISTEN_BACKLOG); + goto sock_error; + } + + ls->fd = s; + ls->listen = 1; + + return NGX_OK; + +sock_error: + if (s != (ngx_socket_t) -1 && ngx_close_socket(s) == -1) { + ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno, + ngx_close_socket_n " worker_socket failed"); + } + ngx_delete_file(sun->sun_path); + + return NGX_ERROR; + +#else /* NGX_HAVE_UNIX_DOMAIN */ + + return NGX_OK; + +#endif /* NGX_HAVE_UNIX_DOMAIN */ +} + + +static void +ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle) +{ +#if (NGX_HAVE_UNIX_DOMAIN) + ngx_rtmp_auto_push_conf_t *apcf; + u_char path[NGX_MAX_PATH]; + + apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx, + ngx_rtmp_auto_push_module); + if (apcf->auto_push == 0) { + return; + } + *ngx_snprintf(path, sizeof(path), + "%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%i", + &apcf->socket_dir, ngx_process_slot) + = 0; + + ngx_delete_file(path); + +#endif +} + + +static void * +ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cycle) +{ + ngx_rtmp_auto_push_conf_t *apcf; + + apcf = ngx_pcalloc(cycle->pool, sizeof(ngx_rtmp_auto_push_conf_t)); + if (apcf == NULL) { + return NULL; + } + + apcf->auto_push = NGX_CONF_UNSET; + apcf->push_reconnect = NGX_CONF_UNSET; + + return apcf; +} + + +static char * +ngx_rtmp_auto_push_init_conf(ngx_cycle_t *cycle, void *conf) +{ + ngx_rtmp_auto_push_conf_t *apcf = conf; + + ngx_conf_init_value(apcf->auto_push, 0); + ngx_conf_init_msec_value(apcf->push_reconnect, 100); + + if (apcf->socket_dir.len == 0) { + ngx_str_set(&apcf->socket_dir, "/tmp"); + } + + return NGX_CONF_OK; +} + + +static void +ngx_rtmp_auto_push_reconnect(ngx_event_t *ev) +{ + ngx_rtmp_session_t *s = ev->data; + + ngx_rtmp_auto_push_conf_t *apcf; + ngx_rtmp_auto_push_ctx_t *ctx; + ngx_int_t *slot; + ngx_int_t n; + ngx_rtmp_relay_target_t at; + u_char path[sizeof("unix:") + NGX_MAX_PATH]; + u_char flash_ver[sizeof("APSH.") + 2 + + NGX_OFF_T_LEN * 2]; + u_char *p; + ngx_str_t *u; + ngx_pid_t pid; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "auto_push: reconnect"); + + apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx, + ngx_rtmp_auto_push_module); + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_module); + if (ctx == NULL) { + return; + } + + ngx_memzero(&at, sizeof(at)); + ngx_str_set(&at.page_url, NGX_RTMP_AUTO_PUSH_PAGEURL); + at.tag = &ngx_rtmp_auto_push_module; + + slot = ctx->slots; + + for (n = 0; n < NGX_MAX_PROCESSES; ++n, ++slot) { + if (n == ngx_process_slot) { + continue; + } + + pid = ngx_processes[n].pid; + if (pid == 0 || pid == -1) { + continue; + } + + if (*slot) { + continue; + } + + at.data = &ngx_processes[n]; + + ngx_memzero(&at.url, sizeof(at.url)); + u = &at.url.url; + p = ngx_snprintf(path, sizeof(path) - 1, + "unix:%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%i", + &apcf->socket_dir, n); + *p = 0; + u->data = path; + u->len = p - path; + if (ngx_parse_url(s->connection->pool, &at.url) != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "auto_push: auto-push parse_url failed " + "url='%V' name='%V'", + u, &ctx->name); + continue; + } + + p = ngx_snprintf(flash_ver, sizeof(flash_ver) - 1, "APSH,%i,%i", + (ngx_int_t) ngx_process_slot, (ngx_int_t) ngx_pid); + at.flash_ver.data = flash_ver; + at.flash_ver.len = p - flash_ver; + + ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "auto_push: connect slot=%i pid=%i socket='%s' " + "name='%V'", + n, (ngx_int_t) pid, path, &ctx->name); + + if (ngx_rtmp_relay_push(s, &ctx->name, &at) == NGX_OK) { + *slot = 1; + continue; + } + + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "auto_push: connect failed: slot=%i pid=%i socket='%s'" + "url='%V' name='%V'", + n, (ngx_int_t) pid, path, u, &ctx->name); + + if (!ctx->push_evt.timer_set) { + ngx_add_timer(&ctx->push_evt, apcf->push_reconnect); + } + } +} + + +static ngx_int_t +ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) +{ + ngx_rtmp_auto_push_conf_t *apcf; + ngx_rtmp_auto_push_ctx_t *ctx; + + apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx, + ngx_rtmp_auto_push_module); + if (apcf->auto_push == 0) { + goto next; + } + + /* auto-push from another worker? */ + if (s->page_url.len == sizeof(NGX_RTMP_AUTO_PUSH_PAGEURL) - 1 && + ngx_memcmp(s->page_url.data, NGX_RTMP_AUTO_PUSH_PAGEURL, + s->page_url.len) == 0) + { + goto next; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_module); + if (ctx == NULL) { + ctx = ngx_palloc(s->connection->pool, + sizeof(ngx_rtmp_auto_push_ctx_t)); + if (ctx == NULL) { + goto next; + } + ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_auto_push_module); + + } + ngx_memzero(ctx, sizeof(*ctx)); + + ctx->push_evt.data = s; + ctx->push_evt.log = s->connection->log; + ctx->push_evt.handler = ngx_rtmp_auto_push_reconnect; + + ctx->slots = ngx_pcalloc(s->connection->pool, + sizeof(ngx_int_t) * NGX_MAX_PROCESSES); + if (ctx->slots == NULL) { + goto next; + } + + ctx->name.len = ngx_strlen(v->name); + ctx->name.data = ngx_palloc(s->connection->pool, ctx->name.len); + if (ctx->name.data == NULL) { + goto next; + } + ngx_memcpy(ctx->name.data, v->name, ctx->name.len); + + ngx_rtmp_auto_push_reconnect(&ctx->push_evt); + +next: + return next_publish(s, v); +} + + +static ngx_int_t +ngx_rtmp_auto_push_delete_stream(ngx_rtmp_session_t *s, + ngx_rtmp_delete_stream_t *v) +{ + ngx_rtmp_auto_push_conf_t *apcf; + ngx_rtmp_auto_push_ctx_t *ctx, *pctx; + ngx_rtmp_relay_ctx_t *rctx; + ngx_int_t slot; + + apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx, + ngx_rtmp_auto_push_module); + if (apcf->auto_push == 0) { + goto next; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_module); + if (ctx) { + if (ctx->push_evt.timer_set) { + ngx_del_timer(&ctx->push_evt); + } + goto next; + } + + /* skip non-relays & publishers */ + rctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); + if (rctx == NULL || + rctx->tag != &ngx_rtmp_auto_push_module || + rctx->publish == NULL) + { + goto next; + } + + slot = (ngx_process_t *) rctx->data - &ngx_processes[0]; + + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "auto_push: disconnect slot=%i app='%V' name='%V'", + slot, &rctx->app, &rctx->name); + + pctx = ngx_rtmp_get_module_ctx(rctx->publish->session, + ngx_rtmp_auto_push_module); + if (pctx == NULL) { + goto next; + } + + pctx->slots[slot] = 0; + + /* push reconnect */ + if (!pctx->push_evt.timer_set) { + ngx_add_timer(&pctx->push_evt, apcf->push_reconnect); + } + +next: + return next_delete_stream(s, v); +} diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index e3d234f..fc008be 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -36,33 +36,6 @@ static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, */ -typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t; - -struct ngx_rtmp_relay_ctx_s { - ngx_str_t name; - ngx_str_t url; - ngx_log_t log; - ngx_rtmp_session_t *session; - ngx_rtmp_relay_ctx_t *publish; - ngx_rtmp_relay_ctx_t *play; - ngx_rtmp_relay_ctx_t *next; - unsigned relay:1; - - ngx_str_t app; - ngx_str_t tc_url; - ngx_str_t page_url; - ngx_str_t swf_url; - ngx_str_t flash_ver; - ngx_str_t play_path; - ngx_int_t live; - ngx_int_t start; - ngx_int_t stop; - - ngx_event_t push_evt; - void *tag; -}; - - typedef struct { ngx_array_t pulls; /* ngx_rtmp_relay_target_t * */ ngx_array_t pushes; /* ngx_rtmp_relay_target_t * */ @@ -221,7 +194,9 @@ ngx_rtmp_relay_reconnect(ngx_event_t *ev) } for (pctx = ctx->play; pctx; pctx = pctx->next) { - if (pctx->tag == target) { + if (pctx->tag == &ngx_rtmp_relay_module && + pctx->data == target) + { break; } } @@ -318,6 +293,7 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, } rctx->tag = target->tag; + rctx->data = target->data; #define NGX_RTMP_RELAY_STR_COPY(to, from) \ if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) { \ @@ -1239,7 +1215,9 @@ ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) &ctx->app, &ctx->name); /* push reconnect */ - if (ctx->relay && ctx->tag && !ctx->publish->push_evt.timer_set) { + if (ctx->relay && ctx->tag == &ngx_rtmp_relay_module && + !ctx->publish->push_evt.timer_set) + { ngx_add_timer(&ctx->publish->push_evt, racf->push_reconnect); } @@ -1326,7 +1304,8 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) *t = target; - target->tag = target; + target->tag = &ngx_rtmp_relay_module; + target->data = target; u = &target->url; u->default_port = 1935; diff --git a/ngx_rtmp_relay_module.h b/ngx_rtmp_relay_module.h index e949112..7e676bf 100644 --- a/ngx_rtmp_relay_module.h +++ b/ngx_rtmp_relay_module.h @@ -23,10 +23,42 @@ typedef struct { ngx_int_t start; ngx_int_t stop; - void *tag; + void *tag; /* usually module reference */ + void *data; /* module-specific data */ } ngx_rtmp_relay_target_t; +typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t; + +struct ngx_rtmp_relay_ctx_s { + ngx_str_t name; + ngx_str_t url; + ngx_log_t log; + ngx_rtmp_session_t *session; + ngx_rtmp_relay_ctx_t *publish; + ngx_rtmp_relay_ctx_t *play; + ngx_rtmp_relay_ctx_t *next; + unsigned relay:1; + + ngx_str_t app; + ngx_str_t tc_url; + ngx_str_t page_url; + ngx_str_t swf_url; + ngx_str_t flash_ver; + ngx_str_t play_path; + ngx_int_t live; + ngx_int_t start; + ngx_int_t stop; + + ngx_event_t push_evt; + void *tag; + void *data; +}; + + +extern ngx_module_t ngx_rtmp_relay_module; + + ngx_int_t ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_rtmp_relay_target_t *target); ngx_int_t ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,