fixed errors; now translation goes up to real a/v broadcast

This commit is contained in:
Roman Arutyunyan 2012-03-14 10:06:47 +04:00
parent 8e32bae842
commit 68c64116b2
7 changed files with 270 additions and 77 deletions

2
TODO
View file

@ -11,6 +11,8 @@
- closing session on send error
causes crash because of double-freeing stack
- recognize amf-meta
- shortcuts for big-endian copy
- implement loc confs (=fms apps)

View file

@ -317,7 +317,6 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AMF0_CMD]);
*eh = ngx_rtmp_amf0_message_handler;
/* init calls */
h = cmcf->calls.elts;
for(n = 0; n < cmcf->calls.nelts; ++n, ++h) {
@ -326,7 +325,7 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
calls_hash.hash = &cmcf->calls_hash;
calls_hash.key = ngx_hash_key_lc;
calls_hash.max_size = 1024;
calls_hash.max_size = 512;
calls_hash.bucket_size = ngx_cacheline_size;
calls_hash.name = "calls_hash";
calls_hash.pool = cf->pool;

View file

@ -150,7 +150,7 @@ typedef struct {
typedef struct ngx_rtmp_stream_t {
ngx_rtmp_header_t hdr;
ngx_rtmp_header_t hdr;
ngx_chain_t *in;
} ngx_rtmp_stream_t;
@ -257,7 +257,6 @@ typedef struct {
void ngx_rtmp_init_connection(ngx_connection_t *c);
/*void ngx_rtmp_close_session(ngx_rtmp_session_t *s);*/
void ngx_rtmp_close_connection(ngx_connection_t *c);
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
@ -272,6 +271,8 @@ ngx_int_t ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,
/* Sending messages */
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s);
ngx_int_t ngx_rtmp_release_shared_buf(ngx_rtmp_session_t *s,
ngx_chain_t *out);
void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *out, uint8_t fmt);
ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out);

View file

@ -136,7 +136,6 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)
ctx->link = l;
b = l->buf;
b->pos = b->last = b->start;
}
size = b->end - b->last;

View file

