From 96526896aa88e09f83af3efaa6da902ba44747ab Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 30 Aug 2012 18:40:12 +0400 Subject: [PATCH] live stream synchronization --- hls/ngx_rtmp_hls_module.c | 5 +-- ngx_rtmp_handler.c | 6 +++- ngx_rtmp_live_module.c | 69 ++++++++++++++++++++++++++++++++------- ngx_rtmp_live_module.h | 1 + 4 files changed, 67 insertions(+), 14 deletions(-) diff --git a/hls/ngx_rtmp_hls_module.c b/hls/ngx_rtmp_hls_module.c index f7ecbfb..bc5b63b 100644 --- a/hls/ngx_rtmp_hls_module.c +++ b/hls/ngx_rtmp_hls_module.c @@ -950,8 +950,9 @@ ngx_rtmp_hls_audio(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, --packet.size; } - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "hls: audio buffer %uD", *(uint32_t*)packet.data); + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "hls: audio dts=%L timestamp=%uD", (int64_t)packet.dts, + h->timestamp); if (av_interleaved_write_frame(ctx->out_format, &packet) < 0) { ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 79465f7..dcf7582 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -694,9 +694,13 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out, nmsg = (s->out_last - s->out_pos) % s->out_queue + 1; + if (priority > 3) { + priority = 3; + } + /* drop packet? * Note we always leave 1 slot free */ - if (nmsg + priority * s->out_queue / 16 >= s->out_queue) { + if (nmsg + priority * s->out_queue / 4 >= s->out_queue) { ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "RTMP drop message bufs=%ui, priority=%ui", nmsg, priority); diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index bca2c03..44617f0 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -53,6 +53,13 @@ static ngx_command_t ngx_rtmp_live_commands[] = { offsetof(ngx_rtmp_live_app_conf_t, buflen), NULL }, + { ngx_string("sync"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_live_app_conf_t, sync), + NULL }, + ngx_null_command }; @@ -99,6 +106,7 @@ ngx_rtmp_live_create_app_conf(ngx_conf_t *cf) lacf->meta = NGX_CONF_UNSET; lacf->nbuckets = NGX_CONF_UNSET; lacf->buflen = NGX_CONF_UNSET; + lacf->sync = NGX_CONF_UNSET; return lacf; } @@ -114,6 +122,7 @@ ngx_rtmp_live_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_value(conf->meta, prev->meta, 1); ngx_conf_merge_value(conf->nbuckets, prev->nbuckets, 1024); ngx_conf_merge_msec_value(conf->buflen, prev->buflen, 0); + ngx_conf_merge_msec_value(conf->sync, prev->sync, 0); conf->pool = ngx_create_pool(4096, &cf->cycle->new_log); if (conf->pool == NULL) { @@ -292,7 +301,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, { ngx_rtmp_live_ctx_t *ctx, *pctx; ngx_rtmp_codec_ctx_t *codec_ctx; - ngx_chain_t *out, *peer_out, *header_out, + ngx_chain_t *out, *peer_out, *header_out, *pheader_out, *meta; ngx_rtmp_core_srv_conf_t *cscf; ngx_rtmp_live_app_conf_t *lacf; @@ -300,8 +309,10 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_rtmp_header_t ch, lh; ngx_uint_t prio, peer_prio; ngx_uint_t peers, dropped_peers; - size_t header_offset; + size_t header_offset, last_offset; ngx_uint_t header_version, meta_version; + ngx_int_t diff_timestamp; + uint32_t *last; lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); if (lacf == NULL) { @@ -345,14 +356,16 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ch.csid = NGX_RTMP_LIVE_CSID_VIDEO; lh.timestamp = ctx->last_video; ctx->last_video = ch.timestamp; + last_offset = offsetof(ngx_rtmp_live_ctx_t, last_video); } else { - /* audio priority is the same as video key frame's */ - prio = NGX_RTMP_VIDEO_KEY_FRAME; + prio = 0; ch.csid = NGX_RTMP_LIVE_CSID_AUDIO; lh.timestamp = ctx->last_audio; ctx->last_audio = ch.timestamp; + last_offset = offsetof(ngx_rtmp_live_ctx_t, last_audio); } lh.csid = ch.csid; + diff_timestamp = ch.timestamp - lh.timestamp; out = ngx_rtmp_append_shared_bufs(cscf, NULL, in); ngx_rtmp_prepare_message(s, &ch, &lh, out); @@ -396,25 +409,39 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } ++peers; ss = pctx->session; + last = (uint32_t *)((u_char *)pctx + last_offset); + ch.timestamp = s->epoch + h->timestamp - ss->epoch; /* send absolute frame */ if ((pctx->msg_mask & (1 << h->type)) == 0) { - ch.timestamp = ngx_current_msec - ss->epoch; + + /* packet from the past for the peer */ + if (s->epoch + h->timestamp < ss->epoch) { + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: av: %s packet from the past %uD < %uD", + h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio", + (uint32_t)(s->epoch + h->timestamp), (uint32_t)ss->epoch); + continue; + } + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: av: abs %s timestamp=%uD", h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio", ch.timestamp); + /* send codec header as abs frame if any */ peer_out = ngx_rtmp_append_shared_bufs(cscf, NULL, header_out ? header_out : in); ngx_rtmp_prepare_message(s, &ch, NULL, peer_out); - pctx->msg_mask |= (1 << h->type); - if (ngx_rtmp_send_message(ss, peer_out, prio) == NGX_OK - && header_out) - { - *(ngx_uint_t *)((u_char *)pctx + header_offset) - = header_version; + if (ngx_rtmp_send_message(ss, peer_out, prio) == NGX_OK) { + pctx->msg_mask |= (1 << h->type); + if (header_out) { + *(ngx_uint_t *)((u_char *)pctx + header_offset) + = header_version; + *last = ch.timestamp; + } } + ngx_rtmp_free_shared_chain(cscf, peer_out); continue; } @@ -445,7 +472,27 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, if (ngx_rtmp_send_message(ss, out, peer_prio) != NGX_OK) { ++pctx->dropped; ++dropped_peers; + continue; } + + *last += diff_timestamp; + + if (lacf->sync == 0 || *last + lacf->sync >= ch.timestamp) { + continue; + } + + /* send absolute frame to sync stream */ + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, + "live: av: sync %s: %i", + h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio", + (ngx_int_t) (ch.timestamp - *last)); + + peer_out = ngx_rtmp_alloc_shared_buf(cscf); + ngx_rtmp_prepare_message(s, &ch, NULL, peer_out); + if (ngx_rtmp_send_message(ss, peer_out, 0) == NGX_OK) { + *last = ch.timestamp; + } + ngx_rtmp_free_shared_chain(cscf, peer_out); } ngx_rtmp_free_shared_chain(cscf, out); diff --git a/ngx_rtmp_live_module.h b/ngx_rtmp_live_module.h index fdc5ffd..0d93e66 100644 --- a/ngx_rtmp_live_module.h +++ b/ngx_rtmp_live_module.h @@ -54,6 +54,7 @@ typedef struct { ngx_rtmp_live_stream_t **streams; ngx_flag_t live; ngx_flag_t meta; + ngx_msec_t sync; ngx_msec_t buflen; ngx_pool_t *pool; ngx_rtmp_live_stream_t *free_streams;