Support for onTextData and onCuePoint in the data channel for the live module.

This commit is contained in:
Jeffrey Wescott 2014-10-30 15:17:51 -07:00
parent 5fb4c99ca9
commit de42f3801d
3 changed files with 156 additions and 1 deletions

View file

@ -357,6 +357,9 @@ ngx_rtmp_live_set_status(ngx_rtmp_session_t *s, ngx_chain_t *control,
ctx->cs[1].active = 0;
ctx->cs[1].dropped = 0;
ctx->cs[2].active = 0;
ctx->cs[2].dropped = 0;
}
@ -556,6 +559,7 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, unsigned publisher)
ctx->cs[0].csid = NGX_RTMP_CSID_VIDEO;
ctx->cs[1].csid = NGX_RTMP_CSID_AUDIO;
ctx->cs[2].csid = NGX_RTMP_CSID_AMF;
if (!ctx->publishing && ctx->stream->active) {
ngx_rtmp_live_start(s);
@ -1036,6 +1040,145 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_live_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in, ngx_rtmp_amf_elt_t *out_elts, ngx_uint_t out_elts_size)
{
ngx_rtmp_live_ctx_t *ctx, *pctx;
ngx_chain_t *data, *rpkt;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_session_t *ss;
ngx_rtmp_header_t ch;
ngx_int_t rc;
ngx_uint_t prio;
ngx_uint_t peers;
uint32_t delta;
ngx_rtmp_live_chunk_stream_t *cs;
u_char *msg_type;
msg_type = (u_char *)out_elts[0].data;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL) {
return NGX_ERROR;
}
if (!lacf->live || in == NULL || in->buf == NULL) {
return NGX_OK;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL || ctx->stream == NULL) {
return NGX_OK;
}
if (ctx->publishing == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: %s from non-publisher", msg_type);
return NGX_OK;
}
/* drop the data packet if the stream is not active */
if (!ctx->stream->active) {
return NGX_OK;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: %s packet timestamp=%uD",
msg_type, h->timestamp);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
cs = &ctx->cs[2];
cs->active = 1;
peers = 0;
prio = 0;
data = NULL;
rc = ngx_rtmp_append_amf(s, &data, NULL, out_elts, out_elts_size);
if (rc != NGX_OK) {
if (data) {
ngx_rtmp_free_shared_chain(cscf, data);
}
return NGX_ERROR;
}
ngx_memzero(&ch, sizeof(ch));
ch.timestamp = h->timestamp;
ch.msid = NGX_RTMP_MSID;
ch.csid = h->csid;
ch.type = NGX_RTMP_MSG_AMF_META;
delta = ch.timestamp - cs->timestamp;
rpkt = ngx_rtmp_append_shared_bufs(cscf, data, in);
ngx_rtmp_prepare_message(s, &ch, NULL, rpkt);
for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
if (pctx == ctx || pctx->paused) {
continue;
}
ss = pctx->session;
if (ngx_rtmp_send_message(ss, rpkt, prio) != NGX_OK) {
++pctx->ndropped;
cs->dropped += delta;
continue;
}
cs->timestamp += delta;
++peers;
ss->current_time = cs->timestamp;
}
if (data) {
ngx_rtmp_free_shared_chain(cscf, data);
}
if (rpkt) {
ngx_rtmp_free_shared_chain(cscf, rpkt);
}
ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen);
ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);
ngx_rtmp_update_bandwidth(&ctx->stream->bw_in_data, h->mlen);
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_live_on_cue_point(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"onCuePoint", 0 }
};
return ngx_rtmp_live_data(s, h, in, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
static ngx_int_t
ngx_rtmp_live_on_text_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"onTextData", 0 }
};
return ngx_rtmp_live_data(s, h, in, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
@ -1118,6 +1261,7 @@ ngx_rtmp_live_postconfiguration(ngx_conf_t *cf)
{
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_handler_pt *h;
ngx_rtmp_amf_handler_t *ch;
cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);
@ -1149,5 +1293,13 @@ ngx_rtmp_live_postconfiguration(ngx_conf_t *cf)
next_stream_eof = ngx_rtmp_stream_eof;
ngx_rtmp_stream_eof = ngx_rtmp_live_stream_eof;
ch = ngx_array_push(&cmcf->amf);
ngx_str_set(&ch->name, "onTextData");
ch->handler = ngx_rtmp_live_on_text_data;
ch = ngx_array_push(&cmcf->amf);
ngx_str_set(&ch->name, "onCuePoint");
ch->handler = ngx_rtmp_live_on_cue_point;
return NGX_OK;
}

View file

@ -33,7 +33,7 @@ struct ngx_rtmp_live_ctx_s {
ngx_rtmp_live_stream_t *stream;
ngx_rtmp_live_ctx_t *next;
ngx_uint_t ndropped;
ngx_rtmp_live_chunk_stream_t cs[2];
ngx_rtmp_live_chunk_stream_t cs[3];
ngx_uint_t meta_version;
ngx_event_t idle_evt;
unsigned active:1;
@ -50,6 +50,7 @@ struct ngx_rtmp_live_stream_s {
ngx_rtmp_bandwidth_t bw_in;
ngx_rtmp_bandwidth_t bw_in_audio;
ngx_rtmp_bandwidth_t bw_in_video;
ngx_rtmp_bandwidth_t bw_in_data;
ngx_rtmp_bandwidth_t bw_out;
ngx_msec_t epoch;
unsigned active:1;

View file

@ -453,6 +453,8 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll,
NGX_RTMP_STAT_BW);
ngx_rtmp_stat_bw(r, lll, &stream->bw_in_video, "video",
NGX_RTMP_STAT_BW);
ngx_rtmp_stat_bw(r, lll, &stream->bw_in_data, "data",
NGX_RTMP_STAT_BW);
nclients = 0;
codec = NULL;