@ -13,6 +13,30 @@ static void * ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf);
static char * ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf,
void *parent, void *child);
static ngx_int_t ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in);
typedef struct {
ngx_str_t name;
ngx_rtmp_call_handler_pt handler;
} ngx_rtmp_broadcast_map_t;
static ngx_rtmp_broadcast_map_t ngx_rtmp_broadcast_map[] = {
{ ngx_string("connect"), ngx_rtmp_broadcast_connect },
{ ngx_string("createStream"), ngx_rtmp_broadcast_create_stream },
{ ngx_string("publish"), ngx_rtmp_broadcast_publish },
{ ngx_string("releaseStream"), ngx_rtmp_broadcast_ok },
{ ngx_string("FCPublish"), ngx_rtmp_broadcast_ok },
};
typedef struct {
/* use hash-map
@ -119,6 +143,20 @@ ngx_rtmp_broadcast_get_head(ngx_rtmp_session_t *s)
}
static void
ngx_rtmp_broadcast_set_flags(ngx_rtmp_session_t *s, ngx_uint_t flags)
{
ngx_rtmp_broadcast_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
if (ctx == NULL) {
return;
}
ctx->flags |= flags;
}
static void
ngx_rtmp_broadcast_join(ngx_rtmp_session_t *s, ngx_str_t *stream,
ngx_uint_t flags)
@ -187,8 +225,9 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
{
ngx_connection_t *c;
ngx_rtmp_broadcast_ctx_t *ctx, *cctx;
ngx_chain_t *out, *l;
ngx_chain_t *out, *l, **ll;
u_char *p;
size_t nsubs;
c = s->connection;
@ -206,22 +245,25 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
/* copy data to output stream */
out = NULL;
ll = &out;
p = in->buf->pos;
for(;;) {
for ( ;; ) {
l = ngx_rtmp_alloc_shared_buf(s);
if (l == NULL || l->buf == NULL) {
return NGX_ERROR;
}
if (out == NULL) {
out = l;
}
*ll = l;
ll = &l->next;
while (l->buf->end - l->buf->last > in->buf->last - p) {
while (l->buf->end - l->buf->last >= in->buf->last - p) {
l->buf->last = ngx_cpymem(l->buf->last, p,
in->buf->last - p);
in = in->next;
if (in == NULL) {
goto done;
}
p = in->buf->pos;
}
@ -230,10 +272,14 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
p += (l->buf->end - l->buf->last);
}
done:
*ll = NULL;
ngx_rtmp_prepare_message(s, h, out, 0/*fmt*/);
/* broadcast to all subscribers */
for(cctx = *ngx_rtmp_broadcast_get_head(s);
nsubs = 0;
for (cctx = *ngx_rtmp_broadcast_get_head(s);
cctx; cctx = cctx->next)
{
if (cctx != ctx
@ -245,9 +291,17 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
if (ngx_rtmp_send_message(s, out) != NGX_OK) {
return NGX_ERROR;
}
++nsubs;
}
}
/* no one subscriber? */
if (!nsubs
&& ngx_rtmp_release_shared_buf(s, out) != NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
@ -298,22 +352,132 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans,
"connect() called; app='%s' url='%s'",
app, url);
if (0) {
app_str.data = app;
app_str.len = ngx_strlen(app);
ngx_rtmp_broadcast_join(s, &app_str, 0);
}
/*FIXME: app_str allocation!!!!!!! */
/*FIXME: add memsetting input data */
/* join stream */
ngx_str_set(&app_str, "preved");
/*
app_str.data = app;
app_str.len = ngx_strlen(app);
*/
ngx_rtmp_broadcast_join(s, &app_str, 0);
return ngx_rtmp_send_ack_size(s, 65536)
|| ngx_rtmp_send_bandwidth(s, 65536, NGX_RTMP_LIMIT_SOFT)
|| ngx_rtmp_send_user_stream_begin(s, 1)
|| ngx_rtmp_send_amf0(s, 3, 1, out_elts,
return ngx_rtmp_send_ack_size(s, 2500000)
|| ngx_rtmp_send_bandwidth(s, 2500000, NGX_RTMP_LIMIT_DYNAMIC)
|| ngx_rtmp_send_user_stream_begin(s, 0)
|| ngx_rtmp_send_amf0(s, 3, 0, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
? NGX_ERROR
: NGX_OK;
}
static ngx_int_t
ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, double in_trans,
ngx_chain_t *in)
{
static double trans;
static double stream;
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "_result", sizeof("_result") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &stream, sizeof(stream) },
};
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"createStream() called");
trans = in_trans;
stream = 1;
return ngx_rtmp_send_amf0(s, 3, 0, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
static ngx_int_t
ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, double in_trans,
ngx_chain_t *in)
{
static double trans;
static u_char pub_name[1024];
static u_char pub_type[1024];
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_NULL, NULL, NULL, 0 },
{ NGX_RTMP_AMF0_STRING, NULL, pub_name, sizeof(pub_name) },
{ NGX_RTMP_AMF0_STRING, NULL, pub_type, sizeof(pub_type) },
};
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
};
if (ngx_rtmp_receive_amf0(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"publish() called; pubName='%s' pubType='%s'",
pub_name, pub_type);
ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_PUBLISHER);
trans = in_trans;
ngx_str_set(&out_inf[0], "NetStream.Publish.Start");
ngx_str_set(&out_inf[1], "status");
ngx_str_set(&out_inf[2], "Publish succeeded.");
if (ngx_rtmp_send_amf0(s, 3, 0, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in)
{
static double trans;
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
};
trans = in_trans;
return ngx_rtmp_send_amf0(s, 3, 0, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
static ngx_int_t
ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf)
{
@ -321,21 +485,11 @@ ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf)
ngx_hash_key_t *h;
ngx_rtmp_disconnect_handler_pt *dh;
ngx_rtmp_event_handler_pt *avh;
ngx_rtmp_broadcast_map_t *bm;
size_t n, ncalls;
cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);
/* add connect() handler */
h = ngx_array_push(&cmcf->calls);
if (h == NULL) {
return NGX_ERROR;
}
ngx_str_set(&h->key, "connect");
h->value = ngx_rtmp_broadcast_connect;
/* register audio/video broadcast handler */
avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]);
*avh = ngx_rtmp_broadcast_av;
@ -343,8 +497,7 @@ ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf)
avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]);
*avh = ngx_rtmp_broadcast_av;
/* add disconnect handler */
/* register disconnect handler */
dh = ngx_array_push(&cmcf->disconnect);
if (dh == NULL) {
@ -352,6 +505,20 @@ ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf)
}
*dh = ngx_rtmp_broadcast_leave;
/* register AMF0 call handlers */
ncalls = sizeof(ngx_rtmp_broadcast_map)
/ sizeof(ngx_rtmp_broadcast_map[0]);
h = ngx_array_push_n(&cmcf->calls, ncalls);
if (h == NULL) {
return NGX_ERROR;
}
bm = ngx_rtmp_broadcast_map;
for(n = 0; n < ncalls; ++n, ++h, ++bm) {
h->key = bm->name;
h->value = bm->handler;
}
return NGX_OK;
}

View file

