nginx-mod-rtmp/ngx_rtmp_receive.c

270 lines
6.7 KiB
C
Raw Normal View History

/*
* Copyright (c) 2012 Roman Arutyunyan
*/
#include "ngx_rtmp.h"
2012-03-29 14:10:11 +02:00
#include "ngx_rtmp_amf.h"
#include <string.h>
ngx_int_t
2012-03-13 06:41:51 +01:00
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
ngx_buf_t *b;
u_char *p;
uint32_t val;
uint8_t limit;
ngx_connection_t *c;
c = s->connection;
b = in->buf;
if (b->last - b->pos < 4) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"too small buffer for %d message: %d",
(int)h->type, b->last - b->pos);
return NGX_OK;
}
2012-03-13 06:41:51 +01:00
p = (u_char*)&val;
p[0] = b->pos[3];
p[1] = b->pos[2];
p[2] = b->pos[1];
p[3] = b->pos[0];
switch(h->type) {
case NGX_RTMP_MSG_CHUNK_SIZE:
/* set chunk size =val */
2012-04-05 19:28:41 +02:00
ngx_rtmp_set_chunk_size(s, val);
break;
case NGX_RTMP_MSG_ABORT:
/* abort chunk stream =val */
break;
case NGX_RTMP_MSG_ACK:
/* receive ack with sequence number =val */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"receive ack seq=%uD", val);
break;
case NGX_RTMP_MSG_ACK_SIZE:
/* receive window size =val */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"receive ack_size=%uD", val);
break;
case NGX_RTMP_MSG_BANDWIDTH:
if (b->last - b->pos >= 5) {
limit = *(uint8_t*)&b->pos[4];
(void)val;
(void)limit;
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"receive bandwidth=%uD limit=%d",
val, (int)limit);
/* receive window size =val
* && limit */
}
break;
default:
return NGX_ERROR;
}
return NGX_OK;
}
ngx_int_t
2012-03-13 06:41:51 +01:00
ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
ngx_buf_t *b;
u_char *p;
uint16_t evt;
uint32_t val, arg;
ngx_connection_t *c;
c = s->connection;
b = in->buf;
if (b->last - b->pos < 6) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"too small buffer for user message: %d",
b->last - b->pos);
return NGX_OK;
}
2012-03-13 06:41:51 +01:00
p = (u_char*)&evt;
p[0] = b->pos[1];
p[1] = b->pos[0];
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP recv user evt %s (%d)",
ngx_rtmp_user_message_type(evt), (int)evt);
2012-03-13 06:41:51 +01:00
p = (u_char*)&val;
p[0] = b->pos[5];
p[1] = b->pos[4];
p[2] = b->pos[3];
p[3] = b->pos[2];
switch(evt) {
case NGX_RTMP_USER_STREAM_BEGIN:
/* use =val as stream id which started */
break;
case NGX_RTMP_USER_STREAM_EOF:
/* use =val as stream id which is over */
break;
case NGX_RTMP_USER_STREAM_DRY:
/* stream =val is dry */
break;
case NGX_RTMP_USER_SET_BUFLEN:
if (b->last - b->pos >= 10) {
2012-03-13 06:41:51 +01:00
p = (u_char*)&arg;
p[0] = b->pos[9];
p[1] = b->pos[8];
p[2] = b->pos[7];
p[3] = b->pos[6];
/* use =val as stream id && arg as buflen in msec*/
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"msid=%uD buflen: %uD (msec)", val, arg);
ngx_rtmp_send_user_stream_begin(s, val);
}
break;
case NGX_RTMP_USER_RECORDED:
/* stream =val is recorded */
break;
case NGX_RTMP_USER_PING_REQUEST:
ngx_rtmp_send_user_ping_response(s, val);
break;
case NGX_RTMP_USER_PING_RESPONSE:
/* use =val as incoming timestamp */
2012-05-26 06:33:41 +02:00
ngx_rtmp_reset_ping(s);
break;
default:
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"unexpected user event: %d",
(int)evt);
return NGX_OK;
}
return NGX_OK;
}
2012-04-05 12:59:15 +02:00
static ngx_int_t
ngx_rtmp_amf_message_basic_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in, ngx_int_t name_typeless)
{
2012-04-05 12:59:15 +02:00
ngx_rtmp_amf_ctx_t act;
ngx_connection_t *c;
ngx_rtmp_core_main_conf_t *cmcf;
2012-03-21 16:08:59 +01:00
ngx_array_t *ch;
ngx_rtmp_handler_pt *ph;
size_t len, n;
static u_char func[128];
2012-04-05 12:59:15 +02:00
static ngx_rtmp_amf_elt_t elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
func, sizeof(func) },
};
2012-04-05 12:59:15 +02:00
/* AMF command names come with string type, but shared object names
* come without type */
if (name_typeless) {
elts[0].type |= NGX_RTMP_AMF_TYPELESS;
} else {
elts[0].type &= ~NGX_RTMP_AMF_TYPELESS;
}
c = s->connection;
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
2012-03-29 14:10:11 +02:00
/* read AMF func name & transaction id */
act.link = in;
act.log = s->connection->log;
memset(func, 0, sizeof(func));
2012-03-29 14:10:11 +02:00
if (ngx_rtmp_amf_read(&act, elts,
sizeof(elts) / sizeof(elts[0])) != NGX_OK)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
2012-03-29 14:10:11 +02:00
"AMF cmd failed");
return NGX_ERROR;
}
len = ngx_strlen(func);
2012-03-29 14:10:11 +02:00
ch = ngx_hash_find(&cmcf->amf_hash,
ngx_hash_strlow(func, func, len), func, len);
2012-03-21 16:08:59 +01:00
if (ch && ch->nelts) {
ph = ch->elts;
for (n = 0; n < ch->nelts; ++n, ++ph) {
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0,
2012-03-29 14:10:11 +02:00
"AMF func '%s' passed to handler %d/%d",
2012-03-21 16:08:59 +01:00
func, n, ch->nelts);
switch ((*ph)(s, h, in)) {
case NGX_ERROR:
return NGX_ERROR;
case NGX_DONE:
return NGX_OK;
}
}
} else {
2012-03-17 23:16:59 +01:00
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
2012-03-29 14:10:11 +02:00
"AMF cmd '%s' no handler", func);
2012-03-21 16:08:59 +01:00
}
return NGX_OK;
}
2012-04-05 12:59:15 +02:00
ngx_int_t
ngx_rtmp_amf_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
return ngx_rtmp_amf_message_basic_handler(s, h, in, 0);
}
ngx_int_t
ngx_rtmp_amf_shared_object_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
return ngx_rtmp_amf_message_basic_handler(s, h, in, 1);
}
ngx_int_t
2012-03-29 14:10:11 +02:00
ngx_rtmp_receive_amf(ngx_rtmp_session_t *s, ngx_chain_t *in,
ngx_rtmp_amf_elt_t *elts, size_t nelts)
{
2012-03-29 14:10:11 +02:00
ngx_rtmp_amf_ctx_t act;
act.link = in;
act.log = s->connection->log;
2012-03-29 14:10:11 +02:00
return ngx_rtmp_amf_read(&act, elts, nelts);
}