From 107ea3699e1ba796b42667f025e7c2400d012098 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Fri, 18 May 2012 14:25:30 +0400 Subject: [PATCH] improved relay implementation --- ngx_rtmp.h | 2 +- ngx_rtmp_handshake.c | 10 +- ngx_rtmp_relay_module.c | 384 +++++++++++++++++++++++++--------------- ngx_rtmp_relay_module.h | 19 ++ 4 files changed, 266 insertions(+), 149 deletions(-) create mode 100644 ngx_rtmp_relay_module.h diff --git a/ngx_rtmp.h b/ngx_rtmp.h index b8cda07..506033d 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -348,7 +348,7 @@ ngx_rtmp_session_t * ngx_rtmp_init_session(ngx_connection_t *c, ngx_rtmp_addr_conf_t *addr_conf); void ngx_rtmp_finalize_session(ngx_rtmp_session_t *s); void ngx_rtmp_handshake(ngx_rtmp_session_t *s); -void ngx_rtmp_client_handshake(ngx_rtmp_session_t *s); +void ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async); void ngx_rtmp_free_handshake_buffers(ngx_rtmp_session_t *s); void ngx_rtmp_cycle(ngx_rtmp_session_t *s); ngx_int_t ngx_rtmp_fire_event(ngx_rtmp_session_t *s, ngx_uint_t evt, diff --git a/ngx_rtmp_handshake.c b/ngx_rtmp_handshake.c index 5c25258..7d1f316 100644 --- a/ngx_rtmp_handshake.c +++ b/ngx_rtmp_handshake.c @@ -594,7 +594,7 @@ ngx_rtmp_handshake(ngx_rtmp_session_t *s) void -ngx_rtmp_client_handshake(ngx_rtmp_session_t *s) +ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async) { ngx_connection_t *c; @@ -614,6 +614,14 @@ ngx_rtmp_client_handshake(ngx_rtmp_session_t *s) return; } + if (async) { + ngx_add_timer(c->write, s->timeout); + if (ngx_handle_write_event(c->write, 0) != NGX_OK) { + ngx_rtmp_finalize_session(s); + } + return; + } + ngx_rtmp_handshake_send(c->write); } diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index 979ce56..bad7cf6 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -3,7 +3,7 @@ */ -#include "ngx_rtmp.h" +#include "ngx_rtmp_relay_module.h" #include "ngx_rtmp_cmd_module.h" @@ -47,12 +47,12 @@ typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t; struct ngx_rtmp_relay_ctx_s { ngx_str_t name; + ngx_str_t app; ngx_rtmp_session_t *session; - ngx_rtmp_relay_target_t *target; - ngx_rtmp_relay_ctx_t *src; - ngx_rtmp_relay_ctx_t *dst; + ngx_rtmp_relay_ctx_t *publish; + ngx_rtmp_relay_ctx_t *play; ngx_rtmp_relay_ctx_t *next; - ngx_event_t recon; + unsigned relay:1; }; @@ -157,25 +157,6 @@ ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) } -static ngx_rtmp_relay_ctx_t ** -ngx_rtmp_relay_find_ctx(ngx_rtmp_relay_app_conf_t *racf, ngx_str_t *name) -{ - ngx_uint_t hash; - ngx_rtmp_relay_ctx_t **cctx; - - hash = ngx_hash_key(name->data, name->len); - cctx = &racf->ctx[hash % racf->nbuckets]; - for (; *cctx; cctx = &(*cctx)->next) { - if ((*cctx)->name.len == name->len - && !ngx_memcmp(name->data, (*cctx)->name.data, name->len)) - { - break; - } - } - return cctx; -} - - static ngx_int_t ngx_rtmp_relay_get_peer(ngx_peer_connection_t *pc, void *data) { @@ -190,31 +171,31 @@ ngx_rtmp_relay_free_peer(ngx_peer_connection_t *pc, void *data, } -static void -ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s, - ngx_rtmp_relay_target_t *target) +typedef ngx_rtmp_relay_ctx_t * (* ngx_rtmp_relay_create_ctx_pt) + (ngx_rtmp_session_t *s, ngx_str_t *app, ngx_str_t *name, ngx_url_t *url); + + +static ngx_rtmp_relay_ctx_t * +ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t *app, + ngx_str_t *name, ngx_url_t *url) { + ngx_rtmp_relay_ctx_t *rctx; + ngx_rtmp_addr_conf_t addr_conf; + ngx_rtmp_conf_ctx_t addr_ctx; + ngx_rtmp_session_t *rs; ngx_rtmp_relay_app_conf_t *racf; ngx_peer_connection_t *pc; - ngx_rtmp_session_t *rs; ngx_connection_t *c; ngx_pool_t *pool; ngx_int_t rc; - ngx_rtmp_relay_ctx_t *rctx, **cctx; - ngx_rtmp_addr_conf_t addr_conf; - ngx_rtmp_conf_ctx_t addr_ctx; - ngx_rtmp_relay_ctx_t *ctx; + racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); - ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL) { - return; - } pool = NULL; pool = ngx_create_pool(4096, racf->log); if (pool == NULL) { - return; + return NULL; } rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t)); @@ -222,49 +203,36 @@ ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s, goto clear; } - rctx->target = target; - rctx->name.len = ctx->name.len; - rctx->name.data = ngx_palloc(pool, ctx->name.len); - ngx_memcpy(rctx->name.data, ctx->name.data, ctx->name.len); - - cctx = ngx_rtmp_relay_find_ctx(racf, &rctx->name); - if (*cctx) { - /* add more pushes */ - if ((*cctx)->target == NULL) { - rctx->src = (*cctx)->src; - rctx->next = (*cctx)->dst; - (*cctx)->dst = rctx; - } - - } else if (target->push) { - /* the first push */ - ctx->next = *cctx; - *cctx = ctx; - ctx->src = ctx; - ctx->dst = rctx; - rctx->src = ctx; - - } else { - /* pull */ - rctx->next = *cctx; - *cctx = rctx; - rctx->src = rctx; - rctx->dst = ctx; - ctx->src = rctx; + rctx->name.len = name->len; + rctx->name.data = ngx_palloc(pool, name->len); + if (rctx->name.data == NULL) { + goto clear; } + ngx_memcpy(rctx->name.data, name->data, rctx->name.len); + + rctx->app.len = app->len; + rctx->app.data = ngx_palloc(pool, app->len); + if (rctx->app.data == NULL) { + goto clear; + } + ngx_memcpy(rctx->app.data, app->data, app->len); + + rctx->relay = 1; - /* connect */ pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t)); if (pc == NULL) { goto clear; } - - pc->log = racf->log; - pc->get = ngx_rtmp_relay_get_peer; - pc->free = ngx_rtmp_relay_free_peer; - pc->name = &target->url.host; - pc->socklen = target->url.socklen; - pc->sockaddr = (struct sockaddr *)&target->url.sockaddr; + pc->log = racf->log; + pc->get = ngx_rtmp_relay_get_peer; + pc->free = ngx_rtmp_relay_free_peer; + pc->name = &url->host; + pc->socklen = url->socklen; + pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen); + if (pc->sockaddr == NULL) { + goto clear; + } + ngx_memcpy(pc->sockaddr, &url->sockaddr, pc->socklen); rc = ngx_event_connect_peer(pc); if (rc != NGX_OK && rc != NGX_AGAIN ) { @@ -272,7 +240,6 @@ ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s, "relay: connection failed"); goto clear; } - c = pc->connection; c->pool = pool; @@ -285,102 +252,178 @@ ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s, rs = ngx_rtmp_init_session(c, &addr_conf); if (rs == NULL) { /* no need to destroy pool */ - return; + return NULL; } - rs->app_conf = s->app_conf; rctx->session = rs; ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module); - ngx_str_set(&rs->flashver, "ngx-local-relay"); - - ngx_rtmp_client_handshake(rs); - - return; + + ngx_rtmp_client_handshake(rs, 1); + return rctx; clear: if (pool) { ngx_destroy_pool(pool); } + return NULL; +} + + +static ngx_rtmp_relay_ctx_t * +ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *app, + ngx_str_t *name, ngx_url_t *url) +{ + ngx_rtmp_relay_ctx_t *ctx; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); + if (ctx == NULL) { + ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t)); + if (ctx == NULL) { + return NULL; + } + ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module); + } + ctx->session = s; + + ctx->name.len = name->len; + ctx->name.data = ngx_palloc(s->connection->pool, name->len); + if (ctx->name.data == NULL) { + return NULL; + } + ngx_memcpy(ctx->name.data, name->data, ctx->name.len); + + ctx->app.len = app->len; + ctx->app.data = ngx_palloc(s->connection->pool, app->len); + if (ctx->app.data == NULL) { + return NULL; + } + ngx_memcpy(ctx->app.data, app->data, app->len); + + return ctx; } static ngx_int_t -ngx_rtmp_relay_init_local(ngx_rtmp_session_t *s, u_char *name) +ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *app, + ngx_str_t *name, ngx_url_t *url, + ngx_rtmp_relay_create_ctx_pt create_publish_ctx, + ngx_rtmp_relay_create_ctx_pt create_play_ctx) { - size_t n, len; - ngx_rtmp_relay_target_t *target; ngx_rtmp_relay_app_conf_t *racf; - ngx_rtmp_relay_ctx_t *ctx, **cctx; + ngx_rtmp_relay_ctx_t *publish_ctx, *play_ctx, **cctx; + ngx_uint_t hash; + racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); if (racf == NULL) { return NGX_ERROR; } - len = ngx_strlen(name); - - ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - - /* is this local relay end? */ - if (ctx && ctx->src == ctx) { - return NGX_OK; + play_ctx = create_play_ctx(s, app, name, url); + if (play_ctx == NULL) { + return NGX_ERROR; } - if (ctx == NULL) { - ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t)); - if (ctx == NULL) { - return NGX_ERROR; - } - ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module); - } - ctx->session = s; - ctx->name.len = len; - ctx->name.data = ngx_palloc(s->connection->pool, len); - ngx_memcpy(ctx->name.data, name, len); - - /* find relay stream */ - cctx = ngx_rtmp_relay_find_ctx(racf, &ctx->name); - if (*cctx) { - /* add player to pull stream */ - if ((*cctx)->target) { - ctx->src = (*cctx)->src; - ctx->next = (*cctx)->dst; - (*cctx)->dst = ctx; - } - return NGX_OK; - } - - /* create relays */ - target = racf->targets.elts; - for (n = 0; n < racf->targets.nelts; ++n, ++target) { - if (target->name.len == 0 || - (len == target->name.len - && !ngx_memcmp(name, target->name.data, len))) + hash = ngx_hash_key(name->data, name->len); + cctx = &racf->ctx[hash % racf->nbuckets]; + for (; *cctx; cctx = &(*cctx)->next) { + if ((*cctx)->name.len == name->len + && !ngx_memcmp(name->data, (*cctx)->name.data, + name->len)) { - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "relay: create: name='%s' url='%V'", - name, &target->url); - - ngx_rtmp_relay_init_remote(s, target); + break; } } + if (*cctx) { + play_ctx->publish = (*cctx)->publish; + play_ctx->next = (*cctx)->play; + (*cctx)->play = play_ctx; + return NGX_OK; + } + + publish_ctx = create_publish_ctx(s, app, name, url); + if (publish_ctx == NULL) { + ngx_rtmp_finalize_session(play_ctx->session); + return NGX_ERROR; + } + + publish_ctx->publish = publish_ctx; + publish_ctx->play = play_ctx; + play_ctx->publish = publish_ctx; + *cctx = publish_ctx; + return NGX_OK; } +ngx_int_t +ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *app, ngx_str_t *name, + ngx_url_t *url) +{ + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "relay: create pull app='%V' name='%V' url='%V'", + app, name, url); + + return ngx_rtmp_relay_create(s, app, name, url, + ngx_rtmp_relay_create_remote_ctx, + ngx_rtmp_relay_create_local_ctx); +} + + +ngx_int_t +ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *app, ngx_str_t *name, + ngx_url_t *url) +{ + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "relay: create push app='%V' name='%V' url='%V'", + app, name, url); + + return ngx_rtmp_relay_create(s, app, name, url, + ngx_rtmp_relay_create_local_ctx, + ngx_rtmp_relay_create_remote_ctx); +} + + static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) { ngx_rtmp_relay_app_conf_t *racf; + ngx_rtmp_relay_target_t *target; + ngx_str_t name; + size_t n; + ngx_rtmp_relay_ctx_t *ctx; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); + if (ctx && ctx->relay) { + goto next; + } racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); if (racf == NULL || racf->targets.nelts == 0) { goto next; } - ngx_rtmp_relay_init_local(s, v->name); + name.len = ngx_strlen(v->name); + name.data = v->name; + + target = racf->targets.elts; + for (n = 0; n < racf->targets.nelts; ++n, ++target) { + if (target->push + && (target->name.len == 0 + || (name.len == target->name.len + && !ngx_memcmp(name.data, target->name.data, name.len)))) + { + if (ngx_rtmp_relay_push(s, &target->app, &name, &target->url) + != NGX_OK) + { + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "relay: push failed app='%V' name='%V' url='%V'", + &target->app, &target->name, &target->url.url); + } + } + } next: return next_publish(s, v); @@ -391,13 +434,41 @@ static ngx_int_t ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { ngx_rtmp_relay_app_conf_t *racf; + ngx_rtmp_relay_target_t *target; + ngx_str_t name; + size_t n; + ngx_rtmp_relay_ctx_t *ctx; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); + if (ctx && ctx->relay) { + goto next; + } racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); if (racf == NULL || racf->targets.nelts == 0) { goto next; } - ngx_rtmp_relay_init_local(s, v->name); + name.len = ngx_strlen(v->name); + name.data = v->name; + + target = racf->targets.elts; + for (n = 0; n < racf->targets.nelts; ++n, ++target) { + if (!target->push + && (target->name.len == 0 + || (name.len == target->name.len + && !ngx_memcmp(name.data, target->name.data, name.len)))) + { + if (ngx_rtmp_relay_pull(s, &target->app, &name, &target->url) + != NGX_OK) + { + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "relay: pull failed app='%V' name='%V' url='%V'", + &target->app, &target->name, &target->url.url); + } + break; + } + } next: return next_play(s, v); @@ -482,8 +553,8 @@ ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s) return NGX_ERROR; } - out_cmd[0].data = ctx->target->app.data; - out_cmd[0].len = ctx->target->app.len; + out_cmd[0].data = ctx->app.data; + out_cmd[0].len = ctx->app.len; ngx_memzero(&h, sizeof(h)); h.csid = NGX_RTMP_RELAY_CSID_AMF_INI; @@ -632,7 +703,7 @@ ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL || ctx->target == NULL) { + if (ctx == NULL || !ctx->relay) { return NGX_OK; } @@ -649,7 +720,7 @@ ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return ngx_rtmp_relay_send_create_stream(s); case NGX_RTMP_RELAY_CREATE_STREAM_TRANS: - if (ctx->target->push) { + if (ctx->publish != ctx) { if (ngx_rtmp_relay_send_publish(s) != NGX_OK) { return NGX_ERROR; } @@ -675,7 +746,7 @@ ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_rtmp_relay_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL || ctx->src == NULL) { + if (ctx == NULL || ctx->publish == NULL) { return NGX_OK; } @@ -692,35 +763,54 @@ ngx_rtmp_relay_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_uint_t hash; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL || ctx->src == NULL) { + if (ctx == NULL || ctx->publish == NULL) { return NGX_OK; } - /* destination end disconnecting */ - if (ctx->src != ctx) { - for (cctx = &ctx->src->dst; *cctx; cctx = &(*cctx)->next) { + /* play end disconnect? */ + if (ctx->publish != ctx) { + for (cctx = &ctx->publish->play; *cctx; cctx = &(*cctx)->next) { if (*cctx == ctx) { *cctx = ctx->next; break; } } - /*TODO: add push reconnect */ - /* if (ctx->target) {...} */ + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ctx->session->connection->log, 0, + "relay: play disconnect app='%V' name='%V'", + &ctx->app, &ctx->name); - if (ctx->src->dst == NULL) { - ngx_rtmp_finalize_session(ctx->src->session); + /*TODO: add push reconnect */ + /* + if (ctx->relay) { + ngx_rtmp_relay_push(ctx-publish->session, + &ctx->publish->name, &target); + }*/ + + if (ctx->publish->play == NULL) { + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, + ctx->publish->session->connection->log, 0, + "relay: publish disconnect empty app='%V' name='%V'", + &ctx->app, &ctx->name); + ngx_rtmp_finalize_session(ctx->publish->session); } return NGX_OK; } - /* source end disconnecting */ - for (cctx = &ctx->src->dst; *cctx; cctx = &(*cctx)->next) { - (*cctx)->src = NULL; + /* publish end disconnect */ + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ctx->session->connection->log, 0, + "relay: publish disconnect app='%V' name='%V'", + &ctx->app, &ctx->name); + + for (cctx = &ctx->play; *cctx; cctx = &(*cctx)->next) { + (*cctx)->publish = NULL; + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, (*cctx)->session->connection->log, + 0, "relay: play disconnect orphan app='%V' name='%V'", + &(*cctx)->app, &(*cctx)->name); ngx_rtmp_finalize_session((*cctx)->session); } - ctx->src = NULL; + ctx->publish = NULL; racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); hash = ngx_hash_key(ctx->name.data, ctx->name.len); diff --git a/ngx_rtmp_relay_module.h b/ngx_rtmp_relay_module.h new file mode 100644 index 0000000..af086b0 --- /dev/null +++ b/ngx_rtmp_relay_module.h @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#ifndef _NGX_RTMP_RELAY_H_INCLUDED_ +#define _NGX_RTMP_RELAY_H_INCLUDED_ + + +#include "ngx_rtmp.h" + + +ngx_int_t ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *app, + ngx_str_t *name, ngx_url_t *url); +ngx_int_t ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *app, + ngx_str_t *name, ngx_url_t *url); + + +#endif /* _NGX_RTMP_RELAY_H_INCLUDED_ */