@ -426,6 +426,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
ngx_rtmp_close_connection(c);
return;
}
st->in->buf->end = st->in->buf->start + size;
st->in->buf->flush = 1;
}
@ -434,6 +435,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
/* anything remained from last iteration? */
if (b != NULL && b->recycled && b->pos < b->last) {
st->in->buf->pos = st->in->buf->start;
st->in->buf->last = ngx_movemem(st->in->buf->start, b->pos,
b->last - b->pos);
b->recycled = 0;
@ -443,7 +445,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
b = in->buf;
if (b->flush) {
b->pos = b->last = b->start;
b->pos = b->last = b->start;
b->flush = 0;
}
@ -609,8 +611,9 @@ ngx_rtmp_recv(ngx_event_t *rev)
/* add used bufs to stream #0 */
st0 = &s->in_streams[0];
st->in->next = st0->in->next;
st0->in->next = head;
st->in->next = st0->in;
st0->in = head;
st->in = NULL;
}
s->in_csid = 0;
@ -633,7 +636,7 @@ ngx_rtmp_send(ngx_event_t *wev)
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_chain_t *out, *l, *ln;
ngx_chain_t *out, *l;
c = wev->data;
s = c->data;
@ -663,7 +666,9 @@ ngx_rtmp_send(ngx_event_t *wev)
return;
}
if (out == NULL) {
if (out == s->out
&& s->out->buf->pos == s->out->buf->last)
{
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ngx_add_timer(c->write, cscf->timeout);
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
@ -672,20 +677,28 @@ ngx_rtmp_send(ngx_event_t *wev)
return;
}
if (out != s->out) {
for(l = s->out; l->next && l->next != out; ) {
while(s->out) {
/* anyone still using this buffer? */
if (ngx_rtmp_buf_release(l->buf)) {
l = l->next;
continue;
}
l = s->out;
if (l->buf->pos < l->buf->last) {
break;
}
/* return buffer to core */
ln = l->next;
l->next = cscf->out_free;
cscf->out_free = l;
l = ln;
s->out = s->out->next;
/* anyone still using this buffer? */
if (ngx_rtmp_buf_release(l->buf)) {
continue;
}
/* return buffer to core */
if (ngx_rtmp_release_shared_buf(s, l)) {
ngx_rtmp_close_connection(c);
return;
}
if (s->out == out) {
break;
}
}
}
@ -701,14 +714,23 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s)
ngx_buf_t *b;
size_t size;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_connection_t *c;
c = s->connection;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
if (cscf->out_free) {
out = cscf->out_free;
cscf->out_free = out->next;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"reuse shared buf");
} else {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"alloc shared buf");
out = ngx_alloc_chain_link(cscf->out_pool);
if (out == NULL) {
return NULL;
@ -736,6 +758,30 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s)
}
ngx_int_t
ngx_rtmp_release_shared_buf(ngx_rtmp_session_t *s,
ngx_chain_t *out)
{
ngx_rtmp_core_srv_conf_t *cscf;
size_t nbufs;
ngx_chain_t *cl;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
for(nbufs = 1, cl = out; cl->next;
cl = cl->next, ++nbufs);
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"release %d shared bufs", nbufs);
cl->next = cscf->out_free;
cscf->out_free = out;
return NGX_OK;
}
void
ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *out, uint8_t fmt)
@ -754,10 +800,10 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
++nbufs;
}
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"RTMP prep %s (%d) csid=%D timestamp=%D "
ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"RTMP prep %s (%d) fmt=%d csid=%D timestamp=%D "
"mlen=%D msid=%D nbufs=%d",
ngx_rtmp_packet_type(h->type), (int)h->type,
ngx_rtmp_packet_type(h->type), (int)h->type, (int)fmt,
h->csid, h->timestamp, mlen, h->msid, nbufs);
/* determine initial header size */
@ -922,27 +968,6 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
return NGX_OK;
}
/*
void
ngx_rtmp_close_session(ngx_rtmp_session_t *s)
{
size_t n;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_disconnect_handler_pt *h;
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
h = cmcf->disconnect.elts;
for(n = 0; n < cmcf->disconnect.nelts; ++n, ++h) {
if (*h) {
(*h)(s);
}
}
ngx_destroy_pool(s->in_pool);
ngx_rtmp_close_connection(s->connection);
}
*/
void
ngx_rtmp_close_connection(ngx_connection_t *c)

View file

@ -193,7 +193,7 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,
* because ngx_hash_find only returns one item;
* no good to patch NGINX core ;) */
ch = ngx_hash_find(&cmcf->calls_hash,
ngx_hash_key_lc(func, len), func, len);
ngx_hash_strlow(func, func, len), func, len);
if (ch) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,