improved relay implementation

This commit is contained in:
Roman Arutyunyan 2012-05-18 14:25:30 +04:00
parent 32279ddf26
commit 107ea3699e
4 changed files with 266 additions and 149 deletions

View file

@ -348,7 +348,7 @@ ngx_rtmp_session_t * ngx_rtmp_init_session(ngx_connection_t *c,
ngx_rtmp_addr_conf_t *addr_conf);
void ngx_rtmp_finalize_session(ngx_rtmp_session_t *s);
void ngx_rtmp_handshake(ngx_rtmp_session_t *s);
void ngx_rtmp_client_handshake(ngx_rtmp_session_t *s);
void ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async);
void ngx_rtmp_free_handshake_buffers(ngx_rtmp_session_t *s);
void ngx_rtmp_cycle(ngx_rtmp_session_t *s);
ngx_int_t ngx_rtmp_fire_event(ngx_rtmp_session_t *s, ngx_uint_t evt,

View file

@ -594,7 +594,7 @@ ngx_rtmp_handshake(ngx_rtmp_session_t *s)
void
ngx_rtmp_client_handshake(ngx_rtmp_session_t *s)
ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async)
{
ngx_connection_t *c;
@ -614,6 +614,14 @@ ngx_rtmp_client_handshake(ngx_rtmp_session_t *s)
return;
}
if (async) {
ngx_add_timer(c->write, s->timeout);
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_rtmp_finalize_session(s);
}
return;
}
ngx_rtmp_handshake_send(c->write);
}

View file

