Almost done. Need to create new session for pull command.

This commit is contained in:
Sergey Dryabzhinsky 2016-08-19 05:40:04 +03:00
parent c47cb2370f
commit ee6a5917cd
2 changed files with 354 additions and 1 deletions

View file

@ -90,5 +90,40 @@ http://server.com/control/redirect/publisher|subscriber|client?
srv=SRV&app=APP&name=NAME&addr=ADDR&clientid=CLIENTID&newname=NEWNAME
```
* srv, app, name, addr, clients - the same as above
* srv, app, name, addr, clientid - the same as above
* newname - new stream name to redirect to
# Push
Push publishing stream to the other location - local or remote server.
Syntax:
```sh
http://server.com/control/push/?
srv=SRV&app=APP&name=NAME&addr=ADDR&clientid=CLIENTID&location=NEWLOCATION
```
* srv=SRV - optional server{} block number within rtmp{} block, default to first server{} block
* app=APP - required application name
* name=NAME - required stream name
* addr - optional client address (the same as returned by rtmp_stat)
* clientid - optional nginx client id (displayed in log and stat)
* location - new stream location to push-publish to:
* `/app/newstream` for local
* `host[:port]/app/newstream` for remote, default port - 1935
# Pull
Pull stream to the other location - local or remote server.
Syntax:
```sh
http://server.com/control/pull/?
srv=SRV&app=APP&name=NAME&addr=ADDR&clientid=CLIENTID&location=NEWLOCATION&newname=NEWNAME
```
* srv=SRV - optional server{} block number within rtmp{} block, default to first server{} block
* app=APP - required application name
* name=NAME - optional stream name
* addr - optional client address (the same as returned by rtmp_stat)
* clientid - optional nginx client id (displayed in log and stat)
* location - stream location to pull-play from:
* `/app/newstream` for local
* `host[:port]/app/newstream` for remote, default port - 1935
* newname - new stream name to pull into

View file

@ -9,6 +9,7 @@
#include <ngx_http.h>
#include "ngx_rtmp.h"
#include "ngx_rtmp_live_module.h"
#include "ngx_rtmp_relay_module.h"
#include "ngx_rtmp_record_module.h"
@ -26,6 +27,8 @@ typedef const char * (*ngx_rtmp_control_handler_t)(ngx_http_request_t *r,
#define NGX_RTMP_CONTROL_RECORD 0x01
#define NGX_RTMP_CONTROL_DROP 0x02
#define NGX_RTMP_CONTROL_REDIRECT 0x04
#define NGX_RTMP_CONTROL_PUSH 0x08
#define NGX_RTMP_CONTROL_PULL 0x10
enum {
@ -54,6 +57,8 @@ static ngx_conf_bitmask_t ngx_rtmp_control_masks[] = {
{ ngx_string("record"), NGX_RTMP_CONTROL_RECORD },
{ ngx_string("drop"), NGX_RTMP_CONTROL_DROP },
{ ngx_string("redirect"), NGX_RTMP_CONTROL_REDIRECT },
{ ngx_string("push"), NGX_RTMP_CONTROL_PUSH },
{ ngx_string("pull"), NGX_RTMP_CONTROL_PULL },
{ ngx_null_string, 0 }
};
@ -224,6 +229,190 @@ ngx_rtmp_control_redirect_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
}
static const char *
ngx_rtmp_control_push_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
{
ngx_uint_t rc;
ngx_str_t name, location;
ngx_rtmp_live_ctx_t *lctx;
ngx_rtmp_control_ctx_t *ctx;
ngx_rtmp_relay_target_t target;
ngx_url_t *u;
if (ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name)
!= NGX_OK)
{
return "name not specified";
}
if (name.len >= NGX_RTMP_MAX_NAME) {
name.len = NGX_RTMP_MAX_NAME - 1;
}
if (ngx_http_arg(r, (u_char *) "location", sizeof("location") - 1, &location)
!= NGX_OK)
{
return "location not specified";
}
if (location.len >= NGX_RTMP_MAX_NAME) {
location.len = NGX_RTMP_MAX_NAME - 1;
}
ngx_memzero(&target, sizeof(target));
u = &target.url;
u->url = location;
u->default_port = 1935;
u->uri_part = 1;
if (!ngx_strncasecmp(location.data, (u_char *) "/", 1)) {
u->no_resolve = 1; /* want ip here */
rc = s->connection->addr_text.len + u->url.len + 1;
location.data = ngx_pcalloc(r->connection->pool, rc);
location.len = ngx_sprintf(location.data, "%V%V",
&s->connection->addr_text, &u->url) - location.data;
u->url = location;
}
if (ngx_parse_url(r->connection->pool, u) != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"control: push failed '%V' by location url parse", &location);
return "push failed: location parse error";
}
ngx_log_error(NGX_LOG_INFO, r->connection->log, 0,
"control: push to: host='%V', uri='%V'",
&u->host, &u->uri);
if (!u->naddrs) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"control: push failed '%V' by server address resolve", &u->url);
return "push failed: location host resolve error";
}
ngx_log_error(NGX_LOG_INFO, r->connection->log, 4,
"control: push to: host='%V', addr='%V', uri='%V', err='%s'",
&u->host, &u->addrs[0].name, &u->uri, &u->err);
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
ctx->count++;
lctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (lctx && lctx->publishing) {
/* publish */
/**
* TODO: update control config in memory, add new push option
*/
if (ngx_rtmp_relay_push(s, &name, &target) != NGX_OK) {
return "push failed";
}
}
return NGX_CONF_OK;
}
static const char *
ngx_rtmp_control_pull_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
{
ngx_uint_t rc;
ngx_str_t name, location;
ngx_rtmp_live_ctx_t *lctx;
ngx_rtmp_control_ctx_t *ctx;
ngx_rtmp_relay_target_t target;
ngx_url_t *u;
if (ngx_http_arg(r, (u_char *) "newname", sizeof("newname") - 1, &name)
!= NGX_OK)
{
return "newname not specified";
}
if (name.len >= NGX_RTMP_MAX_NAME) {
name.len = NGX_RTMP_MAX_NAME - 1;
}
if (ngx_http_arg(r, (u_char *) "location", sizeof("location") - 1, &location)
!= NGX_OK)
{
return "location not specified";
}
if (location.len >= NGX_RTMP_MAX_NAME) {
location.len = NGX_RTMP_MAX_NAME - 1;
}
ngx_memzero(&target, sizeof(target));
u = &target.url;
u->url = location;
u->default_port = 1935;
u->uri_part = 1;
if (!ngx_strncasecmp(location.data, (u_char *) "/", 1)) {
u->no_resolve = 1;
rc = s->connection->addr_text.len + u->url.len + 1;
location.data = ngx_pcalloc(r->connection->pool, rc);
location.len = ngx_sprintf(location.data, "%V%V",
&s->connection->addr_text, &u->url) - location.data;
u->url = location;
}
if (ngx_parse_url(r->connection->pool, u) != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"control: pull failed '%V' by location url parse", &location);
return "pull failed: location parse error";
}
ngx_log_error(NGX_LOG_INFO, r->connection->log, 0,
"control: pull to: host='%V', uri='%V'",
&u->host, &u->uri);
if (!u->naddrs) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"control: pull failed '%V' by server address resolve", &u->url);
return "pull failed: location host resolve error";
}
ngx_log_error(NGX_LOG_INFO, r->connection->log, 0,
"control: pull to: host='%V', addr='%V', uri='%V', err='%s'",
&u->host, &u->addrs[0].name, &u->uri, &u->err);
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
ctx->count++;
lctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (lctx && !lctx->publishing) {
/* not publish */
/**
* TODO: update control config in memory, add new pull option
*/
target.app = s->app;
/**
* TODO: One pull for app?
*/
if (ngx_rtmp_relay_pull(s, &name, &target) != NGX_OK) {
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
"control: can't pull to session...");
} else {
ngx_log_error(NGX_LOG_INFO, r->connection->log, 0,
"control: pull to session...");
}
}
return NGX_CONF_OK;
}
static const char *
ngx_rtmp_control_walk_session(ngx_http_request_t *r,
ngx_rtmp_live_ctx_t *lctx)
@ -455,6 +644,7 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
b = ngx_create_temp_buf(r->pool, ctx->path.len);
if (b == NULL) {
msg = "temp buffer allocation error";
goto error;
}
@ -469,6 +659,8 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
return ngx_http_output_filter(r, &cl);
error:
ngx_log_error(NGX_LOG_ERR, r->connection->log, 1,
"control: record handler error '%s'", msg);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
@ -517,6 +709,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
p = ngx_palloc(r->connection->pool, len);
if (p == NULL) {
msg = "poll allocation error";
return NGX_ERROR;
}
@ -527,6 +720,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
b = ngx_calloc_buf(r->pool);
if (b == NULL) {
msg = "buffer allocation error";
goto error;
}
@ -543,6 +737,8 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
return ngx_http_output_filter(r, &cl);
error:
ngx_log_error(NGX_LOG_ERR, r->connection->log, 1,
"control: drop handler error '%s'", msg);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
@ -591,6 +787,7 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method)
p = ngx_palloc(r->connection->pool, len);
if (p == NULL) {
msg = "poll allocation error";
goto error;
}
@ -601,6 +798,7 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method)
b = ngx_calloc_buf(r->pool);
if (b == NULL) {
msg = "buffer allocation error";
goto error;
}
@ -617,6 +815,124 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method)
return ngx_http_output_filter(r, &cl);
error:
ngx_log_error(NGX_LOG_ERR, r->connection->log, 1,
"control: redirect handler error '%s'", msg);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
static ngx_int_t
ngx_rtmp_control_push(ngx_http_request_t *r, ngx_str_t *method)
{
size_t len;
u_char *p;
ngx_buf_t *b;
ngx_chain_t cl;
const char *msg;
ngx_rtmp_control_ctx_t *ctx;
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
ctx->filter = NGX_RTMP_CONTROL_FILTER_PUBLISHER;
msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_push_handler);
if (msg != NGX_CONF_OK) {
goto error;
}
/* output count */
len = NGX_INT_T_LEN;
p = ngx_palloc(r->connection->pool, len);
if (p == NULL) {
msg = "poll allocation error";
goto error;
}
len = (size_t) (ngx_snprintf(p, len, "%ui", ctx->count) - p);
r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_length_n = len;
b = ngx_calloc_buf(r->pool);
if (b == NULL) {
msg = "buffer allocation error";
goto error;
}
b->start = b->pos = p;
b->end = b->last = p + len;
b->temporary = 1;
b->last_buf = 1;
ngx_memzero(&cl, sizeof(cl));
cl.buf = b;
ngx_http_send_header(r);
return ngx_http_output_filter(r, &cl);
error:
ngx_log_error(NGX_LOG_ERR, r->connection->log, 1,
"control: push handler error '%s'", msg);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
static ngx_int_t
ngx_rtmp_control_pull(ngx_http_request_t *r, ngx_str_t *method)
{
size_t len;
u_char *p;
ngx_buf_t *b;
ngx_chain_t cl;
const char *msg;
ngx_rtmp_control_ctx_t *ctx;
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
ctx->filter = NGX_RTMP_CONTROL_FILTER_SUBSCRIBER;
msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_pull_handler);
if (msg != NGX_CONF_OK) {
goto error;
}
/* output count */
len = NGX_INT_T_LEN;
p = ngx_palloc(r->connection->pool, len);
if (p == NULL) {
msg = "poll allocation error";
goto error;
}
len = (size_t) (ngx_snprintf(p, len, "%ui", ctx->count) - p);
r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_length_n = len;
b = ngx_calloc_buf(r->pool);
if (b == NULL) {
msg = "buffer allocation error";
goto error;
}
b->start = b->pos = p;
b->end = b->last = p + len;
b->temporary = 1;
b->last_buf = 1;
ngx_memzero(&cl, sizeof(cl));
cl.buf = b;
ngx_http_send_header(r);
return ngx_http_output_filter(r, &cl);
error:
ngx_log_error(NGX_LOG_ERR, r->connection->log, 1,
"control: pull handler error '%s'", msg);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
@ -685,6 +1001,8 @@ ngx_rtmp_control_handler(ngx_http_request_t *r)
NGX_RTMP_CONTROL_SECTION(RECORD, record);
NGX_RTMP_CONTROL_SECTION(DROP, drop);
NGX_RTMP_CONTROL_SECTION(REDIRECT, redirect);
NGX_RTMP_CONTROL_SECTION(PUSH, push);
NGX_RTMP_CONTROL_SECTION(PULL, pull);
#undef NGX_RTMP_CONTROL_SECTION