From 68c64116b223b13b3d00c1aec529ef8b1ccfcc97 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Wed, 14 Mar 2012 10:06:47 +0400 Subject: [PATCH] fixed errors; now translation goes up to real a/v broadcast --- TODO | 2 + ngx_rtmp.c | 3 +- ngx_rtmp.h | 5 +- ngx_rtmp_amf0.c | 1 - ngx_rtmp_broadcast_module.c | 227 +++++++++++++++++++++++++++++++----- ngx_rtmp_handler.c | 107 ++++++++++------- ngx_rtmp_receive.c | 2 +- 7 files changed, 270 insertions(+), 77 deletions(-) diff --git a/TODO b/TODO index 98f9af0..fceef65 100644 --- a/TODO +++ b/TODO @@ -11,6 +11,8 @@ - closing session on send error causes crash because of double-freeing stack +- recognize amf-meta + - shortcuts for big-endian copy - implement loc confs (=fms apps) diff --git a/ngx_rtmp.c b/ngx_rtmp.c index 2f70208..845fc2b 100644 --- a/ngx_rtmp.c +++ b/ngx_rtmp.c @@ -317,7 +317,6 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AMF0_CMD]); *eh = ngx_rtmp_amf0_message_handler; - /* init calls */ h = cmcf->calls.elts; for(n = 0; n < cmcf->calls.nelts; ++n, ++h) { @@ -326,7 +325,7 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) calls_hash.hash = &cmcf->calls_hash; calls_hash.key = ngx_hash_key_lc; - calls_hash.max_size = 1024; + calls_hash.max_size = 512; calls_hash.bucket_size = ngx_cacheline_size; calls_hash.name = "calls_hash"; calls_hash.pool = cf->pool; diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 7207f68..21f8136 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -150,7 +150,7 @@ typedef struct { typedef struct ngx_rtmp_stream_t { - ngx_rtmp_header_t hdr; + ngx_rtmp_header_t hdr; ngx_chain_t *in; } ngx_rtmp_stream_t; @@ -257,7 +257,6 @@ typedef struct { void ngx_rtmp_init_connection(ngx_connection_t *c); -/*void ngx_rtmp_close_session(ngx_rtmp_session_t *s);*/ void ngx_rtmp_close_connection(ngx_connection_t *c); u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len); @@ -272,6 +271,8 @@ ngx_int_t ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, /* Sending messages */ ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s); +ngx_int_t ngx_rtmp_release_shared_buf(ngx_rtmp_session_t *s, + ngx_chain_t *out); void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *out, uint8_t fmt); ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out); diff --git a/ngx_rtmp_amf0.c b/ngx_rtmp_amf0.c index 7147c24..67ac2ff 100644 --- a/ngx_rtmp_amf0.c +++ b/ngx_rtmp_amf0.c @@ -136,7 +136,6 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) ctx->link = l; b = l->buf; - b->pos = b->last = b->start; } size = b->end - b->last; diff --git a/ngx_rtmp_broadcast_module.c b/ngx_rtmp_broadcast_module.c index e4cabeb..6838798 100644 --- a/ngx_rtmp_broadcast_module.c +++ b/ngx_rtmp_broadcast_module.c @@ -13,6 +13,30 @@ static void * ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf); static char * ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child); +static ngx_int_t ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, + double in_trans, ngx_chain_t *in); +static ngx_int_t ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, + double in_trans, ngx_chain_t *in); +static ngx_int_t ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, + double in_trans, ngx_chain_t *in); +static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s, + double in_trans, ngx_chain_t *in); + + +typedef struct { + ngx_str_t name; + ngx_rtmp_call_handler_pt handler; +} ngx_rtmp_broadcast_map_t; + + +static ngx_rtmp_broadcast_map_t ngx_rtmp_broadcast_map[] = { + { ngx_string("connect"), ngx_rtmp_broadcast_connect }, + { ngx_string("createStream"), ngx_rtmp_broadcast_create_stream }, + { ngx_string("publish"), ngx_rtmp_broadcast_publish }, + { ngx_string("releaseStream"), ngx_rtmp_broadcast_ok }, + { ngx_string("FCPublish"), ngx_rtmp_broadcast_ok }, +}; + typedef struct { /* use hash-map @@ -119,6 +143,20 @@ ngx_rtmp_broadcast_get_head(ngx_rtmp_session_t *s) } +static void +ngx_rtmp_broadcast_set_flags(ngx_rtmp_session_t *s, ngx_uint_t flags) +{ + ngx_rtmp_broadcast_ctx_t *ctx; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); + if (ctx == NULL) { + return; + } + + ctx->flags |= flags; +} + + static void ngx_rtmp_broadcast_join(ngx_rtmp_session_t *s, ngx_str_t *stream, ngx_uint_t flags) @@ -187,8 +225,9 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, { ngx_connection_t *c; ngx_rtmp_broadcast_ctx_t *ctx, *cctx; - ngx_chain_t *out, *l; + ngx_chain_t *out, *l, **ll; u_char *p; + size_t nsubs; c = s->connection; @@ -206,22 +245,25 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, /* copy data to output stream */ out = NULL; + ll = &out; p = in->buf->pos; - for(;;) { + for ( ;; ) { l = ngx_rtmp_alloc_shared_buf(s); if (l == NULL || l->buf == NULL) { return NGX_ERROR; } - if (out == NULL) { - out = l; - } + *ll = l; + ll = &l->next; - while (l->buf->end - l->buf->last > in->buf->last - p) { + while (l->buf->end - l->buf->last >= in->buf->last - p) { l->buf->last = ngx_cpymem(l->buf->last, p, in->buf->last - p); in = in->next; + if (in == NULL) { + goto done; + } p = in->buf->pos; } @@ -230,10 +272,14 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, p += (l->buf->end - l->buf->last); } +done: + *ll = NULL; + ngx_rtmp_prepare_message(s, h, out, 0/*fmt*/); /* broadcast to all subscribers */ - for(cctx = *ngx_rtmp_broadcast_get_head(s); + nsubs = 0; + for (cctx = *ngx_rtmp_broadcast_get_head(s); cctx; cctx = cctx->next) { if (cctx != ctx @@ -245,9 +291,17 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, if (ngx_rtmp_send_message(s, out) != NGX_OK) { return NGX_ERROR; } + ++nsubs; } } + /* no one subscriber? */ + if (!nsubs + && ngx_rtmp_release_shared_buf(s, out) != NGX_OK) + { + return NGX_ERROR; + } + return NGX_OK; } @@ -298,22 +352,132 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, "connect() called; app='%s' url='%s'", app, url); - if (0) { - app_str.data = app; - app_str.len = ngx_strlen(app); - ngx_rtmp_broadcast_join(s, &app_str, 0); - } + /*FIXME: app_str allocation!!!!!!! */ + /*FIXME: add memsetting input data */ + /* join stream */ + ngx_str_set(&app_str, "preved"); + /* + app_str.data = app; + app_str.len = ngx_strlen(app); + */ + ngx_rtmp_broadcast_join(s, &app_str, 0); - return ngx_rtmp_send_ack_size(s, 65536) - || ngx_rtmp_send_bandwidth(s, 65536, NGX_RTMP_LIMIT_SOFT) - || ngx_rtmp_send_user_stream_begin(s, 1) - || ngx_rtmp_send_amf0(s, 3, 1, out_elts, + return ngx_rtmp_send_ack_size(s, 2500000) + || ngx_rtmp_send_bandwidth(s, 2500000, NGX_RTMP_LIMIT_DYNAMIC) + || ngx_rtmp_send_user_stream_begin(s, 0) + || ngx_rtmp_send_amf0(s, 3, 0, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) ? NGX_ERROR : NGX_OK; } +static ngx_int_t +ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, double in_trans, + ngx_chain_t *in) +{ + static double trans; + static double stream; + + static ngx_rtmp_amf0_elt_t out_elts[] = { + { NGX_RTMP_AMF0_STRING, NULL, "_result", sizeof("_result") - 1 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &stream, sizeof(stream) }, + }; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "createStream() called"); + + trans = in_trans; + stream = 1; + + return ngx_rtmp_send_amf0(s, 3, 0, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])); +} + + +static ngx_int_t +ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, double in_trans, + ngx_chain_t *in) +{ + static double trans; + static u_char pub_name[1024]; + static u_char pub_type[1024]; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, + }; + + static ngx_rtmp_amf0_elt_t in_elts[] = { + { NGX_RTMP_AMF0_NULL, NULL, NULL, 0 }, + { NGX_RTMP_AMF0_STRING, NULL, pub_name, sizeof(pub_name) }, + { NGX_RTMP_AMF0_STRING, NULL, pub_type, sizeof(pub_type) }, + }; + + static ngx_rtmp_amf0_elt_t out_elts[] = { + { NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, + }; + + + if (ngx_rtmp_receive_amf0(s, in, in_elts, + sizeof(in_elts) / sizeof(in_elts[0]))) + { + return NGX_ERROR; + } + + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "publish() called; pubName='%s' pubType='%s'", + pub_name, pub_type); + + ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_PUBLISHER); + + trans = in_trans; + ngx_str_set(&out_inf[0], "NetStream.Publish.Start"); + ngx_str_set(&out_inf[1], "status"); + ngx_str_set(&out_inf[2], "Publish succeeded."); + + if (ngx_rtmp_send_amf0(s, 3, 0, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) + { + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s, + double in_trans, ngx_chain_t *in) +{ + static double trans; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, + }; + + static ngx_rtmp_amf0_elt_t out_elts[] = { + { NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, + }; + + + trans = in_trans; + + return ngx_rtmp_send_amf0(s, 3, 0, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])); +} + + static ngx_int_t ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf) { @@ -321,21 +485,11 @@ ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf) ngx_hash_key_t *h; ngx_rtmp_disconnect_handler_pt *dh; ngx_rtmp_event_handler_pt *avh; + ngx_rtmp_broadcast_map_t *bm; + size_t n, ncalls; cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module); - - /* add connect() handler */ - h = ngx_array_push(&cmcf->calls); - - if (h == NULL) { - return NGX_ERROR; - } - - ngx_str_set(&h->key, "connect"); - h->value = ngx_rtmp_broadcast_connect; - - /* register audio/video broadcast handler */ avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]); *avh = ngx_rtmp_broadcast_av; @@ -343,8 +497,7 @@ ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf) avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]); *avh = ngx_rtmp_broadcast_av; - - /* add disconnect handler */ + /* register disconnect handler */ dh = ngx_array_push(&cmcf->disconnect); if (dh == NULL) { @@ -352,6 +505,20 @@ ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf) } *dh = ngx_rtmp_broadcast_leave; + + /* register AMF0 call handlers */ + ncalls = sizeof(ngx_rtmp_broadcast_map) + / sizeof(ngx_rtmp_broadcast_map[0]); + h = ngx_array_push_n(&cmcf->calls, ncalls); + if (h == NULL) { + return NGX_ERROR; + } + + bm = ngx_rtmp_broadcast_map; + for(n = 0; n < ncalls; ++n, ++h, ++bm) { + h->key = bm->name; + h->value = bm->handler; + } return NGX_OK; } diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index d247d4a..1e5bd09 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -426,6 +426,7 @@ ngx_rtmp_recv(ngx_event_t *rev) ngx_rtmp_close_connection(c); return; } + st->in->buf->end = st->in->buf->start + size; st->in->buf->flush = 1; } @@ -434,6 +435,7 @@ ngx_rtmp_recv(ngx_event_t *rev) /* anything remained from last iteration? */ if (b != NULL && b->recycled && b->pos < b->last) { + st->in->buf->pos = st->in->buf->start; st->in->buf->last = ngx_movemem(st->in->buf->start, b->pos, b->last - b->pos); b->recycled = 0; @@ -443,7 +445,7 @@ ngx_rtmp_recv(ngx_event_t *rev) b = in->buf; if (b->flush) { - b->pos = b->last = b->start; + b->pos = b->last = b->start; b->flush = 0; } @@ -609,8 +611,9 @@ ngx_rtmp_recv(ngx_event_t *rev) /* add used bufs to stream #0 */ st0 = &s->in_streams[0]; - st->in->next = st0->in->next; - st0->in->next = head; + st->in->next = st0->in; + st0->in = head; + st->in = NULL; } s->in_csid = 0; @@ -633,7 +636,7 @@ ngx_rtmp_send(ngx_event_t *wev) ngx_connection_t *c; ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; - ngx_chain_t *out, *l, *ln; + ngx_chain_t *out, *l; c = wev->data; s = c->data; @@ -663,7 +666,9 @@ ngx_rtmp_send(ngx_event_t *wev) return; } - if (out == NULL) { + if (out == s->out + && s->out->buf->pos == s->out->buf->last) + { cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); ngx_add_timer(c->write, cscf->timeout); if (ngx_handle_write_event(c->write, 0) != NGX_OK) { @@ -672,20 +677,28 @@ ngx_rtmp_send(ngx_event_t *wev) return; } - if (out != s->out) { - for(l = s->out; l->next && l->next != out; ) { + while(s->out) { - /* anyone still using this buffer? */ - if (ngx_rtmp_buf_release(l->buf)) { - l = l->next; - continue; - } + l = s->out; + if (l->buf->pos < l->buf->last) { + break; + } - /* return buffer to core */ - ln = l->next; - l->next = cscf->out_free; - cscf->out_free = l; - l = ln; + s->out = s->out->next; + + /* anyone still using this buffer? */ + if (ngx_rtmp_buf_release(l->buf)) { + continue; + } + + /* return buffer to core */ + if (ngx_rtmp_release_shared_buf(s, l)) { + ngx_rtmp_close_connection(c); + return; + } + + if (s->out == out) { + break; } } } @@ -701,14 +714,23 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s) ngx_buf_t *b; size_t size; ngx_rtmp_core_srv_conf_t *cscf; + ngx_connection_t *c; + c = s->connection; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); if (cscf->out_free) { out = cscf->out_free; cscf->out_free = out->next; + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, + "reuse shared buf"); + } else { + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, + "alloc shared buf"); + out = ngx_alloc_chain_link(cscf->out_pool); if (out == NULL) { return NULL; @@ -736,6 +758,30 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s) } +ngx_int_t +ngx_rtmp_release_shared_buf(ngx_rtmp_session_t *s, + ngx_chain_t *out) +{ + ngx_rtmp_core_srv_conf_t *cscf; + size_t nbufs; + ngx_chain_t *cl; + + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + for(nbufs = 1, cl = out; cl->next; + cl = cl->next, ++nbufs); + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "release %d shared bufs", nbufs); + + cl->next = cscf->out_free; + cscf->out_free = out; + + + return NGX_OK; +} + + void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *out, uint8_t fmt) @@ -754,10 +800,10 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ++nbufs; } - ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "RTMP prep %s (%d) csid=%D timestamp=%D " + ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "RTMP prep %s (%d) fmt=%d csid=%D timestamp=%D " "mlen=%D msid=%D nbufs=%d", - ngx_rtmp_packet_type(h->type), (int)h->type, + ngx_rtmp_packet_type(h->type), (int)h->type, (int)fmt, h->csid, h->timestamp, mlen, h->msid, nbufs); /* determine initial header size */ @@ -922,27 +968,6 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, return NGX_OK; } -/* -void -ngx_rtmp_close_session(ngx_rtmp_session_t *s) -{ - size_t n; - ngx_rtmp_core_main_conf_t *cmcf; - ngx_rtmp_disconnect_handler_pt *h; - - cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); - - h = cmcf->disconnect.elts; - for(n = 0; n < cmcf->disconnect.nelts; ++n, ++h) { - if (*h) { - (*h)(s); - } - } - - ngx_destroy_pool(s->in_pool); - ngx_rtmp_close_connection(s->connection); -} -*/ void ngx_rtmp_close_connection(ngx_connection_t *c) diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c index a51df76..e0b5cbd 100644 --- a/ngx_rtmp_receive.c +++ b/ngx_rtmp_receive.c @@ -193,7 +193,7 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, * because ngx_hash_find only returns one item; * no good to patch NGINX core ;) */ ch = ngx_hash_find(&cmcf->calls_hash, - ngx_hash_key_lc(func, len), func, len); + ngx_hash_strlow(func, func, len), func, len); if (ch) { ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,