@ -3,7 +3,7 @@
*/
#include "ngx_rtmp.h"
#include "ngx_rtmp_relay_module.h"
#include "ngx_rtmp_cmd_module.h"
@ -47,12 +47,12 @@ typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t;
struct ngx_rtmp_relay_ctx_s {
ngx_str_t name;
ngx_str_t app;
ngx_rtmp_session_t *session;
ngx_rtmp_relay_target_t *target;
ngx_rtmp_relay_ctx_t *src;
ngx_rtmp_relay_ctx_t *dst;
ngx_rtmp_relay_ctx_t *publish;
ngx_rtmp_relay_ctx_t *play;
ngx_rtmp_relay_ctx_t *next;
ngx_event_t recon;
unsigned relay:1;
};
@ -157,25 +157,6 @@ ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child)
}
static ngx_rtmp_relay_ctx_t **
ngx_rtmp_relay_find_ctx(ngx_rtmp_relay_app_conf_t *racf, ngx_str_t *name)
{
ngx_uint_t hash;
ngx_rtmp_relay_ctx_t **cctx;
hash = ngx_hash_key(name->data, name->len);
cctx = &racf->ctx[hash % racf->nbuckets];
for (; *cctx; cctx = &(*cctx)->next) {
if ((*cctx)->name.len == name->len
&& !ngx_memcmp(name->data, (*cctx)->name.data, name->len))
{
break;
}
}
return cctx;
}
static ngx_int_t
ngx_rtmp_relay_get_peer(ngx_peer_connection_t *pc, void *data)
{
@ -190,31 +171,31 @@ ngx_rtmp_relay_free_peer(ngx_peer_connection_t *pc, void *data,
}
static void
ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s,
ngx_rtmp_relay_target_t *target)
typedef ngx_rtmp_relay_ctx_t * (* ngx_rtmp_relay_create_ctx_pt)
(ngx_rtmp_session_t *s, ngx_str_t *app, ngx_str_t *name, ngx_url_t *url);
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t *app,
ngx_str_t *name, ngx_url_t *url)
{
ngx_rtmp_relay_ctx_t *rctx;
ngx_rtmp_addr_conf_t addr_conf;
ngx_rtmp_conf_ctx_t addr_ctx;
ngx_rtmp_session_t *rs;
ngx_rtmp_relay_app_conf_t *racf;
ngx_peer_connection_t *pc;
ngx_rtmp_session_t *rs;
ngx_connection_t *c;
ngx_pool_t *pool;
ngx_int_t rc;
ngx_rtmp_relay_ctx_t *rctx, **cctx;
ngx_rtmp_addr_conf_t addr_conf;
ngx_rtmp_conf_ctx_t addr_ctx;
ngx_rtmp_relay_ctx_t *ctx;
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL) {
return;
}
pool = NULL;
pool = ngx_create_pool(4096, racf->log);
if (pool == NULL) {
return;
return NULL;
}
rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
@ -222,49 +203,36 @@ ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s,
goto clear;
}
rctx->target = target;
rctx->name.len = ctx->name.len;
rctx->name.data = ngx_palloc(pool, ctx->name.len);
ngx_memcpy(rctx->name.data, ctx->name.data, ctx->name.len);
cctx = ngx_rtmp_relay_find_ctx(racf, &rctx->name);
if (*cctx) {
/* add more pushes */
if ((*cctx)->target == NULL) {
rctx->src = (*cctx)->src;
rctx->next = (*cctx)->dst;
(*cctx)->dst = rctx;
}
} else if (target->push) {
/* the first push */
ctx->next = *cctx;
*cctx = ctx;
ctx->src = ctx;
ctx->dst = rctx;
rctx->src = ctx;
} else {
/* pull */
rctx->next = *cctx;
*cctx = rctx;
rctx->src = rctx;
rctx->dst = ctx;
ctx->src = rctx;
rctx->name.len = name->len;
rctx->name.data = ngx_palloc(pool, name->len);
if (rctx->name.data == NULL) {
goto clear;
}
ngx_memcpy(rctx->name.data, name->data, rctx->name.len);
rctx->app.len = app->len;
rctx->app.data = ngx_palloc(pool, app->len);
if (rctx->app.data == NULL) {
goto clear;
}
ngx_memcpy(rctx->app.data, app->data, app->len);
rctx->relay = 1;
/* connect */
pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
if (pc == NULL) {
goto clear;
}
pc->log = racf->log;
pc->get = ngx_rtmp_relay_get_peer;
pc->free = ngx_rtmp_relay_free_peer;
pc->name = &target->url.host;
pc->socklen = target->url.socklen;
pc->sockaddr = (struct sockaddr *)&target->url.sockaddr;
pc->log = racf->log;
pc->get = ngx_rtmp_relay_get_peer;
pc->free = ngx_rtmp_relay_free_peer;
pc->name = &url->host;
pc->socklen = url->socklen;
pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen);
if (pc->sockaddr == NULL) {
goto clear;
}
ngx_memcpy(pc->sockaddr, &url->sockaddr, pc->socklen);
rc = ngx_event_connect_peer(pc);
if (rc != NGX_OK && rc != NGX_AGAIN ) {
@ -272,7 +240,6 @@ ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s,
"relay: connection failed");
goto clear;
}
c = pc->connection;
c->pool = pool;
@ -285,102 +252,178 @@ ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s,
rs = ngx_rtmp_init_session(c, &addr_conf);
if (rs == NULL) {
/* no need to destroy pool */
return;
return NULL;
}
rs->app_conf = s->app_conf;
rctx->session = rs;
ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
ngx_str_set(&rs->flashver, "ngx-local-relay");
ngx_rtmp_client_handshake(rs);
return;
ngx_rtmp_client_handshake(rs, 1);
return rctx;
clear:
if (pool) {
ngx_destroy_pool(pool);
}
return NULL;
}
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *app,
ngx_str_t *name, ngx_url_t *url)
{
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
if (ctx == NULL) {
return NULL;
}
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module);
}
ctx->session = s;
ctx->name.len = name->len;
ctx->name.data = ngx_palloc(s->connection->pool, name->len);
if (ctx->name.data == NULL) {
return NULL;
}
ngx_memcpy(ctx->name.data, name->data, ctx->name.len);
ctx->app.len = app->len;
ctx->app.data = ngx_palloc(s->connection->pool, app->len);
if (ctx->app.data == NULL) {
return NULL;
}
ngx_memcpy(ctx->app.data, app->data, app->len);
return ctx;
}
static ngx_int_t
ngx_rtmp_relay_init_local(ngx_rtmp_session_t *s, u_char *name)
ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *app,
ngx_str_t *name, ngx_url_t *url,
ngx_rtmp_relay_create_ctx_pt create_publish_ctx,
ngx_rtmp_relay_create_ctx_pt create_play_ctx)
{
size_t n, len;
ngx_rtmp_relay_target_t *target;
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *ctx, **cctx;
ngx_rtmp_relay_ctx_t *publish_ctx, *play_ctx, **cctx;
ngx_uint_t hash;
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (racf == NULL) {
return NGX_ERROR;
}
len = ngx_strlen(name);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
/* is this local relay end? */
if (ctx && ctx->src == ctx) {
return NGX_OK;
play_ctx = create_play_ctx(s, app, name, url);
if (play_ctx == NULL) {
return NGX_ERROR;
}
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
if (ctx == NULL) {
return NGX_ERROR;
}
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module);
}
ctx->session = s;
ctx->name.len = len;
ctx->name.data = ngx_palloc(s->connection->pool, len);
ngx_memcpy(ctx->name.data, name, len);
/* find relay stream */
cctx = ngx_rtmp_relay_find_ctx(racf, &ctx->name);
if (*cctx) {
/* add player to pull stream */
if ((*cctx)->target) {
ctx->src = (*cctx)->src;
ctx->next = (*cctx)->dst;
(*cctx)->dst = ctx;
}
return NGX_OK;
}
/* create relays */
target = racf->targets.elts;
for (n = 0; n < racf->targets.nelts; ++n, ++target) {
if (target->name.len == 0 ||
(len == target->name.len
&& !ngx_memcmp(name, target->name.data, len)))
hash = ngx_hash_key(name->data, name->len);
cctx = &racf->ctx[hash % racf->nbuckets];
for (; *cctx; cctx = &(*cctx)->next) {
if ((*cctx)->name.len == name->len
&& !ngx_memcmp(name->data, (*cctx)->name.data,
name->len))
{
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: create: name='%s' url='%V'",
name, &target->url);
ngx_rtmp_relay_init_remote(s, target);
break;
}
}
if (*cctx) {
play_ctx->publish = (*cctx)->publish;
play_ctx->next = (*cctx)->play;
(*cctx)->play = play_ctx;
return NGX_OK;
}
publish_ctx = create_publish_ctx(s, app, name, url);
if (publish_ctx == NULL) {
ngx_rtmp_finalize_session(play_ctx->session);
return NGX_ERROR;
}
publish_ctx->publish = publish_ctx;
publish_ctx->play = play_ctx;
play_ctx->publish = publish_ctx;
*cctx = publish_ctx;
return NGX_OK;
}
ngx_int_t
ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *app, ngx_str_t *name,
ngx_url_t *url)
{
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: create pull app='%V' name='%V' url='%V'",
app, name, url);
return ngx_rtmp_relay_create(s, app, name, url,
ngx_rtmp_relay_create_remote_ctx,
ngx_rtmp_relay_create_local_ctx);
}
ngx_int_t
ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *app, ngx_str_t *name,
ngx_url_t *url)
{
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: create push app='%V' name='%V' url='%V'",
app, name, url);
return ngx_rtmp_relay_create(s, app, name, url,
ngx_rtmp_relay_create_local_ctx,
ngx_rtmp_relay_create_remote_ctx);
}
static ngx_int_t
ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_target_t *target;
ngx_str_t name;
size_t n;
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx && ctx->relay) {
goto next;
}
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (racf == NULL || racf->targets.nelts == 0) {
goto next;
}
ngx_rtmp_relay_init_local(s, v->name);
name.len = ngx_strlen(v->name);
name.data = v->name;
target = racf->targets.elts;
for (n = 0; n < racf->targets.nelts; ++n, ++target) {
if (target->push
&& (target->name.len == 0
|| (name.len == target->name.len
&& !ngx_memcmp(name.data, target->name.data, name.len))))
{
if (ngx_rtmp_relay_push(s, &target->app, &name, &target->url)
!= NGX_OK)
{
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"relay: push failed app='%V' name='%V' url='%V'",
&target->app, &target->name, &target->url.url);
}
}
}
next:
return next_publish(s, v);
@ -391,13 +434,41 @@ static ngx_int_t
ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_target_t *target;
ngx_str_t name;
size_t n;
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx && ctx->relay) {
goto next;
}
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (racf == NULL || racf->targets.nelts == 0) {
goto next;
}
ngx_rtmp_relay_init_local(s, v->name);
name.len = ngx_strlen(v->name);
name.data = v->name;
target = racf->targets.elts;
for (n = 0; n < racf->targets.nelts; ++n, ++target) {
if (!target->push
&& (target->name.len == 0
|| (name.len == target->name.len
&& !ngx_memcmp(name.data, target->name.data, name.len))))
{
if (ngx_rtmp_relay_pull(s, &target->app, &name, &target->url)
!= NGX_OK)
{
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"relay: pull failed app='%V' name='%V' url='%V'",
&target->app, &target->name, &target->url.url);
}
break;
}
}
next:
return next_play(s, v);
@ -482,8 +553,8 @@ ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s)
return NGX_ERROR;
}
out_cmd[0].data = ctx->target->app.data;
out_cmd[0].len = ctx->target->app.len;
out_cmd[0].data = ctx->app.data;
out_cmd[0].len = ctx->app.len;
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
@ -632,7 +703,7 @@ ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || ctx->target == NULL) {
if (ctx == NULL || !ctx->relay) {
return NGX_OK;
}
@ -649,7 +720,7 @@ ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
return ngx_rtmp_relay_send_create_stream(s);
case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
if (ctx->target->push) {
if (ctx->publish != ctx) {
if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
return NGX_ERROR;
}
@ -675,7 +746,7 @@ ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || ctx->src == NULL) {
if (ctx == NULL || ctx->publish == NULL) {
return NGX_OK;
}
@ -692,35 +763,54 @@ ngx_rtmp_relay_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_uint_t hash;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || ctx->src == NULL) {
if (ctx == NULL || ctx->publish == NULL) {
return NGX_OK;
}
/* destination end disconnecting */
if (ctx->src != ctx) {
for (cctx = &ctx->src->dst; *cctx; cctx = &(*cctx)->next) {
/* play end disconnect? */
if (ctx->publish != ctx) {
for (cctx = &ctx->publish->play; *cctx; cctx = &(*cctx)->next) {
if (*cctx == ctx) {
*cctx = ctx->next;
break;
}
}
/*TODO: add push reconnect */
/* if (ctx->target) {...} */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ctx->session->connection->log, 0,
"relay: play disconnect app='%V' name='%V'",
&ctx->app, &ctx->name);
if (ctx->src->dst == NULL) {
ngx_rtmp_finalize_session(ctx->src->session);
/*TODO: add push reconnect */
/*
if (ctx->relay) {
ngx_rtmp_relay_push(ctx-publish->session,
&ctx->publish->name, &target);
}*/
if (ctx->publish->play == NULL) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP,
ctx->publish->session->connection->log, 0,
"relay: publish disconnect empty app='%V' name='%V'",
&ctx->app, &ctx->name);
ngx_rtmp_finalize_session(ctx->publish->session);
}
return NGX_OK;
}
/* source end disconnecting */
for (cctx = &ctx->src->dst; *cctx; cctx = &(*cctx)->next) {
(*cctx)->src = NULL;
/* publish end disconnect */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ctx->session->connection->log, 0,
"relay: publish disconnect app='%V' name='%V'",
&ctx->app, &ctx->name);
for (cctx = &ctx->play; *cctx; cctx = &(*cctx)->next) {
(*cctx)->publish = NULL;
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, (*cctx)->session->connection->log,
0, "relay: play disconnect orphan app='%V' name='%V'",
&(*cctx)->app, &(*cctx)->name);
ngx_rtmp_finalize_session((*cctx)->session);
}
ctx->src = NULL;
ctx->publish = NULL;
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
hash = ngx_hash_key(ctx->name.data, ctx->name.len);

19
ngx_rtmp_relay_module.h Normal file
View file

@ -0,0 +1,19 @@
/*
* Copyright (c) 2012 Roman Arutyunyan
*/
#ifndef _NGX_RTMP_RELAY_H_INCLUDED_
#define _NGX_RTMP_RELAY_H_INCLUDED_
#include "ngx_rtmp.h"
ngx_int_t ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *app,
ngx_str_t *name, ngx_url_t *url);
ngx_int_t ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *app,
ngx_str_t *name, ngx_url_t *url);
#endif /* _NGX_RTMP_RELAY_H_INCLUDED_ */