mirror of
https://github.com/zotanmew/nginx-rtmp-module.git
synced 2024-05-08 05:41:08 +02:00
fully tunable auto-push feature
This commit is contained in:
parent
4e475cccb5
commit
b960e68a0b
2
config
2
config
|
@ -13,6 +13,7 @@ CORE_MODULES="$CORE_MODULES
|
|||
ngx_rtmp_exec_module \
|
||||
ngx_rtmp_codec_module \
|
||||
ngx_rtmp_play_module \
|
||||
ngx_rtmp_auto_push_module \
|
||||
"
|
||||
|
||||
|
||||
|
@ -43,6 +44,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
|
|||
$ngx_addon_dir/ngx_rtmp_exec_module.c \
|
||||
$ngx_addon_dir/ngx_rtmp_codec_module.c \
|
||||
$ngx_addon_dir/ngx_rtmp_play_module.c \
|
||||
$ngx_addon_dir/ngx_rtmp_auto_push_module.c \
|
||||
"
|
||||
CFLAGS="$CFLAGS -I$ngx_addon_dir"
|
||||
|
||||
|
|
489
ngx_rtmp_auto_push_module.c
Normal file
489
ngx_rtmp_auto_push_module.c
Normal file
|
@ -0,0 +1,489 @@
|
|||
/*
|
||||
* Copyright (c) 2012 Roman Arutyunyan
|
||||
*/
|
||||
|
||||
|
||||
#include "ngx_rtmp_cmd_module.h"
|
||||
#include "ngx_rtmp_relay_module.h"
|
||||
|
||||
|
||||
static ngx_rtmp_publish_pt next_publish;
|
||||
static ngx_rtmp_delete_stream_pt next_delete_stream;
|
||||
|
||||
|
||||
static ngx_int_t ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle);
|
||||
static void ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle);
|
||||
static void * ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cf);
|
||||
static char * ngx_rtmp_auto_push_init_conf(ngx_cycle_t *cycle, void *conf);
|
||||
static ngx_int_t ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s,
|
||||
ngx_rtmp_publish_t *v);
|
||||
static ngx_int_t ngx_rtmp_auto_push_delete_stream(ngx_rtmp_session_t *s,
|
||||
ngx_rtmp_delete_stream_t *v);
|
||||
|
||||
|
||||
typedef struct ngx_rtmp_auto_push_ctx_s ngx_rtmp_auto_push_ctx_t;
|
||||
|
||||
struct ngx_rtmp_auto_push_ctx_s {
|
||||
ngx_int_t *slots; /* NGX_MAX_PROCESSES */
|
||||
ngx_str_t name;
|
||||
ngx_event_t push_evt;
|
||||
};
|
||||
|
||||
|
||||
typedef struct {
|
||||
ngx_flag_t auto_push;
|
||||
ngx_str_t socket_dir;
|
||||
ngx_msec_t push_reconnect;
|
||||
} ngx_rtmp_auto_push_conf_t;
|
||||
|
||||
|
||||
static ngx_command_t ngx_rtmp_auto_push_commands[] = {
|
||||
|
||||
{ ngx_string("rtmp_auto_push"),
|
||||
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_flag_slot,
|
||||
0,
|
||||
offsetof(ngx_rtmp_auto_push_conf_t, auto_push),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("rtmp_auto_push_reconnect"),
|
||||
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_msec_slot,
|
||||
0,
|
||||
offsetof(ngx_rtmp_auto_push_conf_t, push_reconnect),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("rtmp_socket_dir"),
|
||||
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_str_slot,
|
||||
0,
|
||||
offsetof(ngx_rtmp_auto_push_conf_t, socket_dir),
|
||||
NULL },
|
||||
|
||||
ngx_null_command
|
||||
};
|
||||
|
||||
|
||||
static ngx_core_module_t ngx_rtmp_auto_push_module_ctx = {
|
||||
ngx_string("rtmp_auto_push"),
|
||||
ngx_rtmp_auto_push_create_conf, /* create conf */
|
||||
ngx_rtmp_auto_push_init_conf /* init conf */
|
||||
};
|
||||
|
||||
|
||||
ngx_module_t ngx_rtmp_auto_push_module = {
|
||||
NGX_MODULE_V1,
|
||||
&ngx_rtmp_auto_push_module_ctx, /* module context */
|
||||
ngx_rtmp_auto_push_commands, /* module directives */
|
||||
NGX_CORE_MODULE, /* module type */
|
||||
NULL, /* init master */
|
||||
NULL, /* init module */
|
||||
ngx_rtmp_auto_push_init_process, /* init process */
|
||||
NULL, /* init thread */
|
||||
NULL, /* exit thread */
|
||||
ngx_rtmp_auto_push_exit_process, /* exit process */
|
||||
NULL, /* exit master */
|
||||
NGX_MODULE_V1_PADDING
|
||||
};
|
||||
|
||||
|
||||
#define NGX_RTMP_AUTO_PUSH_SOCKNAME "nginx-rtmp"
|
||||
#define NGX_RTMP_AUTO_PUSH_PAGEURL "nginx-auto-push"
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle)
|
||||
{
|
||||
#if (NGX_HAVE_UNIX_DOMAIN)
|
||||
ngx_rtmp_auto_push_conf_t *apcf;
|
||||
ngx_listening_t *ls, *lss;
|
||||
struct sockaddr_un *sun;
|
||||
int reuseaddr;
|
||||
ngx_socket_t s;
|
||||
size_t n;
|
||||
ngx_file_info_t fi;
|
||||
|
||||
|
||||
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx,
|
||||
ngx_rtmp_auto_push_module);
|
||||
if (apcf->auto_push == 0) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
next_publish = ngx_rtmp_publish;
|
||||
ngx_rtmp_publish = ngx_rtmp_auto_push_publish;
|
||||
|
||||
next_delete_stream = ngx_rtmp_delete_stream;
|
||||
ngx_rtmp_delete_stream = ngx_rtmp_auto_push_delete_stream;
|
||||
|
||||
|
||||
reuseaddr = 1;
|
||||
s = (ngx_socket_t) -1;
|
||||
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
|
||||
"auto_push: creating sockets");
|
||||
|
||||
/*TODO: clone all RTMP listenings? */
|
||||
ls = cycle->listening.elts;
|
||||
lss = NULL;
|
||||
for (n = 0; n < cycle->listening.nelts; ++n, ++ls) {
|
||||
if (ls->handler == ngx_rtmp_init_connection) {
|
||||
lss = ls;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (lss == NULL) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
ls = ngx_array_push(&cycle->listening);
|
||||
if (ls == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
*ls = *lss;
|
||||
|
||||
ls->socklen = sizeof(struct sockaddr_un);
|
||||
sun = ngx_pcalloc(cycle->pool, ls->socklen);
|
||||
ls->sockaddr = (struct sockaddr *) sun;
|
||||
if (ls->sockaddr == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
sun->sun_family = AF_UNIX;
|
||||
*ngx_snprintf((u_char *) sun->sun_path, sizeof(sun->sun_path),
|
||||
"%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%i",
|
||||
&apcf->socket_dir, ngx_process_slot)
|
||||
= 0;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
|
||||
"auto_push: create socket '%s'",
|
||||
sun->sun_path);
|
||||
|
||||
if (ngx_file_info(sun->sun_path, &fi) != ENOENT) {
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
|
||||
"auto_push: delete existing socket '%s'",
|
||||
sun->sun_path);
|
||||
ngx_delete_file(sun->sun_path);
|
||||
}
|
||||
|
||||
ngx_str_set(&ls->addr_text, "worker_socket");
|
||||
|
||||
s = ngx_socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (s == -1) {
|
||||
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
|
||||
ngx_socket_n " worker_socket failed");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
|
||||
(const void *) &reuseaddr, sizeof(int))
|
||||
== -1)
|
||||
{
|
||||
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
|
||||
"setsockopt(SO_REUSEADDR) worker_socket failed");
|
||||
goto sock_error;
|
||||
}
|
||||
|
||||
if (!(ngx_event_flags & NGX_USE_AIO_EVENT)) {
|
||||
if (ngx_nonblocking(s) == -1) {
|
||||
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
|
||||
ngx_nonblocking_n " worker_socket failed");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
if (bind(s, sun, sizeof(*sun)) == -1) {
|
||||
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
|
||||
ngx_nonblocking_n " worker_socket bind failed");
|
||||
goto sock_error;
|
||||
}
|
||||
|
||||
if (listen(s, NGX_LISTEN_BACKLOG) == -1) {
|
||||
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
|
||||
"listen() to worker_socket, backlog %d failed",
|
||||
NGX_LISTEN_BACKLOG);
|
||||
goto sock_error;
|
||||
}
|
||||
|
||||
ls->fd = s;
|
||||
ls->listen = 1;
|
||||
|
||||
return NGX_OK;
|
||||
|
||||
sock_error:
|
||||
if (s != (ngx_socket_t) -1 && ngx_close_socket(s) == -1) {
|
||||
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
|
||||
ngx_close_socket_n " worker_socket failed");
|
||||
}
|
||||
ngx_delete_file(sun->sun_path);
|
||||
|
||||
return NGX_ERROR;
|
||||
|
||||
#else /* NGX_HAVE_UNIX_DOMAIN */
|
||||
|
||||
return NGX_OK;
|
||||
|
||||
#endif /* NGX_HAVE_UNIX_DOMAIN */
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle)
|
||||
{
|
||||
#if (NGX_HAVE_UNIX_DOMAIN)
|
||||
ngx_rtmp_auto_push_conf_t *apcf;
|
||||
u_char path[NGX_MAX_PATH];
|
||||
|
||||
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx,
|
||||
ngx_rtmp_auto_push_module);
|
||||
if (apcf->auto_push == 0) {
|
||||
return;
|
||||
}
|
||||
*ngx_snprintf(path, sizeof(path),
|
||||
"%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%i",
|
||||
&apcf->socket_dir, ngx_process_slot)
|
||||
= 0;
|
||||
|
||||
ngx_delete_file(path);
|
||||
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
static void *
|
||||
ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cycle)
|
||||
{
|
||||
ngx_rtmp_auto_push_conf_t *apcf;
|
||||
|
||||
apcf = ngx_pcalloc(cycle->pool, sizeof(ngx_rtmp_auto_push_conf_t));
|
||||
if (apcf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
apcf->auto_push = NGX_CONF_UNSET;
|
||||
apcf->push_reconnect = NGX_CONF_UNSET;
|
||||
|
||||
return apcf;
|
||||
}
|
||||
|
||||
|
||||
static char *
|
||||
ngx_rtmp_auto_push_init_conf(ngx_cycle_t *cycle, void *conf)
|
||||
{
|
||||
ngx_rtmp_auto_push_conf_t *apcf = conf;
|
||||
|
||||
ngx_conf_init_value(apcf->auto_push, 0);
|
||||
ngx_conf_init_msec_value(apcf->push_reconnect, 100);
|
||||
|
||||
if (apcf->socket_dir.len == 0) {
|
||||
ngx_str_set(&apcf->socket_dir, "/tmp");
|
||||
}
|
||||
|
||||
return NGX_CONF_OK;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_rtmp_auto_push_reconnect(ngx_event_t *ev)
|
||||
{
|
||||
ngx_rtmp_session_t *s = ev->data;
|
||||
|
||||
ngx_rtmp_auto_push_conf_t *apcf;
|
||||
ngx_rtmp_auto_push_ctx_t *ctx;
|
||||
ngx_int_t *slot;
|
||||
ngx_int_t n;
|
||||
ngx_rtmp_relay_target_t at;
|
||||
u_char path[sizeof("unix:") + NGX_MAX_PATH];
|
||||
u_char flash_ver[sizeof("APSH.") + 2
|
||||
+ NGX_OFF_T_LEN * 2];
|
||||
u_char *p;
|
||||
ngx_str_t *u;
|
||||
ngx_pid_t pid;
|
||||
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
|
||||
"auto_push: reconnect");
|
||||
|
||||
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
|
||||
ngx_rtmp_auto_push_module);
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_module);
|
||||
if (ctx == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
ngx_memzero(&at, sizeof(at));
|
||||
ngx_str_set(&at.page_url, NGX_RTMP_AUTO_PUSH_PAGEURL);
|
||||
at.tag = &ngx_rtmp_auto_push_module;
|
||||
|
||||
slot = ctx->slots;
|
||||
|
||||
for (n = 0; n < NGX_MAX_PROCESSES; ++n, ++slot) {
|
||||
if (n == ngx_process_slot) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pid = ngx_processes[n].pid;
|
||||
if (pid == 0 || pid == -1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (*slot) {
|
||||
continue;
|
||||
}
|
||||
|
||||
at.data = &ngx_processes[n];
|
||||
|
||||
ngx_memzero(&at.url, sizeof(at.url));
|
||||
u = &at.url.url;
|
||||
p = ngx_snprintf(path, sizeof(path) - 1,
|
||||
"unix:%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%i",
|
||||
&apcf->socket_dir, n);
|
||||
*p = 0;
|
||||
u->data = path;
|
||||
u->len = p - path;
|
||||
if (ngx_parse_url(s->connection->pool, &at.url) != NGX_OK) {
|
||||
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
|
||||
"auto_push: auto-push parse_url failed "
|
||||
"url='%V' name='%V'",
|
||||
u, &ctx->name);
|
||||
continue;
|
||||
}
|
||||
|
||||
p = ngx_snprintf(flash_ver, sizeof(flash_ver) - 1, "APSH,%i,%i",
|
||||
(ngx_int_t) ngx_process_slot, (ngx_int_t) ngx_pid);
|
||||
at.flash_ver.data = flash_ver;
|
||||
at.flash_ver.len = p - flash_ver;
|
||||
|
||||
ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
|
||||
"auto_push: connect slot=%i pid=%i socket='%s' "
|
||||
"name='%V'",
|
||||
n, (ngx_int_t) pid, path, &ctx->name);
|
||||
|
||||
if (ngx_rtmp_relay_push(s, &ctx->name, &at) == NGX_OK) {
|
||||
*slot = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
|
||||
"auto_push: connect failed: slot=%i pid=%i socket='%s'"
|
||||
"url='%V' name='%V'",
|
||||
n, (ngx_int_t) pid, path, u, &ctx->name);
|
||||
|
||||
if (!ctx->push_evt.timer_set) {
|
||||
ngx_add_timer(&ctx->push_evt, apcf->push_reconnect);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
|
||||
{
|
||||
ngx_rtmp_auto_push_conf_t *apcf;
|
||||
ngx_rtmp_auto_push_ctx_t *ctx;
|
||||
|
||||
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
|
||||
ngx_rtmp_auto_push_module);
|
||||
if (apcf->auto_push == 0) {
|
||||
goto next;
|
||||
}
|
||||
|
||||
/* auto-push from another worker? */
|
||||
if (s->page_url.len == sizeof(NGX_RTMP_AUTO_PUSH_PAGEURL) - 1 &&
|
||||
ngx_memcmp(s->page_url.data, NGX_RTMP_AUTO_PUSH_PAGEURL,
|
||||
s->page_url.len) == 0)
|
||||
{
|
||||
goto next;
|
||||
}
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_module);
|
||||
if (ctx == NULL) {
|
||||
ctx = ngx_palloc(s->connection->pool,
|
||||
sizeof(ngx_rtmp_auto_push_ctx_t));
|
||||
if (ctx == NULL) {
|
||||
goto next;
|
||||
}
|
||||
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_auto_push_module);
|
||||
|
||||
}
|
||||
ngx_memzero(ctx, sizeof(*ctx));
|
||||
|
||||
ctx->push_evt.data = s;
|
||||
ctx->push_evt.log = s->connection->log;
|
||||
ctx->push_evt.handler = ngx_rtmp_auto_push_reconnect;
|
||||
|
||||
ctx->slots = ngx_pcalloc(s->connection->pool,
|
||||
sizeof(ngx_int_t) * NGX_MAX_PROCESSES);
|
||||
if (ctx->slots == NULL) {
|
||||
goto next;
|
||||
}
|
||||
|
||||
ctx->name.len = ngx_strlen(v->name);
|
||||
ctx->name.data = ngx_palloc(s->connection->pool, ctx->name.len);
|
||||
if (ctx->name.data == NULL) {
|
||||
goto next;
|
||||
}
|
||||
ngx_memcpy(ctx->name.data, v->name, ctx->name.len);
|
||||
|
||||
ngx_rtmp_auto_push_reconnect(&ctx->push_evt);
|
||||
|
||||
next:
|
||||
return next_publish(s, v);
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_auto_push_delete_stream(ngx_rtmp_session_t *s,
|
||||
ngx_rtmp_delete_stream_t *v)
|
||||
{
|
||||
ngx_rtmp_auto_push_conf_t *apcf;
|
||||
ngx_rtmp_auto_push_ctx_t *ctx, *pctx;
|
||||
ngx_rtmp_relay_ctx_t *rctx;
|
||||
ngx_int_t slot;
|
||||
|
||||
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
|
||||
ngx_rtmp_auto_push_module);
|
||||
if (apcf->auto_push == 0) {
|
||||
goto next;
|
||||
}
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_module);
|
||||
if (ctx) {
|
||||
if (ctx->push_evt.timer_set) {
|
||||
ngx_del_timer(&ctx->push_evt);
|
||||
}
|
||||
goto next;
|
||||
}
|
||||
|
||||
/* skip non-relays & publishers */
|
||||
rctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (rctx == NULL ||
|
||||
rctx->tag != &ngx_rtmp_auto_push_module ||
|
||||
rctx->publish == NULL)
|
||||
{
|
||||
goto next;
|
||||
}
|
||||
|
||||
slot = (ngx_process_t *) rctx->data - &ngx_processes[0];
|
||||
|
||||
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
|
||||
"auto_push: disconnect slot=%i app='%V' name='%V'",
|
||||
slot, &rctx->app, &rctx->name);
|
||||
|
||||
pctx = ngx_rtmp_get_module_ctx(rctx->publish->session,
|
||||
ngx_rtmp_auto_push_module);
|
||||
if (pctx == NULL) {
|
||||
goto next;
|
||||
}
|
||||
|
||||
pctx->slots[slot] = 0;
|
||||
|
||||
/* push reconnect */
|
||||
if (!pctx->push_evt.timer_set) {
|
||||
ngx_add_timer(&pctx->push_evt, apcf->push_reconnect);
|
||||
}
|
||||
|
||||
next:
|
||||
return next_delete_stream(s, v);
|
||||
}
|
|
@ -36,33 +36,6 @@ static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s,
|
|||
*/
|
||||
|
||||
|
||||
typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t;
|
||||
|
||||
struct ngx_rtmp_relay_ctx_s {
|
||||
ngx_str_t name;
|
||||
ngx_str_t url;
|
||||
ngx_log_t log;
|
||||
ngx_rtmp_session_t *session;
|
||||
ngx_rtmp_relay_ctx_t *publish;
|
||||
ngx_rtmp_relay_ctx_t *play;
|
||||
ngx_rtmp_relay_ctx_t *next;
|
||||
unsigned relay:1;
|
||||
|
||||
ngx_str_t app;
|
||||
ngx_str_t tc_url;
|
||||
ngx_str_t page_url;
|
||||
ngx_str_t swf_url;
|
||||
ngx_str_t flash_ver;
|
||||
ngx_str_t play_path;
|
||||
ngx_int_t live;
|
||||
ngx_int_t start;
|
||||
ngx_int_t stop;
|
||||
|
||||
ngx_event_t push_evt;
|
||||
void *tag;
|
||||
};
|
||||
|
||||
|
||||
typedef struct {
|
||||
ngx_array_t pulls; /* ngx_rtmp_relay_target_t * */
|
||||
ngx_array_t pushes; /* ngx_rtmp_relay_target_t * */
|
||||
|
@ -221,7 +194,9 @@ ngx_rtmp_relay_reconnect(ngx_event_t *ev)
|
|||
}
|
||||
|
||||
for (pctx = ctx->play; pctx; pctx = pctx->next) {
|
||||
if (pctx->tag == target) {
|
||||
if (pctx->tag == &ngx_rtmp_relay_module &&
|
||||
pctx->data == target)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -318,6 +293,7 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
|
|||
}
|
||||
|
||||
rctx->tag = target->tag;
|
||||
rctx->data = target->data;
|
||||
|
||||
#define NGX_RTMP_RELAY_STR_COPY(to, from) \
|
||||
if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) { \
|
||||
|
@ -1239,7 +1215,9 @@ ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v)
|
|||
&ctx->app, &ctx->name);
|
||||
|
||||
/* push reconnect */
|
||||
if (ctx->relay && ctx->tag && !ctx->publish->push_evt.timer_set) {
|
||||
if (ctx->relay && ctx->tag == &ngx_rtmp_relay_module &&
|
||||
!ctx->publish->push_evt.timer_set)
|
||||
{
|
||||
ngx_add_timer(&ctx->publish->push_evt, racf->push_reconnect);
|
||||
}
|
||||
|
||||
|
@ -1326,7 +1304,8 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
|
|||
|
||||
*t = target;
|
||||
|
||||
target->tag = target;
|
||||
target->tag = &ngx_rtmp_relay_module;
|
||||
target->data = target;
|
||||
|
||||
u = &target->url;
|
||||
u->default_port = 1935;
|
||||
|
|
|
@ -23,10 +23,42 @@ typedef struct {
|
|||
ngx_int_t start;
|
||||
ngx_int_t stop;
|
||||
|
||||
void *tag;
|
||||
void *tag; /* usually module reference */
|
||||
void *data; /* module-specific data */
|
||||
} ngx_rtmp_relay_target_t;
|
||||
|
||||
|
||||
typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t;
|
||||
|
||||
struct ngx_rtmp_relay_ctx_s {
|
||||
ngx_str_t name;
|
||||
ngx_str_t url;
|
||||
ngx_log_t log;
|
||||
ngx_rtmp_session_t *session;
|
||||
ngx_rtmp_relay_ctx_t *publish;
|
||||
ngx_rtmp_relay_ctx_t *play;
|
||||
ngx_rtmp_relay_ctx_t *next;
|
||||
unsigned relay:1;
|
||||
|
||||
ngx_str_t app;
|
||||
ngx_str_t tc_url;
|
||||
ngx_str_t page_url;
|
||||
ngx_str_t swf_url;
|
||||
ngx_str_t flash_ver;
|
||||
ngx_str_t play_path;
|
||||
ngx_int_t live;
|
||||
ngx_int_t start;
|
||||
ngx_int_t stop;
|
||||
|
||||
ngx_event_t push_evt;
|
||||
void *tag;
|
||||
void *data;
|
||||
};
|
||||
|
||||
|
||||
extern ngx_module_t ngx_rtmp_relay_module;
|
||||
|
||||
|
||||
ngx_int_t ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *name,
|
||||
ngx_rtmp_relay_target_t *target);
|
||||
ngx_int_t ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,
|
||||
|
|
Loading…
Reference in a new issue