implemented static pulls

This commit is contained in:
Roman Arutyunyan 2013-01-14 22:58:10 +04:00
parent ba23d995d1
commit 3bd60857bd
4 changed files with 273 additions and 78 deletions

View file

@ -214,6 +214,7 @@ typedef struct {
/* auto-pushed? */
unsigned auto_pushed:1;
unsigned relay:1;
unsigned static_relay:1;
/* input stream 0 (reserved by RTMP spec)
* is used as free chain link */

View file

@ -401,7 +401,7 @@ ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
ngx_rtmp_auto_push_conf_t *apcf;
ngx_rtmp_auto_push_ctx_t *ctx;
if (s->auto_pushed || s->relay) {
if (s->auto_pushed || (s->relay && !s->static_relay)) {
goto next;
}

View file

@ -12,14 +12,18 @@ static ngx_rtmp_play_pt next_play;
static ngx_rtmp_delete_stream_pt next_delete_stream;
static ngx_int_t ngx_rtmp_relay_init_process(ngx_cycle_t *cycle);
static ngx_int_t ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf);
static void * ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf);
static char * ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf,
void *parent, void *child);
void *parent, void *child);
static char * ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
void *conf);
static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s,
ngx_rtmp_publish_t *v);
ngx_rtmp_publish_t *v);
static ngx_rtmp_relay_ctx_t * ngx_rtmp_relay_create_connection(
ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
ngx_rtmp_relay_target_t *target);
/* _____
@ -37,16 +41,25 @@ static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s,
typedef struct {
ngx_array_t pulls; /* ngx_rtmp_relay_target_t * */
ngx_array_t pushes; /* ngx_rtmp_relay_target_t * */
ngx_log_t *log;
ngx_uint_t nbuckets;
ngx_msec_t buflen;
ngx_msec_t push_reconnect;
ngx_rtmp_relay_ctx_t **ctx;
ngx_array_t pulls; /* ngx_rtmp_relay_target_t * */
ngx_array_t pushes; /* ngx_rtmp_relay_target_t * */
ngx_array_t static_pulls; /* ngx_rtmp_relay_target_t * */
ngx_array_t static_events; /* ngx_event_t * */
ngx_log_t *log;
ngx_uint_t nbuckets;
ngx_msec_t buflen;
ngx_msec_t push_reconnect;
ngx_msec_t pull_reconnect;
ngx_rtmp_relay_ctx_t **ctx;
} ngx_rtmp_relay_app_conf_t;
typedef struct {
ngx_rtmp_conf_ctx_t cctx;
ngx_rtmp_relay_target_t *target;
} ngx_rtmp_relay_static_t;
#define NGX_RTMP_RELAY_CONNECT_TRANS 1
#define NGX_RTMP_RELAY_CREATE_STREAM_TRANS 2
@ -89,6 +102,13 @@ static ngx_command_t ngx_rtmp_relay_commands[] = {
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_relay_app_conf_t, push_reconnect),
NULL },
{ ngx_string("pull_reconnect"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_relay_app_conf_t, pull_reconnect),
NULL },
ngx_null_command
@ -114,7 +134,7 @@ ngx_module_t ngx_rtmp_relay_module = {
NGX_RTMP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
ngx_rtmp_relay_init_process, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
@ -133,14 +153,31 @@ ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf)
return NULL;
}
ngx_array_init(&racf->pushes, cf->pool, 1,
sizeof(ngx_rtmp_relay_target_t *));
ngx_array_init(&racf->pulls, cf->pool, 1,
sizeof(ngx_rtmp_relay_target_t *));
if (ngx_array_init(&racf->pushes, cf->pool, 1, sizeof(void *)) != NGX_OK) {
return NULL;
}
if (ngx_array_init(&racf->pulls, cf->pool, 1, sizeof(void *)) != NGX_OK) {
return NULL;
}
if (ngx_array_init(&racf->static_pulls, cf->pool, 1, sizeof(void *))
!= NGX_OK)
{
return NULL;
}
if (ngx_array_init(&racf->static_events, cf->pool, 1, sizeof(void *))
!= NGX_OK)
{
return NULL;
}
racf->nbuckets = 1024;
racf->log = &cf->cycle->new_log;
racf->buflen = NGX_CONF_UNSET;
racf->push_reconnect = NGX_CONF_UNSET;
racf->pull_reconnect = NGX_CONF_UNSET;
return racf;
}
@ -158,13 +195,40 @@ ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_msec_value(conf->buflen, prev->buflen, 5000);
ngx_conf_merge_msec_value(conf->push_reconnect, prev->push_reconnect,
3000);
ngx_conf_merge_msec_value(conf->pull_reconnect, prev->pull_reconnect,
3000);
return NGX_CONF_OK;
}
static void
ngx_rtmp_relay_reconnect(ngx_event_t *ev)
ngx_rtmp_relay_static_pull_reconnect(ngx_event_t *ev)
{
ngx_rtmp_relay_static_t *rs = ev->data;
ngx_rtmp_relay_ctx_t *ctx;
ngx_rtmp_relay_app_conf_t *racf;
racf = ngx_rtmp_get_module_app_conf(&rs->cctx, ngx_rtmp_relay_module);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
"relay: reconnecting static pull");
ctx = ngx_rtmp_relay_create_connection(&rs->cctx, &rs->target->name,
rs->target);
if (ctx) {
ctx->session->static_relay = 1;
ctx->static_evt = ev;
return;
}
ngx_add_timer(ev, racf->pull_reconnect);
}
static void
ngx_rtmp_relay_push_reconnect(ngx_event_t *ev)
{
ngx_rtmp_session_t *s = ev->data;
@ -257,14 +321,14 @@ ngx_rtmp_relay_copy_str(ngx_pool_t *pool, ngx_str_t *dst, ngx_str_t *src)
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
ngx_rtmp_relay_target_t *target)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *rctx;
ngx_rtmp_addr_conf_t *addr_conf;
ngx_rtmp_conf_ctx_t *addr_ctx;
ngx_rtmp_session_t *rs;
ngx_rtmp_relay_app_conf_t *racf;
ngx_peer_connection_t *pc;
ngx_connection_t *c;
ngx_pool_t *pool;
@ -272,10 +336,10 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
ngx_str_t v, *uri;
u_char *first, *last, *p;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: create remote context");
racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module);
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
"relay: create remote context");
pool = NULL;
pool = ngx_create_pool(4096, racf->log);
@ -288,9 +352,11 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
goto clear;
}
if (ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK ||
ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK)
{
if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) {
goto clear;
}
if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) {
goto clear;
}
@ -357,8 +423,6 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
}
}
rctx->relay = 1;
pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
if (pc == NULL) {
goto clear;
@ -395,8 +459,8 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
goto clear;
}
addr_conf->ctx = addr_ctx;
addr_ctx->main_conf = s->main_conf;
addr_ctx->srv_conf = s->srv_conf;
addr_ctx->main_conf = cctx->main_conf;
addr_ctx->srv_conf = cctx->srv_conf;
ngx_str_set(&addr_conf->addr_text, "ngx-relay");
rs = ngx_rtmp_init_session(c, addr_conf);
@ -404,7 +468,7 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
/* no need to destroy pool */
return NULL;
}
rs->app_conf = s->app_conf;
rs->app_conf = cctx->app_conf;
rs->relay = 1;
rctx->session = rs;
ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
@ -421,6 +485,20 @@ clear:
}
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
ngx_rtmp_relay_target_t *target)
{
ngx_rtmp_conf_ctx_t cctx;
cctx.app_conf = s->app_conf;
cctx.srv_conf = s->srv_conf;
cctx.main_conf = s->main_conf;
return ngx_rtmp_relay_create_connection(&cctx, name, target);
}
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target)
@ -442,7 +520,7 @@ ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
ctx->push_evt.data = s;
ctx->push_evt.log = s->connection->log;
ctx->push_evt.handler = ngx_rtmp_relay_reconnect;
ctx->push_evt.handler = ngx_rtmp_relay_push_reconnect;
if (ctx->publish) {
return NULL;
@ -554,7 +632,7 @@ ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx && ctx->relay) {
if (ctx && s->relay) {
goto next;
}
@ -606,7 +684,7 @@ ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx && ctx->relay) {
if (ctx && s->relay) {
goto next;
}
@ -1022,7 +1100,7 @@ ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || !ctx->relay) {
if (ctx == NULL || !s->relay) {
return NGX_OK;
}
@ -1042,7 +1120,7 @@ ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
return ngx_rtmp_relay_send_create_stream(s);
case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
if (ctx->publish != ctx) {
if (ctx->publish != ctx && !s->static_relay) {
if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
return NGX_ERROR;
}
@ -1105,7 +1183,7 @@ ngx_rtmp_relay_on_error(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || !ctx->relay) {
if (ctx == NULL || !s->relay) {
return NGX_OK;
}
@ -1175,7 +1253,7 @@ ngx_rtmp_relay_on_status(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || !ctx->relay) {
if (ctx == NULL || !s->relay) {
return NGX_OK;
}
@ -1200,10 +1278,10 @@ static ngx_int_t
ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_relay_ctx_t *ctx;
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || ctx->publish == NULL) {
if (ctx == NULL || !s->relay) {
return NGX_OK;
}
@ -1221,7 +1299,16 @@ ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v)
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || ctx->publish == NULL) {
if (ctx == NULL) {
goto next;
}
if (s->static_relay) {
ngx_add_timer(ctx->static_evt, racf->pull_reconnect);
goto next;
}
if (ctx->publish == NULL) {
goto next;
}
@ -1239,7 +1326,7 @@ ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v)
&ctx->app, &ctx->name);
/* push reconnect */
if (ctx->relay && ctx->tag == &ngx_rtmp_relay_module &&
if (s->relay && ctx->tag == &ngx_rtmp_relay_module &&
!ctx->publish->push_evt.timer_set)
{
ngx_add_timer(&ctx->publish->push_evt, racf->push_reconnect);
@ -1255,7 +1342,7 @@ ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v)
}
#endif
if (ctx->publish->play == NULL && ctx->publish->relay) {
if (ctx->publish->play == NULL && ctx->publish->session->relay) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP,
ctx->publish->session->connection->log, 0,
"relay: publish disconnect empty app='%V' name='%V'",
@ -1306,28 +1393,23 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
ngx_rtmp_relay_target_t *target, **t;
ngx_url_t *u;
ngx_uint_t i;
ngx_int_t is_pull, is_static;
ngx_event_t **ee, *e;
ngx_rtmp_relay_static_t *rs;
u_char *p;
value = cf->args->elts;
racf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_relay_module);
t = ngx_array_push(value[0].data[3] == 'h'
? &racf->pushes /* push */
: &racf->pulls /* pull */
);
if (t == NULL) {
return NGX_CONF_ERROR;
}
is_pull = (value[0].data[3] == 'l');
is_static = 0;
target = ngx_pcalloc(cf->pool, sizeof(*target));
if (target == NULL) {
return NGX_CONF_ERROR;
}
*t = target;
target->tag = &ngx_rtmp_relay_module;
target->data = target;
@ -1352,57 +1434,169 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
value += 2;
for (i = 2; i < cf->args->nelts; ++i, ++value) {
p = ngx_strlchr(value->data, value->data + value->len, '=');
if (p == NULL) {
return "key=value expected";
n = *value;
ngx_str_set(&v, "1");
} else {
n.data = value->data;
n.len = p - value->data;
v.data = p + 1;
v.len = value->data + value->len - p - 1;
}
if (p == value->data + value->len - 1) {
continue;
}
n.data = value->data;
n.len = p - value->data;
v.data = p + 1;
v.len = value->data + value->len - p - 1;
#define NGX_RTMP_RELAY_STR_PAR(name, var) \
if (n.len == sizeof(#name) - 1 \
&& ngx_strncasecmp(n.data, (u_char *)#name, n.len) == 0) \
if (n.len == sizeof(name) - 1 \
&& ngx_strncasecmp(n.data, (u_char *) name, n.len) == 0) \
{ \
target->var = v; \
continue; \
}
#define NGX_RTMP_RELAY_NUM_PAR(name, var) \
if (n.len == sizeof(#name) - 1 \
&& ngx_strncasecmp(n.data, (u_char *)#name, n.len) == 0) \
if (n.len == sizeof(name) - 1 \
&& ngx_strncasecmp(n.data, (u_char *) name, n.len) == 0) \
{ \
target->var = ngx_atoi(v.data, v.len); \
continue; \
}
NGX_RTMP_RELAY_STR_PAR(app, app);
NGX_RTMP_RELAY_STR_PAR(name, name);
NGX_RTMP_RELAY_STR_PAR(tcUrl, tc_url);
NGX_RTMP_RELAY_STR_PAR(pageUrl, page_url);
NGX_RTMP_RELAY_STR_PAR(swfUrl, swf_url);
NGX_RTMP_RELAY_STR_PAR(flashVer, flash_ver);
NGX_RTMP_RELAY_STR_PAR(playPath, play_path);
NGX_RTMP_RELAY_NUM_PAR(live, live);
NGX_RTMP_RELAY_NUM_PAR(start, start);
NGX_RTMP_RELAY_NUM_PAR(stop, stop);
NGX_RTMP_RELAY_STR_PAR("app", app);
NGX_RTMP_RELAY_STR_PAR("name", name);
NGX_RTMP_RELAY_STR_PAR("tcUrl", tc_url);
NGX_RTMP_RELAY_STR_PAR("pageUrl", page_url);
NGX_RTMP_RELAY_STR_PAR("swfUrl", swf_url);
NGX_RTMP_RELAY_STR_PAR("flashVer", flash_ver);
NGX_RTMP_RELAY_STR_PAR("playPath", play_path);
NGX_RTMP_RELAY_NUM_PAR("live", live);
NGX_RTMP_RELAY_NUM_PAR("start", start);
NGX_RTMP_RELAY_NUM_PAR("stop", stop);
#undef NGX_RTMP_RELAY_STR_PAR
#undef NGX_RTMP_RELAY_NUM_PAR
if (n.len == sizeof("static") - 1 &&
ngx_strncasecmp(n.data, (u_char *) "static", n.len) == 0 &&
ngx_atoi(v.data, v.len))
{
is_static = 1;
continue;
}
return "unsuppored parameter";
}
if (is_static) {
if (!is_pull) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"static push is not allowed");
return NGX_CONF_ERROR;
}
if (target->name.len == 0) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"stream name missing in static pull "
"declaration");
return NGX_CONF_ERROR;
}
ee = ngx_array_push(&racf->static_events);
if (ee == NULL) {
return NGX_CONF_ERROR;
}
e = ngx_pcalloc(cf->pool, sizeof(ngx_event_t));
if (e == NULL) {
return NGX_CONF_ERROR;
}
*ee = e;
rs = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_relay_static_t));
if (rs == NULL) {
return NGX_CONF_ERROR;
}
rs->target = target;
e->data = rs;
e->log = &cf->cycle->new_log;
e->handler = ngx_rtmp_relay_static_pull_reconnect;
t = ngx_array_push(&racf->static_pulls);
} else if (is_pull) {
t = ngx_array_push(&racf->pulls);
} else {
t = ngx_array_push(&racf->pushes);
}
if (t == NULL) {
return NGX_CONF_ERROR;
}
*t = target;
return NGX_CONF_OK;
}
static ngx_int_t
ngx_rtmp_relay_init_process(ngx_cycle_t *cycle)
{
ngx_rtmp_core_main_conf_t *cmcf = ngx_rtmp_core_main_conf;
ngx_rtmp_core_srv_conf_t **pcscf, *cscf;
ngx_rtmp_core_app_conf_t **pcacf, *cacf;
ngx_rtmp_relay_app_conf_t *racf;
ngx_uint_t n, m, k;
ngx_rtmp_relay_static_t *rs;
ngx_rtmp_listen_t *lst;
ngx_event_t **pevent, *event;
if (cmcf->listen.nelts == 0) {
return NGX_OK;
}
/* only first worker does static pulling */
if (ngx_process_slot) {
return NGX_OK;
}
lst = cmcf->listen.elts;
pcscf = cmcf->servers.elts;
for (n = 0; n < cmcf->servers.nelts; ++n, ++pcscf) {
cscf = *pcscf;
pcacf = cscf->applications.elts;
for (m = 0; m < cscf->applications.nelts; ++m, ++pcacf) {
cacf = *pcacf;
racf = cacf->app_conf[ngx_rtmp_relay_module.ctx_index];
pevent = racf->static_events.elts;
for (k = 0; k < racf->static_events.nelts; ++k, ++pevent) {
event = *pevent;
rs = event->data;
rs->cctx = *lst->ctx;
rs->cctx.app_conf = cacf->app_conf;
ngx_post_event(event, &ngx_posted_events);
}
}
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf)
{

View file

@ -38,7 +38,6 @@ struct ngx_rtmp_relay_ctx_s {
ngx_rtmp_relay_ctx_t *publish;
ngx_rtmp_relay_ctx_t *play;
ngx_rtmp_relay_ctx_t *next;
unsigned relay:1;
ngx_str_t app;
ngx_str_t tc_url;
@ -51,6 +50,7 @@ struct ngx_rtmp_relay_ctx_s {
ngx_int_t stop;
ngx_event_t push_evt;
ngx_event_t *static_evt;
void *tag;
void *data;
};