From de42f3801d646ffca31942ff468463a3586244c5 Mon Sep 17 00:00:00 2001 From: Jeffrey Wescott Date: Thu, 30 Oct 2014 15:17:51 -0700 Subject: [PATCH] Support for onTextData and onCuePoint in the data channel for the live module. --- ngx_rtmp_live_module.c | 152 +++++++++++++++++++++++++++++++++++++++++ ngx_rtmp_live_module.h | 3 +- ngx_rtmp_stat_module.c | 2 + 3 files changed, 156 insertions(+), 1 deletion(-) diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index 5bebb9e..298a280 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -357,6 +357,9 @@ ngx_rtmp_live_set_status(ngx_rtmp_session_t *s, ngx_chain_t *control, ctx->cs[1].active = 0; ctx->cs[1].dropped = 0; + + ctx->cs[2].active = 0; + ctx->cs[2].dropped = 0; } @@ -556,6 +559,7 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, unsigned publisher) ctx->cs[0].csid = NGX_RTMP_CSID_VIDEO; ctx->cs[1].csid = NGX_RTMP_CSID_AUDIO; + ctx->cs[2].csid = NGX_RTMP_CSID_AMF; if (!ctx->publishing && ctx->stream->active) { ngx_rtmp_live_start(s); @@ -1036,6 +1040,145 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_OK; } +static ngx_int_t +ngx_rtmp_live_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in, ngx_rtmp_amf_elt_t *out_elts, ngx_uint_t out_elts_size) +{ + ngx_rtmp_live_ctx_t *ctx, *pctx; + ngx_chain_t *data, *rpkt; + ngx_rtmp_core_srv_conf_t *cscf; + ngx_rtmp_live_app_conf_t *lacf; + ngx_rtmp_session_t *ss; + ngx_rtmp_header_t ch; + ngx_int_t rc; + ngx_uint_t prio; + ngx_uint_t peers; + uint32_t delta; + ngx_rtmp_live_chunk_stream_t *cs; + + u_char *msg_type; + + msg_type = (u_char *)out_elts[0].data; + + lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); + if (lacf == NULL) { + return NGX_ERROR; + } + + if (!lacf->live || in == NULL || in->buf == NULL) { + return NGX_OK; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); + if (ctx == NULL || ctx->stream == NULL) { + return NGX_OK; + } + + if (ctx->publishing == 0) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "live: %s from non-publisher", msg_type); + return NGX_OK; + } + + /* drop the data packet if the stream is not active */ + if (!ctx->stream->active) { + return NGX_OK; + } + + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "live: %s packet timestamp=%uD", + msg_type, h->timestamp); + + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + cs = &ctx->cs[2]; + cs->active = 1; + + peers = 0; + prio = 0; + data = NULL; + rc = ngx_rtmp_append_amf(s, &data, NULL, out_elts, out_elts_size); + if (rc != NGX_OK) { + if (data) { + ngx_rtmp_free_shared_chain(cscf, data); + } + return NGX_ERROR; + } + + ngx_memzero(&ch, sizeof(ch)); + ch.timestamp = h->timestamp; + ch.msid = NGX_RTMP_MSID; + ch.csid = h->csid; + ch.type = NGX_RTMP_MSG_AMF_META; + + delta = ch.timestamp - cs->timestamp; + + rpkt = ngx_rtmp_append_shared_bufs(cscf, data, in); + ngx_rtmp_prepare_message(s, &ch, NULL, rpkt); + + for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) { + if (pctx == ctx || pctx->paused) { + continue; + } + + ss = pctx->session; + + if (ngx_rtmp_send_message(ss, rpkt, prio) != NGX_OK) { + ++pctx->ndropped; + cs->dropped += delta; + continue; + } + + cs->timestamp += delta; + ++peers; + ss->current_time = cs->timestamp; + } + + if (data) { + ngx_rtmp_free_shared_chain(cscf, data); + } + + if (rpkt) { + ngx_rtmp_free_shared_chain(cscf, rpkt); + } + + ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen); + ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers); + ngx_rtmp_update_bandwidth(&ctx->stream->bw_in_data, h->mlen); + + return NGX_OK; +} + +static ngx_int_t +ngx_rtmp_live_on_cue_point(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + static ngx_rtmp_amf_elt_t out_elts[] = { + + { NGX_RTMP_AMF_STRING, + ngx_null_string, + "onCuePoint", 0 } + }; + + return ngx_rtmp_live_data(s, h, in, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])); +} + +static ngx_int_t +ngx_rtmp_live_on_text_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + static ngx_rtmp_amf_elt_t out_elts[] = { + + { NGX_RTMP_AMF_STRING, + ngx_null_string, + "onTextData", 0 } + }; + + return ngx_rtmp_live_data(s, h, in, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])); +} + static ngx_int_t ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) @@ -1118,6 +1261,7 @@ ngx_rtmp_live_postconfiguration(ngx_conf_t *cf) { ngx_rtmp_core_main_conf_t *cmcf; ngx_rtmp_handler_pt *h; + ngx_rtmp_amf_handler_t *ch; cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module); @@ -1149,5 +1293,13 @@ ngx_rtmp_live_postconfiguration(ngx_conf_t *cf) next_stream_eof = ngx_rtmp_stream_eof; ngx_rtmp_stream_eof = ngx_rtmp_live_stream_eof; + ch = ngx_array_push(&cmcf->amf); + ngx_str_set(&ch->name, "onTextData"); + ch->handler = ngx_rtmp_live_on_text_data; + + ch = ngx_array_push(&cmcf->amf); + ngx_str_set(&ch->name, "onCuePoint"); + ch->handler = ngx_rtmp_live_on_cue_point; + return NGX_OK; } diff --git a/ngx_rtmp_live_module.h b/ngx_rtmp_live_module.h index 71eca36..1eb59d4 100644 --- a/ngx_rtmp_live_module.h +++ b/ngx_rtmp_live_module.h @@ -33,7 +33,7 @@ struct ngx_rtmp_live_ctx_s { ngx_rtmp_live_stream_t *stream; ngx_rtmp_live_ctx_t *next; ngx_uint_t ndropped; - ngx_rtmp_live_chunk_stream_t cs[2]; + ngx_rtmp_live_chunk_stream_t cs[3]; ngx_uint_t meta_version; ngx_event_t idle_evt; unsigned active:1; @@ -50,6 +50,7 @@ struct ngx_rtmp_live_stream_s { ngx_rtmp_bandwidth_t bw_in; ngx_rtmp_bandwidth_t bw_in_audio; ngx_rtmp_bandwidth_t bw_in_video; + ngx_rtmp_bandwidth_t bw_in_data; ngx_rtmp_bandwidth_t bw_out; ngx_msec_t epoch; unsigned active:1; diff --git a/ngx_rtmp_stat_module.c b/ngx_rtmp_stat_module.c index 326a811..84bd48a 100644 --- a/ngx_rtmp_stat_module.c +++ b/ngx_rtmp_stat_module.c @@ -453,6 +453,8 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, NGX_RTMP_STAT_BW); ngx_rtmp_stat_bw(r, lll, &stream->bw_in_video, "video", NGX_RTMP_STAT_BW); + ngx_rtmp_stat_bw(r, lll, &stream->bw_in_data, "data", + NGX_RTMP_STAT_BW); nclients = 0; codec = NULL;