From 8e32bae8429fae69d561f987fcf1b2033b91b195 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Tue, 13 Mar 2012 17:51:41 +0400 Subject: [PATCH] improved closing session on error --- TODO | 7 +- ngx_rtmp.h | 7 +- ngx_rtmp_amf0.c | 26 ++++--- ngx_rtmp_broadcast_module.c | 9 +-- ngx_rtmp_handler.c | 137 +++++++++++++++++++++++++----------- ngx_rtmp_receive.c | 10 +-- ngx_rtmp_send.c | 19 +++-- test/ffstream.sh | 2 +- 8 files changed, 144 insertions(+), 73 deletions(-) diff --git a/TODO b/TODO index 41f0fbf..98f9af0 100644 --- a/TODO +++ b/TODO @@ -1,4 +1,4 @@ -- Implement modules support. +~ Implement modules support. Move AMF0 handlers to modules. Move broadcast to module. @@ -6,6 +6,11 @@ - packet dropping +- l <-> cl + +- closing session on send error + causes crash because of double-freeing stack + - shortcuts for big-endian copy - implement loc confs (=fms apps) diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 80906ca..7207f68 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -257,7 +257,8 @@ typedef struct { void ngx_rtmp_init_connection(ngx_connection_t *c); -void ngx_rtmp_close_session(ngx_rtmp_session_t *s); +/*void ngx_rtmp_close_session(ngx_rtmp_session_t *s);*/ +void ngx_rtmp_close_connection(ngx_connection_t *c); u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len); @@ -271,9 +272,9 @@ ngx_int_t ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, /* Sending messages */ ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s); -void ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, +void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *out, uint8_t fmt); -void ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out); +ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out); #define NGX_RTMP_LIMIT_SOFT 0 #define NGX_RTMP_LIMIT_HARD 1 diff --git a/ngx_rtmp_amf0.c b/ngx_rtmp_amf0.c index 4971fe0..7147c24 100644 --- a/ngx_rtmp_amf0.c +++ b/ngx_rtmp_amf0.c @@ -39,11 +39,17 @@ ngx_rtmp_amf0_debug(const char* op, ngx_log_t *log, u_char *p, size_t n) for(i = 0; i < n && i < NGX_RTMP_AMF0_DEBUG_SIZE; ++i) { *hp++ = ' '; - *hp++ = hex[(*p & 0xf0) >> 4]; - *hp++ = hex[*p & 0x0f]; - *sp++ = (*p >= 0x20 && *p <= 0x7e) ? - *p : (u_char)'?'; - ++p; + if (p) { + *hp++ = hex[(*p & 0xf0) >> 4]; + *hp++ = hex[*p & 0x0f]; + *sp++ = (*p >= 0x20 && *p <= 0x7e) ? + *p : (u_char)'?'; + ++p; + } else { + *hp++ = 'X'; + *hp++ = 'X'; + *sp++ = '?'; + } } *hp = *sp = '\0'; @@ -69,7 +75,7 @@ ngx_rtmp_amf0_get(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) b = l->buf; - if (b->last > n + b->pos) { + if (b->last >= n + b->pos) { if (p) { p = ngx_cpymem(p, b->pos, n); } @@ -128,6 +134,7 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) l = ln; } + ctx->link = l; b = l->buf; b->pos = b->last = b->start; } @@ -156,6 +163,7 @@ ngx_rtmp_amf0_read_object(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, uint16_t len; size_t n, namelen, maxlen; ngx_int_t rc; + u_char buf[2]; maxlen = 0; for(n = 0; n < nelts; ++n) { @@ -169,9 +177,11 @@ ngx_rtmp_amf0_read_object(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, char name[maxlen + 1]; /* read key */ - if (ngx_rtmp_amf0_get(ctx, &len, sizeof(len)) != NGX_OK) + if (ngx_rtmp_amf0_get(ctx, buf, 2) != NGX_OK) return NGX_ERROR; + ngx_rtmp_amf0_reverse_copy(&len, buf, 2); + if (!len) break; @@ -241,7 +251,7 @@ ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t n if (ngx_rtmp_amf0_get(ctx, buf, 8) != NGX_OK) { return NGX_ERROR; } - ngx_rtmp_amf0_reverse_copy(data, buf, 0); + ngx_rtmp_amf0_reverse_copy(data, buf, 8); break; case NGX_RTMP_AMF0_BOOLEAN: diff --git a/ngx_rtmp_broadcast_module.c b/ngx_rtmp_broadcast_module.c index a58bb65..e4cabeb 100644 --- a/ngx_rtmp_broadcast_module.c +++ b/ngx_rtmp_broadcast_module.c @@ -230,7 +230,7 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, p += (l->buf->end - l->buf->last); } - ngx_rtmp_prepare_message(h, out, 0/*fmt*/); + ngx_rtmp_prepare_message(s, h, out, 0/*fmt*/); /* broadcast to all subscribers */ for(cctx = *ngx_rtmp_broadcast_get_head(s); @@ -242,7 +242,9 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, && !ngx_strncmp(cctx->stream.data, ctx->stream.data, ctx->stream.len)) { - ngx_rtmp_send_message(s, out); + if (ngx_rtmp_send_message(s, out) != NGX_OK) { + return NGX_ERROR; + } } } @@ -261,7 +263,7 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, static ngx_rtmp_amf0_elt_t in_cmd[] = { { NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) }, - { NGX_RTMP_AMF0_STRING, "pageUrl", url, sizeof(url) }, + { NGX_RTMP_AMF0_STRING, "tcUrl" , url, sizeof(url) }, }; static ngx_rtmp_amf0_elt_t out_inf[] = { @@ -272,7 +274,6 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, static ngx_rtmp_amf0_elt_t in_elts[] = { { NGX_RTMP_AMF0_OBJECT, NULL, in_cmd, sizeof(in_cmd) }, - { NGX_RTMP_AMF0_NULL, NULL, NULL, 0 }, }; static ngx_rtmp_amf0_elt_t out_elts[] = { diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index b15d342..d247d4a 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -23,8 +23,6 @@ static void ngx_rtmp_send(ngx_event_t *rev); static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); -static void ngx_rtmp_close_connection(ngx_connection_t *c); - #ifdef NGX_DEBUG static char* ngx_rtmp_packet_type(uint8_t type) { @@ -207,14 +205,14 @@ ngx_rtmp_init_session(ngx_connection_t *c) s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_rtmp_max_module); if (s->ctx == NULL) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) * cscf->max_streams); if (s->in_streams == NULL) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } @@ -229,6 +227,9 @@ ngx_rtmp_init_session(ngx_connection_t *c) b->end = b->start + size; b->temporary = 1; + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP handshake done"); + c->write->handler = ngx_rtmp_handshake_send; c->read->handler = ngx_rtmp_handshake_recv; @@ -249,10 +250,14 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) s = c->data; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + if (c->destroyed) { + return; + } + if (rev->timedout) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); c->timedout = 1; - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } @@ -267,7 +272,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) n = c->recv(c, b->last, b->end - b->last); if (n == NGX_ERROR || n == 0) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } @@ -277,7 +282,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "invalid handshake signature"); - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } b->last += n; @@ -286,7 +291,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) if (n == NGX_AGAIN) { ngx_add_timer(rev, cscf->timeout); if (ngx_handle_read_event(c->read, 0) != NGX_OK) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); } return; } @@ -322,10 +327,14 @@ ngx_rtmp_handshake_send(ngx_event_t *wev) s = c->data; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + if (c->destroyed) { + return; + } + if (wev->timedout) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); c->timedout = 1; - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } @@ -343,14 +352,14 @@ restart: } if (n == NGX_ERROR) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } if (n == NGX_AGAIN) { ngx_add_timer(c->write, cscf->timeout); if (ngx_handle_write_event(c->write, 0) != NGX_OK) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } } @@ -389,7 +398,11 @@ ngx_rtmp_recv(ngx_event_t *rev) b = NULL; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - for(;;) { + if (c->destroyed) { + return; + } + + for( ;; ) { st = &s->in_streams[s->in_csid]; @@ -399,7 +412,7 @@ ngx_rtmp_recv(ngx_event_t *rev) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "chain alloc failed"); - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } @@ -410,7 +423,7 @@ ngx_rtmp_recv(ngx_event_t *rev) if (st->in->buf->start == NULL) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "buf alloc failed"); - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } st->in->buf->flush = 1; @@ -437,13 +450,13 @@ ngx_rtmp_recv(ngx_event_t *rev) n = c->recv(c, b->last, b->end - b->last); if (n == NGX_ERROR || n == 0) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } if (n == NGX_AGAIN) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); } return; } @@ -480,7 +493,7 @@ ngx_rtmp_recv(ngx_event_t *rev) ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "RTMP chunk stream too big: %D >= %D", csid, cscf->max_streams); - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } @@ -589,7 +602,7 @@ ngx_rtmp_recv(ngx_event_t *rev) head = st->in->next; st->in->next = NULL; if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } b->pos += h->mlen; @@ -607,30 +620,34 @@ ngx_rtmp_recv(ngx_event_t *rev) #define ngx_rtmp_buf_addref(b) \ - (++*(int*)&(b)->tag) + (++*(int*)&((b)->tag)) #define ngx_rtmp_buf_release(b) \ - (--*(int*)&(b)->tag) + (--*(int*)&((b)->tag)) static void ngx_rtmp_send(ngx_event_t *wev) { - ngx_connection_t *c; - ngx_rtmp_session_t *s; - ngx_rtmp_core_srv_conf_t *cscf; - ngx_chain_t *out, *l, *ln; + ngx_connection_t *c; + ngx_rtmp_session_t *s; + ngx_rtmp_core_srv_conf_t *cscf; + ngx_chain_t *out, *l, *ln; c = wev->data; s = c->data; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + if (c->destroyed) { + return; + } + if (wev->timedout) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); c->timedout = 1; - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } @@ -642,7 +659,7 @@ ngx_rtmp_send(ngx_event_t *wev) out = c->send_chain(c, s->out, 0); if (out == NGX_CHAIN_ERROR) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); return; } @@ -650,7 +667,7 @@ ngx_rtmp_send(ngx_event_t *wev) cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); ngx_add_timer(c->write, cscf->timeout); if (ngx_handle_write_event(c->write, 0) != NGX_OK) { - ngx_rtmp_close_session(s); + ngx_rtmp_close_connection(c); } return; } @@ -713,14 +730,15 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s) b = out->buf; b->pos = b->last = b->start + NGX_RTMP_MAX_CHUNK_HEADER; b->tag = (ngx_buf_tag_t)0; + b->memory = 1; return out; } void -ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out, - uint8_t fmt) +ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *out, uint8_t fmt) { ngx_chain_t *l; u_char *p, *pp; @@ -735,13 +753,13 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out, mlen += (out->buf->last - l->buf->pos); ++nbufs; } -/* - ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP send %s (%d) csid=%D timestamp=%D " + + ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "RTMP prep %s (%d) csid=%D timestamp=%D " "mlen=%D msid=%D nbufs=%d", ngx_rtmp_packet_type(h->type), (int)h->type, h->csid, h->timestamp, mlen, h->msid, nbufs); -*/ + /* determine initial header size */ hsize = hdrsize[fmt]; @@ -815,27 +833,41 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out, * trailing fragments */ p = out->buf->pos; for(out = out->next; out; out = out->next) { - out->buf->pos -= hsize; + out->buf->pos -= thsize; ngx_memcpy(out->buf->pos, p, thsize); } } -void +ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out) { - ngx_chain_t *l, **ll; + ngx_chain_t *l, **ll; + size_t nbytes, nbufs; + ngx_connection_t *c; + + c = s->connection; + nbytes = 0; + nbufs = 0; for(l = out; l; l = l->next) { ngx_rtmp_buf_addref(l->buf); + nbytes += (l->buf->last - l->buf->pos); + ++nbufs; } + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "RTMP send nbytes=%d, nbufs=%d", + nbytes, nbufs); + /* TODO: optimize lookup */ /* TODO: implement dropper */ for(ll = &s->out; *ll; ll = &(*ll)->next); *ll = out; ngx_rtmp_send(s->connection->write); + + return c->destroyed ? NGX_ERROR : NGX_OK; } @@ -890,7 +922,7 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, return NGX_OK; } - +/* void ngx_rtmp_close_session(ngx_rtmp_session_t *s) { @@ -910,15 +942,38 @@ ngx_rtmp_close_session(ngx_rtmp_session_t *s) ngx_destroy_pool(s->in_pool); ngx_rtmp_close_connection(s->connection); } - +*/ void ngx_rtmp_close_connection(ngx_connection_t *c) { - ngx_pool_t *pool; + ngx_rtmp_session_t *s; + ngx_pool_t *pool; + ngx_rtmp_core_main_conf_t *cmcf; + ngx_rtmp_disconnect_handler_pt *h; + size_t n; - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "close connection: %d", c->fd); + if (c->destroyed) { + return; + } + + s = c->data; + cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection"); + + if (s) { + h = cmcf->disconnect.elts; + for(n = 0; n < cmcf->disconnect.nelts; ++n, ++h) { + if (*h) { + (*h)(s); + } + } + + if (s->in_pool) { + ngx_destroy_pool(s->in_pool); + } + } c->destroyed = 1; pool = c->pool; diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c index be90d81..a51df76 100644 --- a/ngx_rtmp_receive.c +++ b/ngx_rtmp_receive.c @@ -159,7 +159,7 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_amf0_ctx_t act; ngx_connection_t *c; ngx_rtmp_core_main_conf_t *cmcf; - ngx_rtmp_call_handler_pt *ch; + ngx_rtmp_call_handler_pt ch; size_t len; static double trans; @@ -167,7 +167,7 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, static ngx_rtmp_amf0_elt_t elts[] = { { NGX_RTMP_AMF0_STRING, 0, func, sizeof(func) }, - { NGX_RTMP_AMF0_NUMBER, 0, NULL, 0 }, + { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, }; c = s->connection; @@ -197,13 +197,13 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, if (ch) { ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "AMF0 func '%s' @%f passed to handler", func, trans); + "AMF0 func '%s' trans=%f passed to handler", func, trans); - return (*ch)(s, trans, in); + return ch(s, trans, in); } ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "AMF0 cmd '%s' @%f no handler", func, trans); + "AMF0 cmd '%s' trans=%f no handler", func, trans); return NGX_OK; } diff --git a/ngx_rtmp_send.c b/ngx_rtmp_send.c index 28d7aba..8566ce1 100644 --- a/ngx_rtmp_send.c +++ b/ngx_rtmp_send.c @@ -7,7 +7,7 @@ #include "ngx_rtmp_amf0.h" -#define NGX_RTMP_USER_START(s, tp) \ +#define NGX_RTMP_USER_START(s, tp) \ ngx_rtmp_header_t __h; \ ngx_chain_t *__l; \ ngx_buf_t *__b; \ @@ -22,23 +22,22 @@ __b = __l->buf; #define NGX_RTMP_UCTL_START(s, type, utype) \ - NGX_RTMP_USER_START(s, type); \ + NGX_RTMP_USER_START(s, type); \ *(__b->last++) = (u_char)((utype) >> 8); \ *(__b->last++) = (u_char)(utype); -#define NGX_RTMP_USER_OUT1(v) \ +#define NGX_RTMP_USER_OUT1(v) \ *(__b->last++) = ((u_char*)&v)[0]; -#define NGX_RTMP_USER_OUT4(v) \ +#define NGX_RTMP_USER_OUT4(v) \ *(__b->last++) = ((u_char*)&v)[3]; \ *(__b->last++) = ((u_char*)&v)[2]; \ *(__b->last++) = ((u_char*)&v)[1]; \ *(__b->last++) = ((u_char*)&v)[0]; -#define NGX_RTMP_USER_END(s) \ - ngx_rtmp_prepare_message(&__h, __l, 0); \ - ngx_rtmp_send_message(s, __l); \ - return NGX_OK; +#define NGX_RTMP_USER_END(s) \ + ngx_rtmp_prepare_message(s, &__h, __l, 0); \ + return ngx_rtmp_send_message(s, __l); \ /* Protocol control messages */ @@ -202,8 +201,8 @@ ngx_rtmp_send_amf0(ngx_rtmp_session_t *s, uint32_t csid, uint32_t msid, } if (act.first) { - ngx_rtmp_prepare_message(&h, act.first, 0); - ngx_rtmp_send_message(s, act.first); + ngx_rtmp_prepare_message(s, &h, act.first, 0); + return ngx_rtmp_send_message(s, act.first); } return NGX_OK; diff --git a/test/ffstream.sh b/test/ffstream.sh index 1a9a561..146219c 100755 --- a/test/ffstream.sh +++ b/test/ffstream.sh @@ -1 +1 @@ -ffmpeg -re -i /mnt/home/rarutyunyan/Videos/the_changeup-solaris.giga.su.avi -f flv rtmp://localhost/ +ffmpeg -re -i /mnt/home/rarutyunyan/Videos/the_changeup-solaris.giga.su.avi -f flv rtmp://localhost/helo