mirror of
https://github.com/zotanmew/nginx-rtmp-module.git
synced 2024-05-20 18:01:08 +02:00
improved closing session on error
This commit is contained in:
parent
4602695d5c
commit
8e32bae842
7
TODO
7
TODO
|
@ -1,4 +1,4 @@
|
||||||
- Implement modules support.
|
~ Implement modules support.
|
||||||
Move AMF0 handlers to modules.
|
Move AMF0 handlers to modules.
|
||||||
Move broadcast to module.
|
Move broadcast to module.
|
||||||
|
|
||||||
|
@ -6,6 +6,11 @@
|
||||||
|
|
||||||
- packet dropping
|
- packet dropping
|
||||||
|
|
||||||
|
- l <-> cl
|
||||||
|
|
||||||
|
- closing session on send error
|
||||||
|
causes crash because of double-freeing stack
|
||||||
|
|
||||||
- shortcuts for big-endian copy
|
- shortcuts for big-endian copy
|
||||||
|
|
||||||
- implement loc confs (=fms apps)
|
- implement loc confs (=fms apps)
|
||||||
|
|
|
@ -257,7 +257,8 @@ typedef struct {
|
||||||
|
|
||||||
|
|
||||||
void ngx_rtmp_init_connection(ngx_connection_t *c);
|
void ngx_rtmp_init_connection(ngx_connection_t *c);
|
||||||
void ngx_rtmp_close_session(ngx_rtmp_session_t *s);
|
/*void ngx_rtmp_close_session(ngx_rtmp_session_t *s);*/
|
||||||
|
void ngx_rtmp_close_connection(ngx_connection_t *c);
|
||||||
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
|
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
|
||||||
|
|
||||||
|
|
||||||
|
@ -271,9 +272,9 @@ ngx_int_t ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,
|
||||||
|
|
||||||
/* Sending messages */
|
/* Sending messages */
|
||||||
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s);
|
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s);
|
||||||
void ngx_rtmp_prepare_message(ngx_rtmp_header_t *h,
|
void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||||
ngx_chain_t *out, uint8_t fmt);
|
ngx_chain_t *out, uint8_t fmt);
|
||||||
void ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out);
|
ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out);
|
||||||
|
|
||||||
#define NGX_RTMP_LIMIT_SOFT 0
|
#define NGX_RTMP_LIMIT_SOFT 0
|
||||||
#define NGX_RTMP_LIMIT_HARD 1
|
#define NGX_RTMP_LIMIT_HARD 1
|
||||||
|
|
|
@ -39,11 +39,17 @@ ngx_rtmp_amf0_debug(const char* op, ngx_log_t *log, u_char *p, size_t n)
|
||||||
|
|
||||||
for(i = 0; i < n && i < NGX_RTMP_AMF0_DEBUG_SIZE; ++i) {
|
for(i = 0; i < n && i < NGX_RTMP_AMF0_DEBUG_SIZE; ++i) {
|
||||||
*hp++ = ' ';
|
*hp++ = ' ';
|
||||||
*hp++ = hex[(*p & 0xf0) >> 4];
|
if (p) {
|
||||||
*hp++ = hex[*p & 0x0f];
|
*hp++ = hex[(*p & 0xf0) >> 4];
|
||||||
*sp++ = (*p >= 0x20 && *p <= 0x7e) ?
|
*hp++ = hex[*p & 0x0f];
|
||||||
*p : (u_char)'?';
|
*sp++ = (*p >= 0x20 && *p <= 0x7e) ?
|
||||||
++p;
|
*p : (u_char)'?';
|
||||||
|
++p;
|
||||||
|
} else {
|
||||||
|
*hp++ = 'X';
|
||||||
|
*hp++ = 'X';
|
||||||
|
*sp++ = '?';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
*hp = *sp = '\0';
|
*hp = *sp = '\0';
|
||||||
|
|
||||||
|
@ -69,7 +75,7 @@ ngx_rtmp_amf0_get(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)
|
||||||
|
|
||||||
b = l->buf;
|
b = l->buf;
|
||||||
|
|
||||||
if (b->last > n + b->pos) {
|
if (b->last >= n + b->pos) {
|
||||||
if (p) {
|
if (p) {
|
||||||
p = ngx_cpymem(p, b->pos, n);
|
p = ngx_cpymem(p, b->pos, n);
|
||||||
}
|
}
|
||||||
|
@ -128,6 +134,7 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)
|
||||||
l = ln;
|
l = ln;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx->link = l;
|
||||||
b = l->buf;
|
b = l->buf;
|
||||||
b->pos = b->last = b->start;
|
b->pos = b->last = b->start;
|
||||||
}
|
}
|
||||||
|
@ -156,6 +163,7 @@ ngx_rtmp_amf0_read_object(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts,
|
||||||
uint16_t len;
|
uint16_t len;
|
||||||
size_t n, namelen, maxlen;
|
size_t n, namelen, maxlen;
|
||||||
ngx_int_t rc;
|
ngx_int_t rc;
|
||||||
|
u_char buf[2];
|
||||||
|
|
||||||
maxlen = 0;
|
maxlen = 0;
|
||||||
for(n = 0; n < nelts; ++n) {
|
for(n = 0; n < nelts; ++n) {
|
||||||
|
@ -169,9 +177,11 @@ ngx_rtmp_amf0_read_object(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts,
|
||||||
char name[maxlen + 1];
|
char name[maxlen + 1];
|
||||||
|
|
||||||
/* read key */
|
/* read key */
|
||||||
if (ngx_rtmp_amf0_get(ctx, &len, sizeof(len)) != NGX_OK)
|
if (ngx_rtmp_amf0_get(ctx, buf, 2) != NGX_OK)
|
||||||
return NGX_ERROR;
|
return NGX_ERROR;
|
||||||
|
|
||||||
|
ngx_rtmp_amf0_reverse_copy(&len, buf, 2);
|
||||||
|
|
||||||
if (!len)
|
if (!len)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
@ -241,7 +251,7 @@ ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t n
|
||||||
if (ngx_rtmp_amf0_get(ctx, buf, 8) != NGX_OK) {
|
if (ngx_rtmp_amf0_get(ctx, buf, 8) != NGX_OK) {
|
||||||
return NGX_ERROR;
|
return NGX_ERROR;
|
||||||
}
|
}
|
||||||
ngx_rtmp_amf0_reverse_copy(data, buf, 0);
|
ngx_rtmp_amf0_reverse_copy(data, buf, 8);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case NGX_RTMP_AMF0_BOOLEAN:
|
case NGX_RTMP_AMF0_BOOLEAN:
|
||||||
|
|
|
@ -230,7 +230,7 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||||
p += (l->buf->end - l->buf->last);
|
p += (l->buf->end - l->buf->last);
|
||||||
}
|
}
|
||||||
|
|
||||||
ngx_rtmp_prepare_message(h, out, 0/*fmt*/);
|
ngx_rtmp_prepare_message(s, h, out, 0/*fmt*/);
|
||||||
|
|
||||||
/* broadcast to all subscribers */
|
/* broadcast to all subscribers */
|
||||||
for(cctx = *ngx_rtmp_broadcast_get_head(s);
|
for(cctx = *ngx_rtmp_broadcast_get_head(s);
|
||||||
|
@ -242,7 +242,9 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||||
&& !ngx_strncmp(cctx->stream.data, ctx->stream.data,
|
&& !ngx_strncmp(cctx->stream.data, ctx->stream.data,
|
||||||
ctx->stream.len))
|
ctx->stream.len))
|
||||||
{
|
{
|
||||||
ngx_rtmp_send_message(s, out);
|
if (ngx_rtmp_send_message(s, out) != NGX_OK) {
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,7 +263,7 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans,
|
||||||
|
|
||||||
static ngx_rtmp_amf0_elt_t in_cmd[] = {
|
static ngx_rtmp_amf0_elt_t in_cmd[] = {
|
||||||
{ NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) },
|
{ NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) },
|
||||||
{ NGX_RTMP_AMF0_STRING, "pageUrl", url, sizeof(url) },
|
{ NGX_RTMP_AMF0_STRING, "tcUrl" , url, sizeof(url) },
|
||||||
};
|
};
|
||||||
|
|
||||||
static ngx_rtmp_amf0_elt_t out_inf[] = {
|
static ngx_rtmp_amf0_elt_t out_inf[] = {
|
||||||
|
@ -272,7 +274,6 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans,
|
||||||
|
|
||||||
static ngx_rtmp_amf0_elt_t in_elts[] = {
|
static ngx_rtmp_amf0_elt_t in_elts[] = {
|
||||||
{ NGX_RTMP_AMF0_OBJECT, NULL, in_cmd, sizeof(in_cmd) },
|
{ NGX_RTMP_AMF0_OBJECT, NULL, in_cmd, sizeof(in_cmd) },
|
||||||
{ NGX_RTMP_AMF0_NULL, NULL, NULL, 0 },
|
|
||||||
};
|
};
|
||||||
|
|
||||||
static ngx_rtmp_amf0_elt_t out_elts[] = {
|
static ngx_rtmp_amf0_elt_t out_elts[] = {
|
||||||
|
|
|
@ -23,8 +23,6 @@ static void ngx_rtmp_send(ngx_event_t *rev);
|
||||||
static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
|
static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
|
||||||
ngx_rtmp_header_t *h, ngx_chain_t *in);
|
ngx_rtmp_header_t *h, ngx_chain_t *in);
|
||||||
|
|
||||||
static void ngx_rtmp_close_connection(ngx_connection_t *c);
|
|
||||||
|
|
||||||
#ifdef NGX_DEBUG
|
#ifdef NGX_DEBUG
|
||||||
static char*
|
static char*
|
||||||
ngx_rtmp_packet_type(uint8_t type) {
|
ngx_rtmp_packet_type(uint8_t type) {
|
||||||
|
@ -207,14 +205,14 @@ ngx_rtmp_init_session(ngx_connection_t *c)
|
||||||
|
|
||||||
s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_rtmp_max_module);
|
s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_rtmp_max_module);
|
||||||
if (s->ctx == NULL) {
|
if (s->ctx == NULL) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t)
|
s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t)
|
||||||
* cscf->max_streams);
|
* cscf->max_streams);
|
||||||
if (s->in_streams == NULL) {
|
if (s->in_streams == NULL) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,6 +227,9 @@ ngx_rtmp_init_session(ngx_connection_t *c)
|
||||||
b->end = b->start + size;
|
b->end = b->start + size;
|
||||||
b->temporary = 1;
|
b->temporary = 1;
|
||||||
|
|
||||||
|
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
|
"RTMP handshake done");
|
||||||
|
|
||||||
c->write->handler = ngx_rtmp_handshake_send;
|
c->write->handler = ngx_rtmp_handshake_send;
|
||||||
c->read->handler = ngx_rtmp_handshake_recv;
|
c->read->handler = ngx_rtmp_handshake_recv;
|
||||||
|
|
||||||
|
@ -249,10 +250,14 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
|
||||||
s = c->data;
|
s = c->data;
|
||||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||||
|
|
||||||
|
if (c->destroyed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (rev->timedout) {
|
if (rev->timedout) {
|
||||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
|
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
|
||||||
c->timedout = 1;
|
c->timedout = 1;
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -267,7 +272,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
|
||||||
n = c->recv(c, b->last, b->end - b->last);
|
n = c->recv(c, b->last, b->end - b->last);
|
||||||
|
|
||||||
if (n == NGX_ERROR || n == 0) {
|
if (n == NGX_ERROR || n == 0) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -277,7 +282,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
|
||||||
{
|
{
|
||||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
||||||
"invalid handshake signature");
|
"invalid handshake signature");
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
b->last += n;
|
b->last += n;
|
||||||
|
@ -286,7 +291,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
|
||||||
if (n == NGX_AGAIN) {
|
if (n == NGX_AGAIN) {
|
||||||
ngx_add_timer(rev, cscf->timeout);
|
ngx_add_timer(rev, cscf->timeout);
|
||||||
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
|
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -322,10 +327,14 @@ ngx_rtmp_handshake_send(ngx_event_t *wev)
|
||||||
s = c->data;
|
s = c->data;
|
||||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||||
|
|
||||||
|
if (c->destroyed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (wev->timedout) {
|
if (wev->timedout) {
|
||||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
|
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
|
||||||
c->timedout = 1;
|
c->timedout = 1;
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,14 +352,14 @@ restart:
|
||||||
}
|
}
|
||||||
|
|
||||||
if (n == NGX_ERROR) {
|
if (n == NGX_ERROR) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (n == NGX_AGAIN) {
|
if (n == NGX_AGAIN) {
|
||||||
ngx_add_timer(c->write, cscf->timeout);
|
ngx_add_timer(c->write, cscf->timeout);
|
||||||
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
|
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -389,7 +398,11 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
b = NULL;
|
b = NULL;
|
||||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||||
|
|
||||||
for(;;) {
|
if (c->destroyed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for( ;; ) {
|
||||||
|
|
||||||
st = &s->in_streams[s->in_csid];
|
st = &s->in_streams[s->in_csid];
|
||||||
|
|
||||||
|
@ -399,7 +412,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
{
|
{
|
||||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
||||||
"chain alloc failed");
|
"chain alloc failed");
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,7 +423,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
if (st->in->buf->start == NULL) {
|
if (st->in->buf->start == NULL) {
|
||||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
||||||
"buf alloc failed");
|
"buf alloc failed");
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
st->in->buf->flush = 1;
|
st->in->buf->flush = 1;
|
||||||
|
@ -437,13 +450,13 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
n = c->recv(c, b->last, b->end - b->last);
|
n = c->recv(c, b->last, b->end - b->last);
|
||||||
|
|
||||||
if (n == NGX_ERROR || n == 0) {
|
if (n == NGX_ERROR || n == 0) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (n == NGX_AGAIN) {
|
if (n == NGX_AGAIN) {
|
||||||
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
|
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -480,7 +493,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
||||||
"RTMP chunk stream too big: %D >= %D",
|
"RTMP chunk stream too big: %D >= %D",
|
||||||
csid, cscf->max_streams);
|
csid, cscf->max_streams);
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -589,7 +602,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
head = st->in->next;
|
head = st->in->next;
|
||||||
st->in->next = NULL;
|
st->in->next = NULL;
|
||||||
if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) {
|
if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
b->pos += h->mlen;
|
b->pos += h->mlen;
|
||||||
|
@ -607,30 +620,34 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
|
|
||||||
|
|
||||||
#define ngx_rtmp_buf_addref(b) \
|
#define ngx_rtmp_buf_addref(b) \
|
||||||
(++*(int*)&(b)->tag)
|
(++*(int*)&((b)->tag))
|
||||||
|
|
||||||
|
|
||||||
#define ngx_rtmp_buf_release(b) \
|
#define ngx_rtmp_buf_release(b) \
|
||||||
(--*(int*)&(b)->tag)
|
(--*(int*)&((b)->tag))
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
ngx_rtmp_send(ngx_event_t *wev)
|
ngx_rtmp_send(ngx_event_t *wev)
|
||||||
{
|
{
|
||||||
ngx_connection_t *c;
|
ngx_connection_t *c;
|
||||||
ngx_rtmp_session_t *s;
|
ngx_rtmp_session_t *s;
|
||||||
ngx_rtmp_core_srv_conf_t *cscf;
|
ngx_rtmp_core_srv_conf_t *cscf;
|
||||||
ngx_chain_t *out, *l, *ln;
|
ngx_chain_t *out, *l, *ln;
|
||||||
|
|
||||||
c = wev->data;
|
c = wev->data;
|
||||||
s = c->data;
|
s = c->data;
|
||||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||||
|
|
||||||
|
if (c->destroyed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (wev->timedout) {
|
if (wev->timedout) {
|
||||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
|
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
|
||||||
"client timed out");
|
"client timed out");
|
||||||
c->timedout = 1;
|
c->timedout = 1;
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -642,7 +659,7 @@ ngx_rtmp_send(ngx_event_t *wev)
|
||||||
out = c->send_chain(c, s->out, 0);
|
out = c->send_chain(c, s->out, 0);
|
||||||
|
|
||||||
if (out == NGX_CHAIN_ERROR) {
|
if (out == NGX_CHAIN_ERROR) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -650,7 +667,7 @@ ngx_rtmp_send(ngx_event_t *wev)
|
||||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||||
ngx_add_timer(c->write, cscf->timeout);
|
ngx_add_timer(c->write, cscf->timeout);
|
||||||
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
|
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
|
||||||
ngx_rtmp_close_session(s);
|
ngx_rtmp_close_connection(c);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -713,14 +730,15 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s)
|
||||||
b = out->buf;
|
b = out->buf;
|
||||||
b->pos = b->last = b->start + NGX_RTMP_MAX_CHUNK_HEADER;
|
b->pos = b->last = b->start + NGX_RTMP_MAX_CHUNK_HEADER;
|
||||||
b->tag = (ngx_buf_tag_t)0;
|
b->tag = (ngx_buf_tag_t)0;
|
||||||
|
b->memory = 1;
|
||||||
|
|
||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out,
|
ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||||
uint8_t fmt)
|
ngx_chain_t *out, uint8_t fmt)
|
||||||
{
|
{
|
||||||
ngx_chain_t *l;
|
ngx_chain_t *l;
|
||||||
u_char *p, *pp;
|
u_char *p, *pp;
|
||||||
|
@ -735,13 +753,13 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out,
|
||||||
mlen += (out->buf->last - l->buf->pos);
|
mlen += (out->buf->last - l->buf->pos);
|
||||||
++nbufs;
|
++nbufs;
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
|
||||||
"RTMP send %s (%d) csid=%D timestamp=%D "
|
"RTMP prep %s (%d) csid=%D timestamp=%D "
|
||||||
"mlen=%D msid=%D nbufs=%d",
|
"mlen=%D msid=%D nbufs=%d",
|
||||||
ngx_rtmp_packet_type(h->type), (int)h->type,
|
ngx_rtmp_packet_type(h->type), (int)h->type,
|
||||||
h->csid, h->timestamp, mlen, h->msid, nbufs);
|
h->csid, h->timestamp, mlen, h->msid, nbufs);
|
||||||
*/
|
|
||||||
/* determine initial header size */
|
/* determine initial header size */
|
||||||
hsize = hdrsize[fmt];
|
hsize = hdrsize[fmt];
|
||||||
|
|
||||||
|
@ -815,27 +833,41 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out,
|
||||||
* trailing fragments */
|
* trailing fragments */
|
||||||
p = out->buf->pos;
|
p = out->buf->pos;
|
||||||
for(out = out->next; out; out = out->next) {
|
for(out = out->next; out; out = out->next) {
|
||||||
out->buf->pos -= hsize;
|
out->buf->pos -= thsize;
|
||||||
ngx_memcpy(out->buf->pos, p, thsize);
|
ngx_memcpy(out->buf->pos, p, thsize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
ngx_int_t
|
||||||
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
|
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
|
||||||
{
|
{
|
||||||
ngx_chain_t *l, **ll;
|
ngx_chain_t *l, **ll;
|
||||||
|
size_t nbytes, nbufs;
|
||||||
|
ngx_connection_t *c;
|
||||||
|
|
||||||
|
c = s->connection;
|
||||||
|
nbytes = 0;
|
||||||
|
nbufs = 0;
|
||||||
|
|
||||||
for(l = out; l; l = l->next) {
|
for(l = out; l; l = l->next) {
|
||||||
ngx_rtmp_buf_addref(l->buf);
|
ngx_rtmp_buf_addref(l->buf);
|
||||||
|
nbytes += (l->buf->last - l->buf->pos);
|
||||||
|
++nbufs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
|
||||||
|
"RTMP send nbytes=%d, nbufs=%d",
|
||||||
|
nbytes, nbufs);
|
||||||
|
|
||||||
/* TODO: optimize lookup */
|
/* TODO: optimize lookup */
|
||||||
/* TODO: implement dropper */
|
/* TODO: implement dropper */
|
||||||
for(ll = &s->out; *ll; ll = &(*ll)->next);
|
for(ll = &s->out; *ll; ll = &(*ll)->next);
|
||||||
*ll = out;
|
*ll = out;
|
||||||
|
|
||||||
ngx_rtmp_send(s->connection->write);
|
ngx_rtmp_send(s->connection->write);
|
||||||
|
|
||||||
|
return c->destroyed ? NGX_ERROR : NGX_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -890,7 +922,7 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
|
||||||
return NGX_OK;
|
return NGX_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
void
|
void
|
||||||
ngx_rtmp_close_session(ngx_rtmp_session_t *s)
|
ngx_rtmp_close_session(ngx_rtmp_session_t *s)
|
||||||
{
|
{
|
||||||
|
@ -910,15 +942,38 @@ ngx_rtmp_close_session(ngx_rtmp_session_t *s)
|
||||||
ngx_destroy_pool(s->in_pool);
|
ngx_destroy_pool(s->in_pool);
|
||||||
ngx_rtmp_close_connection(s->connection);
|
ngx_rtmp_close_connection(s->connection);
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
void
|
void
|
||||||
ngx_rtmp_close_connection(ngx_connection_t *c)
|
ngx_rtmp_close_connection(ngx_connection_t *c)
|
||||||
{
|
{
|
||||||
ngx_pool_t *pool;
|
ngx_rtmp_session_t *s;
|
||||||
|
ngx_pool_t *pool;
|
||||||
|
ngx_rtmp_core_main_conf_t *cmcf;
|
||||||
|
ngx_rtmp_disconnect_handler_pt *h;
|
||||||
|
size_t n;
|
||||||
|
|
||||||
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
if (c->destroyed) {
|
||||||
"close connection: %d", c->fd);
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
s = c->data;
|
||||||
|
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
|
||||||
|
|
||||||
|
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection");
|
||||||
|
|
||||||
|
if (s) {
|
||||||
|
h = cmcf->disconnect.elts;
|
||||||
|
for(n = 0; n < cmcf->disconnect.nelts; ++n, ++h) {
|
||||||
|
if (*h) {
|
||||||
|
(*h)(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (s->in_pool) {
|
||||||
|
ngx_destroy_pool(s->in_pool);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
c->destroyed = 1;
|
c->destroyed = 1;
|
||||||
pool = c->pool;
|
pool = c->pool;
|
||||||
|
|
|
@ -159,7 +159,7 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,
|
||||||
ngx_rtmp_amf0_ctx_t act;
|
ngx_rtmp_amf0_ctx_t act;
|
||||||
ngx_connection_t *c;
|
ngx_connection_t *c;
|
||||||
ngx_rtmp_core_main_conf_t *cmcf;
|
ngx_rtmp_core_main_conf_t *cmcf;
|
||||||
ngx_rtmp_call_handler_pt *ch;
|
ngx_rtmp_call_handler_pt ch;
|
||||||
size_t len;
|
size_t len;
|
||||||
|
|
||||||
static double trans;
|
static double trans;
|
||||||
|
@ -167,7 +167,7 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,
|
||||||
|
|
||||||
static ngx_rtmp_amf0_elt_t elts[] = {
|
static ngx_rtmp_amf0_elt_t elts[] = {
|
||||||
{ NGX_RTMP_AMF0_STRING, 0, func, sizeof(func) },
|
{ NGX_RTMP_AMF0_STRING, 0, func, sizeof(func) },
|
||||||
{ NGX_RTMP_AMF0_NUMBER, 0, NULL, 0 },
|
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
|
||||||
};
|
};
|
||||||
|
|
||||||
c = s->connection;
|
c = s->connection;
|
||||||
|
@ -197,13 +197,13 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,
|
||||||
|
|
||||||
if (ch) {
|
if (ch) {
|
||||||
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
"AMF0 func '%s' @%f passed to handler", func, trans);
|
"AMF0 func '%s' trans=%f passed to handler", func, trans);
|
||||||
|
|
||||||
return (*ch)(s, trans, in);
|
return ch(s, trans, in);
|
||||||
}
|
}
|
||||||
|
|
||||||
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
"AMF0 cmd '%s' @%f no handler", func, trans);
|
"AMF0 cmd '%s' trans=%f no handler", func, trans);
|
||||||
|
|
||||||
return NGX_OK;
|
return NGX_OK;
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
#include "ngx_rtmp_amf0.h"
|
#include "ngx_rtmp_amf0.h"
|
||||||
|
|
||||||
|
|
||||||
#define NGX_RTMP_USER_START(s, tp) \
|
#define NGX_RTMP_USER_START(s, tp) \
|
||||||
ngx_rtmp_header_t __h; \
|
ngx_rtmp_header_t __h; \
|
||||||
ngx_chain_t *__l; \
|
ngx_chain_t *__l; \
|
||||||
ngx_buf_t *__b; \
|
ngx_buf_t *__b; \
|
||||||
|
@ -22,23 +22,22 @@
|
||||||
__b = __l->buf;
|
__b = __l->buf;
|
||||||
|
|
||||||
#define NGX_RTMP_UCTL_START(s, type, utype) \
|
#define NGX_RTMP_UCTL_START(s, type, utype) \
|
||||||
NGX_RTMP_USER_START(s, type); \
|
NGX_RTMP_USER_START(s, type); \
|
||||||
*(__b->last++) = (u_char)((utype) >> 8); \
|
*(__b->last++) = (u_char)((utype) >> 8); \
|
||||||
*(__b->last++) = (u_char)(utype);
|
*(__b->last++) = (u_char)(utype);
|
||||||
|
|
||||||
#define NGX_RTMP_USER_OUT1(v) \
|
#define NGX_RTMP_USER_OUT1(v) \
|
||||||
*(__b->last++) = ((u_char*)&v)[0];
|
*(__b->last++) = ((u_char*)&v)[0];
|
||||||
|
|
||||||
#define NGX_RTMP_USER_OUT4(v) \
|
#define NGX_RTMP_USER_OUT4(v) \
|
||||||
*(__b->last++) = ((u_char*)&v)[3]; \
|
*(__b->last++) = ((u_char*)&v)[3]; \
|
||||||
*(__b->last++) = ((u_char*)&v)[2]; \
|
*(__b->last++) = ((u_char*)&v)[2]; \
|
||||||
*(__b->last++) = ((u_char*)&v)[1]; \
|
*(__b->last++) = ((u_char*)&v)[1]; \
|
||||||
*(__b->last++) = ((u_char*)&v)[0];
|
*(__b->last++) = ((u_char*)&v)[0];
|
||||||
|
|
||||||
#define NGX_RTMP_USER_END(s) \
|
#define NGX_RTMP_USER_END(s) \
|
||||||
ngx_rtmp_prepare_message(&__h, __l, 0); \
|
ngx_rtmp_prepare_message(s, &__h, __l, 0); \
|
||||||
ngx_rtmp_send_message(s, __l); \
|
return ngx_rtmp_send_message(s, __l); \
|
||||||
return NGX_OK;
|
|
||||||
|
|
||||||
|
|
||||||
/* Protocol control messages */
|
/* Protocol control messages */
|
||||||
|
@ -202,8 +201,8 @@ ngx_rtmp_send_amf0(ngx_rtmp_session_t *s, uint32_t csid, uint32_t msid,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (act.first) {
|
if (act.first) {
|
||||||
ngx_rtmp_prepare_message(&h, act.first, 0);
|
ngx_rtmp_prepare_message(s, &h, act.first, 0);
|
||||||
ngx_rtmp_send_message(s, act.first);
|
return ngx_rtmp_send_message(s, act.first);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NGX_OK;
|
return NGX_OK;
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
ffmpeg -re -i /mnt/home/rarutyunyan/Videos/the_changeup-solaris.giga.su.avi -f flv rtmp://localhost/
|
ffmpeg -re -i /mnt/home/rarutyunyan/Videos/the_changeup-solaris.giga.su.avi -f flv rtmp://localhost/helo
|
||||||
|
|
Loading…
Reference in a new issue