From ee6a5917cd92ed27f263835c089f03349e8332be Mon Sep 17 00:00:00 2001 From: Sergey Dryabzhinsky Date: Fri, 19 Aug 2016 05:40:04 +0300 Subject: [PATCH] Almost done. Need to create new session for pull command. --- doc/control_modul.md | 37 ++++- ngx_rtmp_control_module.c | 318 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 354 insertions(+), 1 deletion(-) diff --git a/doc/control_modul.md b/doc/control_modul.md index dbee1f5..802f29a 100644 --- a/doc/control_modul.md +++ b/doc/control_modul.md @@ -90,5 +90,40 @@ http://server.com/control/redirect/publisher|subscriber|client? srv=SRV&app=APP&name=NAME&addr=ADDR&clientid=CLIENTID&newname=NEWNAME ``` -* srv, app, name, addr, clients - the same as above +* srv, app, name, addr, clientid - the same as above * newname - new stream name to redirect to + +# Push +Push publishing stream to the other location - local or remote server. +Syntax: +```sh +http://server.com/control/push/? +srv=SRV&app=APP&name=NAME&addr=ADDR&clientid=CLIENTID&location=NEWLOCATION +``` + +* srv=SRV - optional server{} block number within rtmp{} block, default to first server{} block +* app=APP - required application name +* name=NAME - required stream name +* addr - optional client address (the same as returned by rtmp_stat) +* clientid - optional nginx client id (displayed in log and stat) +* location - new stream location to push-publish to: + * `/app/newstream` for local + * `host[:port]/app/newstream` for remote, default port - 1935 + +# Pull +Pull stream to the other location - local or remote server. +Syntax: +```sh +http://server.com/control/pull/? +srv=SRV&app=APP&name=NAME&addr=ADDR&clientid=CLIENTID&location=NEWLOCATION&newname=NEWNAME +``` + +* srv=SRV - optional server{} block number within rtmp{} block, default to first server{} block +* app=APP - required application name +* name=NAME - optional stream name +* addr - optional client address (the same as returned by rtmp_stat) +* clientid - optional nginx client id (displayed in log and stat) +* location - stream location to pull-play from: + * `/app/newstream` for local + * `host[:port]/app/newstream` for remote, default port - 1935 +* newname - new stream name to pull into diff --git a/ngx_rtmp_control_module.c b/ngx_rtmp_control_module.c index adf3ddc..a12627d 100644 --- a/ngx_rtmp_control_module.c +++ b/ngx_rtmp_control_module.c @@ -9,6 +9,7 @@ #include #include "ngx_rtmp.h" #include "ngx_rtmp_live_module.h" +#include "ngx_rtmp_relay_module.h" #include "ngx_rtmp_record_module.h" @@ -26,6 +27,8 @@ typedef const char * (*ngx_rtmp_control_handler_t)(ngx_http_request_t *r, #define NGX_RTMP_CONTROL_RECORD 0x01 #define NGX_RTMP_CONTROL_DROP 0x02 #define NGX_RTMP_CONTROL_REDIRECT 0x04 +#define NGX_RTMP_CONTROL_PUSH 0x08 +#define NGX_RTMP_CONTROL_PULL 0x10 enum { @@ -54,6 +57,8 @@ static ngx_conf_bitmask_t ngx_rtmp_control_masks[] = { { ngx_string("record"), NGX_RTMP_CONTROL_RECORD }, { ngx_string("drop"), NGX_RTMP_CONTROL_DROP }, { ngx_string("redirect"), NGX_RTMP_CONTROL_REDIRECT }, + { ngx_string("push"), NGX_RTMP_CONTROL_PUSH }, + { ngx_string("pull"), NGX_RTMP_CONTROL_PULL }, { ngx_null_string, 0 } }; @@ -224,6 +229,190 @@ ngx_rtmp_control_redirect_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s) } +static const char * +ngx_rtmp_control_push_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s) +{ + ngx_uint_t rc; + ngx_str_t name, location; + ngx_rtmp_live_ctx_t *lctx; + ngx_rtmp_control_ctx_t *ctx; + ngx_rtmp_relay_target_t target; + ngx_url_t *u; + + if (ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name) + != NGX_OK) + { + return "name not specified"; + } + + if (name.len >= NGX_RTMP_MAX_NAME) { + name.len = NGX_RTMP_MAX_NAME - 1; + } + + if (ngx_http_arg(r, (u_char *) "location", sizeof("location") - 1, &location) + != NGX_OK) + { + return "location not specified"; + } + + if (location.len >= NGX_RTMP_MAX_NAME) { + location.len = NGX_RTMP_MAX_NAME - 1; + } + + ngx_memzero(&target, sizeof(target)); + + u = &target.url; + + u->url = location; + u->default_port = 1935; + u->uri_part = 1; + if (!ngx_strncasecmp(location.data, (u_char *) "/", 1)) { + u->no_resolve = 1; /* want ip here */ + + rc = s->connection->addr_text.len + u->url.len + 1; + location.data = ngx_pcalloc(r->connection->pool, rc); + location.len = ngx_sprintf(location.data, "%V%V", + &s->connection->addr_text, &u->url) - location.data; + + u->url = location; + } + + if (ngx_parse_url(r->connection->pool, u) != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "control: push failed '%V' by location url parse", &location); + return "push failed: location parse error"; + } + + ngx_log_error(NGX_LOG_INFO, r->connection->log, 0, + "control: push to: host='%V', uri='%V'", + &u->host, &u->uri); + + if (!u->naddrs) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "control: push failed '%V' by server address resolve", &u->url); + return "push failed: location host resolve error"; + } + + ngx_log_error(NGX_LOG_INFO, r->connection->log, 4, + "control: push to: host='%V', addr='%V', uri='%V', err='%s'", + &u->host, &u->addrs[0].name, &u->uri, &u->err); + + ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module); + ctx->count++; + + lctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); + + if (lctx && lctx->publishing) { + /* publish */ + /** + * TODO: update control config in memory, add new push option + */ + if (ngx_rtmp_relay_push(s, &name, &target) != NGX_OK) { + return "push failed"; + } + } + + return NGX_CONF_OK; +} + + +static const char * +ngx_rtmp_control_pull_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s) +{ + ngx_uint_t rc; + ngx_str_t name, location; + ngx_rtmp_live_ctx_t *lctx; + ngx_rtmp_control_ctx_t *ctx; + ngx_rtmp_relay_target_t target; + ngx_url_t *u; + + if (ngx_http_arg(r, (u_char *) "newname", sizeof("newname") - 1, &name) + != NGX_OK) + { + return "newname not specified"; + } + + if (name.len >= NGX_RTMP_MAX_NAME) { + name.len = NGX_RTMP_MAX_NAME - 1; + } + + if (ngx_http_arg(r, (u_char *) "location", sizeof("location") - 1, &location) + != NGX_OK) + { + return "location not specified"; + } + + if (location.len >= NGX_RTMP_MAX_NAME) { + location.len = NGX_RTMP_MAX_NAME - 1; + } + + ngx_memzero(&target, sizeof(target)); + + u = &target.url; + + u->url = location; + u->default_port = 1935; + u->uri_part = 1; + if (!ngx_strncasecmp(location.data, (u_char *) "/", 1)) { + u->no_resolve = 1; + + rc = s->connection->addr_text.len + u->url.len + 1; + location.data = ngx_pcalloc(r->connection->pool, rc); + location.len = ngx_sprintf(location.data, "%V%V", + &s->connection->addr_text, &u->url) - location.data; + + u->url = location; + } + + if (ngx_parse_url(r->connection->pool, u) != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "control: pull failed '%V' by location url parse", &location); + return "pull failed: location parse error"; + } + + ngx_log_error(NGX_LOG_INFO, r->connection->log, 0, + "control: pull to: host='%V', uri='%V'", + &u->host, &u->uri); + + if (!u->naddrs) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "control: pull failed '%V' by server address resolve", &u->url); + return "pull failed: location host resolve error"; + } + + ngx_log_error(NGX_LOG_INFO, r->connection->log, 0, + "control: pull to: host='%V', addr='%V', uri='%V', err='%s'", + &u->host, &u->addrs[0].name, &u->uri, &u->err); + + ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module); + ctx->count++; + + lctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); + + if (lctx && !lctx->publishing) { + /* not publish */ + /** + * TODO: update control config in memory, add new pull option + */ + + target.app = s->app; + + /** + * TODO: One pull for app? + */ + if (ngx_rtmp_relay_pull(s, &name, &target) != NGX_OK) { + ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, + "control: can't pull to session..."); + } else { + ngx_log_error(NGX_LOG_INFO, r->connection->log, 0, + "control: pull to session..."); + } + } + + return NGX_CONF_OK; +} + + static const char * ngx_rtmp_control_walk_session(ngx_http_request_t *r, ngx_rtmp_live_ctx_t *lctx) @@ -455,6 +644,7 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method) b = ngx_create_temp_buf(r->pool, ctx->path.len); if (b == NULL) { + msg = "temp buffer allocation error"; goto error; } @@ -469,6 +659,8 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method) return ngx_http_output_filter(r, &cl); error: + ngx_log_error(NGX_LOG_ERR, r->connection->log, 1, + "control: record handler error '%s'", msg); return NGX_HTTP_INTERNAL_SERVER_ERROR; } @@ -517,6 +709,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method) p = ngx_palloc(r->connection->pool, len); if (p == NULL) { + msg = "poll allocation error"; return NGX_ERROR; } @@ -527,6 +720,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method) b = ngx_calloc_buf(r->pool); if (b == NULL) { + msg = "buffer allocation error"; goto error; } @@ -543,6 +737,8 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method) return ngx_http_output_filter(r, &cl); error: + ngx_log_error(NGX_LOG_ERR, r->connection->log, 1, + "control: drop handler error '%s'", msg); return NGX_HTTP_INTERNAL_SERVER_ERROR; } @@ -591,6 +787,7 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method) p = ngx_palloc(r->connection->pool, len); if (p == NULL) { + msg = "poll allocation error"; goto error; } @@ -601,6 +798,7 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method) b = ngx_calloc_buf(r->pool); if (b == NULL) { + msg = "buffer allocation error"; goto error; } @@ -617,6 +815,124 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method) return ngx_http_output_filter(r, &cl); error: + ngx_log_error(NGX_LOG_ERR, r->connection->log, 1, + "control: redirect handler error '%s'", msg); + return NGX_HTTP_INTERNAL_SERVER_ERROR; +} + + +static ngx_int_t +ngx_rtmp_control_push(ngx_http_request_t *r, ngx_str_t *method) +{ + size_t len; + u_char *p; + ngx_buf_t *b; + ngx_chain_t cl; + const char *msg; + ngx_rtmp_control_ctx_t *ctx; + + ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module); + ctx->filter = NGX_RTMP_CONTROL_FILTER_PUBLISHER; + + msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_push_handler); + if (msg != NGX_CONF_OK) { + goto error; + } + + /* output count */ + + len = NGX_INT_T_LEN; + + p = ngx_palloc(r->connection->pool, len); + if (p == NULL) { + msg = "poll allocation error"; + goto error; + } + + len = (size_t) (ngx_snprintf(p, len, "%ui", ctx->count) - p); + + r->headers_out.status = NGX_HTTP_OK; + r->headers_out.content_length_n = len; + + b = ngx_calloc_buf(r->pool); + if (b == NULL) { + msg = "buffer allocation error"; + goto error; + } + + b->start = b->pos = p; + b->end = b->last = p + len; + b->temporary = 1; + b->last_buf = 1; + + ngx_memzero(&cl, sizeof(cl)); + cl.buf = b; + + ngx_http_send_header(r); + + return ngx_http_output_filter(r, &cl); + +error: + ngx_log_error(NGX_LOG_ERR, r->connection->log, 1, + "control: push handler error '%s'", msg); + return NGX_HTTP_INTERNAL_SERVER_ERROR; +} + + +static ngx_int_t +ngx_rtmp_control_pull(ngx_http_request_t *r, ngx_str_t *method) +{ + size_t len; + u_char *p; + ngx_buf_t *b; + ngx_chain_t cl; + const char *msg; + ngx_rtmp_control_ctx_t *ctx; + + ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module); + ctx->filter = NGX_RTMP_CONTROL_FILTER_SUBSCRIBER; + + msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_pull_handler); + if (msg != NGX_CONF_OK) { + goto error; + } + + /* output count */ + + len = NGX_INT_T_LEN; + + p = ngx_palloc(r->connection->pool, len); + if (p == NULL) { + msg = "poll allocation error"; + goto error; + } + + len = (size_t) (ngx_snprintf(p, len, "%ui", ctx->count) - p); + + r->headers_out.status = NGX_HTTP_OK; + r->headers_out.content_length_n = len; + + b = ngx_calloc_buf(r->pool); + if (b == NULL) { + msg = "buffer allocation error"; + goto error; + } + + b->start = b->pos = p; + b->end = b->last = p + len; + b->temporary = 1; + b->last_buf = 1; + + ngx_memzero(&cl, sizeof(cl)); + cl.buf = b; + + ngx_http_send_header(r); + + return ngx_http_output_filter(r, &cl); + +error: + ngx_log_error(NGX_LOG_ERR, r->connection->log, 1, + "control: pull handler error '%s'", msg); return NGX_HTTP_INTERNAL_SERVER_ERROR; } @@ -685,6 +1001,8 @@ ngx_rtmp_control_handler(ngx_http_request_t *r) NGX_RTMP_CONTROL_SECTION(RECORD, record); NGX_RTMP_CONTROL_SECTION(DROP, drop); NGX_RTMP_CONTROL_SECTION(REDIRECT, redirect); + NGX_RTMP_CONTROL_SECTION(PUSH, push); + NGX_RTMP_CONTROL_SECTION(PULL, pull); #undef NGX_RTMP_CONTROL_SECTION