From dd75b749dab6aaebc65aeb8f20e5ed88f246a7c0 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Sat, 27 Apr 2013 19:51:01 +0400 Subject: [PATCH] implemented succeeded pushed count check in auto_push module --- ngx_rtmp_auto_push_module.c | 71 +++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 27 deletions(-) diff --git a/ngx_rtmp_auto_push_module.c b/ngx_rtmp_auto_push_module.c index 9da5dbf..21ce240 100644 --- a/ngx_rtmp_auto_push_module.c +++ b/ngx_rtmp_auto_push_module.c @@ -309,7 +309,8 @@ ngx_rtmp_auto_push_reconnect(ngx_event_t *ev) u_char *p; ngx_str_t *u; ngx_pid_t pid; - char *pname; + ngx_int_t npushed; + ngx_core_conf_t *ccf; ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "auto_push: reconnect"); @@ -336,6 +337,7 @@ ngx_rtmp_auto_push_reconnect(ngx_event_t *ev) } slot = ctx->slots; + npushed = 0; for (n = 0; n < NGX_MAX_PROCESSES; ++n, ++slot) { if (n == ngx_process_slot) { @@ -347,21 +349,8 @@ ngx_rtmp_auto_push_reconnect(ngx_event_t *ev) continue; } - /* - * This is a dirty way to skip "cache manager" and - * "cache loader" processes when pushing streams. - */ - - pname = ngx_processes[n].name; - if (pname && ngx_strstr(pname, "cache")) { - ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "auto_push: skip process slot=%i pid=%i " - "name='%s' pname='%s'", - n, (ngx_int_t) pid, ctx->name, pname); - continue; - } - if (*slot) { + npushed++; continue; } @@ -388,26 +377,54 @@ ngx_rtmp_auto_push_reconnect(ngx_event_t *ev) at.flash_ver.data = flash_ver; at.flash_ver.len = p - flash_ver; - ngx_log_debug5(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "auto_push: connect slot=%i pid=%i socket='%s' " - "name='%s' pname='%s'", - n, (ngx_int_t) pid, path, ctx->name, - pname ? pname : ""); + ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "auto_push: connect slot=%i pid=%P socket='%s' name='%s'", + n, pid, path, ctx->name); if (ngx_rtmp_relay_push(s, &name, &at) == NGX_OK) { *slot = 1; + npushed++; + continue; + } + + ngx_log_debug5(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "auto_push: connect failed: slot=%i pid=%P socket='%s'" + "url='%V' name='%s'", + n, pid, path, u, ctx->name); + } + + ccf = (ngx_core_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx, + ngx_core_module); + + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "auto_push: pushed=%i total=%i failed=%i", + npushed, ccf->worker_processes, + ccf->worker_processes - 1 - npushed); + + if (ccf->worker_processes == npushed + 1) { + return; + } + + /* several workers failed */ + + slot = ctx->slots; + + for (n = 0; n < NGX_MAX_PROCESSES; ++n, ++slot) { + pid = ngx_processes[n].pid; + + if (n == ngx_process_slot || *slot == 1 || + pid == 0 || pid == NGX_INVALID_PID) + { continue; } ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, - "auto_push: connect failed: slot=%i pid=%i socket='%s'" - "url='%V' name='%s' pname='%s'", - n, (ngx_int_t) pid, path, u, ctx->name, - pname ? pname : ""); + "auto_push: connect failed: slot=%i pid=%P name='%s'", + n, pid, ctx->name); + } - if (!ctx->push_evt.timer_set) { - ngx_add_timer(&ctx->push_evt, apcf->push_reconnect); - } + if (!ctx->push_evt.timer_set) { + ngx_add_timer(&ctx->push_evt, apcf->push_reconnect); } }