mirror of
https://github.com/zotanmew/nginx-rtmp-module.git
synced 2024-05-15 00:11:08 +02:00
reimplemented output buffering && added shortcuts for sending all major kinds of messages
This commit is contained in:
parent
6769a5ac5d
commit
cdaf112e76
2
config
2
config
|
@ -7,5 +7,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
|
|||
$ngx_addon_dir/ngx_rtmp_handler.c \
|
||||
$ngx_addon_dir/ngx_rtmp_core_module.c \
|
||||
$ngx_addon_dir/ngx_rtmp_amf0.c \
|
||||
$ngx_addon_dir/ngx_rtmp_send.c \
|
||||
\
|
||||
$ngx_addon_dir/ngx_rtmp_netconn.c \
|
||||
$ngx_addon_dir/ngx_rtmp_netstream.c"
|
||||
|
|
144
ngx_rtmp.h
144
ngx_rtmp.h
|
@ -137,15 +137,19 @@ struct ngx_rtmp_session_s {
|
|||
|
||||
ngx_str_t *addr_text;
|
||||
|
||||
ngx_chain_t *free;
|
||||
|
||||
/* handshake */
|
||||
ngx_buf_t buf;
|
||||
ngx_uint_t hs_stage;
|
||||
|
||||
/* input
|
||||
* stream 0 (reserved by RTMP spec)
|
||||
* used for free chain link (0) */
|
||||
* used for free chain link */
|
||||
|
||||
/* TODO: make stream #1 handle ANY single stream;
|
||||
* that'll introduce support for
|
||||
* unlimited number of streams given
|
||||
* there's no interleaving between them */
|
||||
|
||||
ngx_rtmp_stream_t *streams;
|
||||
uint32_t in_csid;
|
||||
ngx_uint_t in_chunk_size;
|
||||
|
@ -172,12 +176,16 @@ typedef struct ngx_rtmp_session_s ngx_rtmp_session_t;
|
|||
typedef struct {
|
||||
ngx_msec_t timeout;
|
||||
ngx_flag_t so_keepalive;
|
||||
/*ngx_int_t buffers;*/
|
||||
ngx_int_t max_streams;
|
||||
ngx_msec_t resolver_timeout;
|
||||
ngx_resolver_t *resolver;
|
||||
ngx_rtmp_conf_ctx_t *ctx;
|
||||
|
||||
/* shared output buffers */
|
||||
ngx_uint_t out_chunk_size;
|
||||
ngx_pool_t *pool;
|
||||
ngx_chain_t *free;
|
||||
|
||||
ngx_rtmp_session_t **sessions; /* session hash map: name->session */
|
||||
|
||||
ngx_rtmp_conf_ctx_t *ctx;
|
||||
} ngx_rtmp_core_srv_conf_t;
|
||||
|
||||
|
||||
|
@ -201,42 +209,42 @@ typedef struct {
|
|||
* max 3 basic header
|
||||
* + max 11 message header
|
||||
* + max 4 extended header (timestamp) */
|
||||
#define NGX_RTMP_MAX_CHUNK_HEADER 18
|
||||
#define NGX_RTMP_MAX_CHUNK_HEADER 18
|
||||
|
||||
|
||||
/* RTMP packet types*/
|
||||
#define NGX_RTMP_PACKET_CHUNK_SIZE 1
|
||||
#define NGX_RTMP_PACKET_ABORT 2
|
||||
#define NGX_RTMP_PACKET_ACK 3
|
||||
#define NGX_RTMP_PACKET_CTL 4
|
||||
#define NGX_RTMP_PACKET_ACK_SIZE 5
|
||||
#define NGX_RTMP_PACKET_BANDWIDTH 6
|
||||
#define NGX_RTMP_PACKET_EDGE 7
|
||||
#define NGX_RTMP_PACKET_AUDIO 8
|
||||
#define NGX_RTMP_PACKET_VIDEO 9
|
||||
#define NGX_RTMP_PACKET_AMF3_META 15
|
||||
#define NGX_RTMP_PACKET_AMF3_SHARED 16
|
||||
#define NGX_RTMP_PACKET_AMF3_CMD 17
|
||||
#define NGX_RTMP_PACKET_AMF0_META 18
|
||||
#define NGX_RTMP_PACKET_AMF0_SHARED 19
|
||||
#define NGX_RTMP_PACKET_AMF0_CMD 20
|
||||
#define NGX_RTMP_PACKET_AGGREGATE 22
|
||||
/* RTMP message types*/
|
||||
#define NGX_RTMP_MSG_CHUNK_SIZE 1
|
||||
#define NGX_RTMP_MSG_ABORT 2
|
||||
#define NGX_RTMP_MSG_ACK 3
|
||||
#define NGX_RTMP_MSG_USER 4
|
||||
#define NGX_RTMP_MSG_ACK_SIZE 5
|
||||
#define NGX_RTMP_MSG_BANDWIDTH 6
|
||||
#define NGX_RTMP_MSG_EDGE 7
|
||||
#define NGX_RTMP_MSG_AUDIO 8
|
||||
#define NGX_RTMP_MSG_VIDEO 9
|
||||
#define NGX_RTMP_MSG_AMF3_META 15
|
||||
#define NGX_RTMP_MSG_AMF3_SHARED 16
|
||||
#define NGX_RTMP_MSG_AMF3_CMD 17
|
||||
#define NGX_RTMP_MSG_AMF0_META 18
|
||||
#define NGX_RTMP_MSG_AMF0_SHARED 19
|
||||
#define NGX_RTMP_MSG_AMF0_CMD 20
|
||||
#define NGX_RTMP_MSG_AGGREGATE 22
|
||||
|
||||
|
||||
/* RMTP control message types */
|
||||
#define NGX_RTMP_CTL_STREAM_BEGIN 0
|
||||
#define NGX_RTMP_CTL_STREAM_EOF 1
|
||||
#define NGX_RTMP_CTL_STREAM_DRY 2
|
||||
#define NGX_RTMP_CTL_SET_BUFLEN 3
|
||||
#define NGX_RTMP_CTL_RECORDED 4
|
||||
#define NGX_RTMP_CTL_PING_REQUEST 6
|
||||
#define NGX_RTMP_CTL_PING_RESPONSE 7
|
||||
#define NGX_RTMP_USER_STREAM_BEGIN 0
|
||||
#define NGX_RTMP_USER_STREAM_EOF 1
|
||||
#define NGX_RTMP_USER_STREAM_DRY 2
|
||||
#define NGX_RTMP_USER_SET_BUFLEN 3
|
||||
#define NGX_RTMP_USER_RECORDED 4
|
||||
#define NGX_RTMP_USER_PING_REQUEST 6
|
||||
#define NGX_RTMP_USER_PING_RESPONSE 7
|
||||
|
||||
|
||||
#define NGX_RTMP_MODULE 0x504D5452 /* "RTMP" */
|
||||
#define NGX_RTMP_MODULE 0x504D5452 /* "RTMP" */
|
||||
|
||||
#define NGX_RTMP_MAIN_CONF 0x02000000
|
||||
#define NGX_RTMP_SRV_CONF 0x04000000
|
||||
#define NGX_RTMP_MAIN_CONF 0x02000000
|
||||
#define NGX_RTMP_SRV_CONF 0x04000000
|
||||
|
||||
|
||||
#define NGX_RTMP_MAIN_CONF_OFFSET offsetof(ngx_rtmp_conf_ctx_t, main_conf)
|
||||
|
@ -261,26 +269,58 @@ void ngx_rtmp_init_connection(ngx_connection_t *c);
|
|||
void ngx_rtmp_close_session(ngx_rtmp_session_t *s);
|
||||
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
|
||||
|
||||
void ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size);
|
||||
void ngx_rtmp_set_bytes_read(ngx_rtmp_session_t *s, uint32_t bytes_read);
|
||||
void ngx_rtmp_set_client_buffer_time(ngx_rtmp_session_t *s, int16_t msec);
|
||||
void ngx_rtmp_clear_buffer(ngx_rtmp_session_t *s);
|
||||
void ngx_rtmp_set_ping_time(ngx_rtmp_session_t *s, int16_t msec);
|
||||
void ngx_rtmp_set_server_bw(ngx_rtmp_session_t *s, uint32_t bw,
|
||||
uint8_t limit_type);
|
||||
void ngx_rtmp_set_client_bw(ngx_rtmp_session_t *s, uint32_t bw,
|
||||
uint8_t limit_type);
|
||||
/* Sending messages */
|
||||
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s);
|
||||
void ngx_rtmp_prepare_message(ngx_rtmp_packet_hdr_t *h,
|
||||
ngx_chain_t *out, uint8_t fmt);
|
||||
void ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out);
|
||||
|
||||
#define NGX_RTMP_LIMIT_SOFT 0
|
||||
#define NGX_RTMP_LIMIT_HARD 1
|
||||
#define NGX_RTMP_LIMIT_DYNAMIC 2
|
||||
|
||||
/* Protocol control messages */
|
||||
ngx_int_t ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s,
|
||||
uint32_t chunk_size);
|
||||
ngx_int_t ngx_rtmp_send_abort(ngx_rtmp_session_t *s,
|
||||
uint32_t csid);
|
||||
ngx_int_t ngx_rtmp_send_ack(ngx_rtmp_session_t *s,
|
||||
uint32_t seq);
|
||||
ngx_int_t ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s,
|
||||
uint32_t ack_size);
|
||||
ngx_int_t ngx_rtmp_send_bandwidth(ngx_rtmp_session_r *s,
|
||||
uint32_t ack_size, uint8_t limit_type);
|
||||
|
||||
/* User control messages */
|
||||
ngx_int_t ngx_rtmp_send_user_stream_begin(ngx_rtmp_session_t *s,
|
||||
uint32_t msid);
|
||||
ngx_int_t ngx_rtmp_send_user_stream_eof(ngx_rtmp_session_t *s,
|
||||
uint32_t msid);
|
||||
ngx_int_t ngx_rtmp_send_user_stream_dry(ngx_rtmp_session_t *s,
|
||||
uint32_t msid);
|
||||
ngx_int_t ngx_rtmp_send_user_set_buflen(ngx_rtmp_session_t *s,
|
||||
uint32_t msid, uint32_t buflen_msec);
|
||||
ngx_int_t ngx_rtmp_send_user_recorded(ngx_rtmp_session_t *s,
|
||||
uint32_t msid);
|
||||
ngx_int_t ngx_rtmp_send_user_ping_request(ngx_rtmp_session_t *s,
|
||||
uint32_t timestamp);
|
||||
ngx_int_t ngx_rtmp_send_user_ping_response(ngx_rtmp_session_t *s,
|
||||
uint32_t timestamp);
|
||||
|
||||
/* AMF0 sender/receiver */
|
||||
ngx_int_t ngx_rtmp_send_amf0(ngx_session_t *s,
|
||||
uint32_t csid, uint32_t msid,
|
||||
ngx_rtmp_amf0_elt_t *elts, size_t nelts);
|
||||
ngx_int_t ngx_rtmp_receive_amf0(ngx_session_t *s, ngx_chain_t *in,
|
||||
ngx_rtmp_amf0_elt_t *elts, size_t nelts)
|
||||
|
||||
|
||||
/************** will go to modules */
|
||||
|
||||
/* Broadcasting */
|
||||
void ngx_rtmp_join(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_uint_t flags);
|
||||
void ngx_rtmp_leave(ngx_rtmp_session_t *s);
|
||||
|
||||
ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
|
||||
ngx_rtmp_packet_hdr_t *h, ngx_chain_t *b);
|
||||
|
||||
void ngx_rtmp_send_packet(ngx_rtmp_session_t *s,
|
||||
ngx_rtmp_packet_hdr_t *h, ngx_chain_t *b);
|
||||
|
||||
|
||||
/* NetConnection methods */
|
||||
ngx_int_t ngx_rtmp_connect(ngx_rtmp_session_t *s,
|
||||
double trans_id, ngx_chain_t *l);
|
||||
|
@ -291,7 +331,6 @@ ngx_int_t ngx_rtmp_close(ngx_rtmp_session_t *s,
|
|||
ngx_int_t ngx_rtmp_createstream(ngx_rtmp_session_t *s
|
||||
double trans_id, , ngx_chain_t *l);
|
||||
|
||||
|
||||
/* NetStream methods */
|
||||
ngx_int_t ngx_rtmp_play(ngx_rtmp_session_t *s,
|
||||
double trans_id, ngx_chain_t *l);
|
||||
|
@ -312,7 +351,6 @@ ngx_int_t ngx_rtmp_seek(ngx_rtmp_session_t *s,
|
|||
ngx_int_t ngx_rtmp_pause(ngx_rtmp_session_t *s,
|
||||
double trans_id, ngx_chain_t *l);
|
||||
|
||||
|
||||
extern ngx_uint_t ngx_rtmp_max_module;
|
||||
extern ngx_module_t ngx_rtmp_core_module;
|
||||
|
||||
|
|
|
@ -5,13 +5,7 @@
|
|||
#include "ngx_rtmp_amf0.h"
|
||||
#include "ngx_rtmp.h"
|
||||
#include <string.h>
|
||||
/*
|
||||
#define NGX_RTMP_AMF0_SWAP_VALUES(x, y) \
|
||||
(x) ^= (y); (y) ^= (x); (x) ^= (y)
|
||||
|
||||
#define NGX_RTMP_AMF0_REVERSE2(x) \
|
||||
NGX_RTMP_AMF0_SV(*(uint8_t*)(&x) , *((uint8_t*)(&x) + 1))
|
||||
*/
|
||||
static inline void*
|
||||
ngx_rtmp_amf0_reverse_copy(void *dst, void* src, size_t len)
|
||||
{
|
||||
|
@ -108,32 +102,33 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)
|
|||
{
|
||||
ngx_buf_t *b;
|
||||
size_t size;
|
||||
ngx_chain_t **l, **free;
|
||||
ngx_chain_t *l, *ln;
|
||||
|
||||
#ifdef NGX_DEBUG
|
||||
ngx_rtmp_amf0_debug("write", ctx->log, (u_char*)p, n);
|
||||
#endif
|
||||
|
||||
l = ctx->link;
|
||||
free = ctx->free;
|
||||
|
||||
while(n) {
|
||||
b = (*l) ? (*l)->buf : NULL;
|
||||
b = l ? l->buf : NULL;
|
||||
|
||||
if (b == NULL || b->last == b->end) {
|
||||
if (*free == NULL)
|
||||
return NGX_ERROR;
|
||||
|
||||
if (*l == NULL) {
|
||||
*l = *free;
|
||||
*free = (*free)->next;
|
||||
} else {
|
||||
(*l)->next = *free;
|
||||
*free = (*free)->next;
|
||||
*l = (*l)->next;
|
||||
ln = ctx->alloc(ctx->arg);
|
||||
if (ln == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
(*l)->next = NULL;
|
||||
b = (*l)->buf;
|
||||
|
||||
if (l == NULL) {
|
||||
l = ln;
|
||||
ctx->first = l;
|
||||
} else {
|
||||
l->next = ln;
|
||||
l = ln;
|
||||
}
|
||||
|
||||
b = l->buf;
|
||||
b->pos = b->last = b->start;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,9 @@
|
|||
#include <ngx_config.h>
|
||||
#include <ngx_core.h>
|
||||
|
||||
|
||||
/*TODO: char -> u_char */
|
||||
|
||||
typedef struct {
|
||||
ngx_int_t type;
|
||||
char *name;
|
||||
|
@ -26,13 +29,19 @@ typedef struct {
|
|||
} ngx_rtmp_amf0_elt_t;
|
||||
|
||||
|
||||
typedef ngx_chain_t * (*ngx_rtmp_amf0_alloc_pt)
|
||||
|
||||
typedef struct {
|
||||
ngx_chain_t **link, **free;
|
||||
ngx_chain_t *link, *first;
|
||||
ngx_rtmp_amf0_alloc_pt alloc;
|
||||
void *arg;
|
||||
ngx_log_t *log;
|
||||
} ngx_rtmp_amf0_ctx_t;
|
||||
|
||||
|
||||
/*
|
||||
*
|
||||
* Examples:
|
||||
|
||||
struct {
|
||||
char name[32];
|
||||
|
|
|
@ -18,8 +18,6 @@ static char *ngx_rtmp_core_server(ngx_conf_t *cf, ngx_command_t *cmd,
|
|||
void *conf);
|
||||
static char *ngx_rtmp_core_listen(ngx_conf_t *cf, ngx_command_t *cmd,
|
||||
void *conf);
|
||||
static char *ngx_rtmp_core_resolver(ngx_conf_t *cf, ngx_command_t *cmd,
|
||||
void *conf);
|
||||
|
||||
|
||||
static ngx_conf_deprecated_t ngx_conf_deprecated_so_keepalive = {
|
||||
|
@ -58,13 +56,6 @@ static ngx_command_t ngx_rtmp_core_commands[] = {
|
|||
offsetof(ngx_rtmp_core_srv_conf_t, timeout),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("buffers"),
|
||||
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_num_slot,
|
||||
NGX_RTMP_SRV_CONF_OFFSET,
|
||||
offsetof(ngx_rtmp_core_srv_conf_t, buffers),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("max_streams"),
|
||||
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_num_slot,
|
||||
|
@ -72,18 +63,11 @@ static ngx_command_t ngx_rtmp_core_commands[] = {
|
|||
offsetof(ngx_rtmp_core_srv_conf_t, max_streams),
|
||||
NULL },
|
||||
|
||||
{ ngx_string("resolver"),
|
||||
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_1MORE,
|
||||
ngx_rtmp_core_resolver,
|
||||
NGX_RTMP_SRV_CONF_OFFSET,
|
||||
0,
|
||||
NULL },
|
||||
|
||||
{ ngx_string("resolver_timeout"),
|
||||
{ ngx_string("out_chunk_size"),
|
||||
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
|
||||
ngx_conf_set_msec_slot,
|
||||
ngx_conf_set_num_slot,
|
||||
NGX_RTMP_SRV_CONF_OFFSET,
|
||||
offsetof(ngx_rtmp_core_srv_conf_t, resolver_timeout),
|
||||
offsetof(ngx_rtmp_core_srv_conf_t, out_chunk_size),
|
||||
NULL },
|
||||
|
||||
ngx_null_command
|
||||
|
@ -159,12 +143,10 @@ ngx_rtmp_core_create_srv_conf(ngx_conf_t *cf)
|
|||
*/
|
||||
|
||||
cscf->timeout = NGX_CONF_UNSET_MSEC;
|
||||
cscf->resolver_timeout = NGX_CONF_UNSET_MSEC;
|
||||
cscf->so_keepalive = NGX_CONF_UNSET;
|
||||
cscf->buffers = NGX_CONF_UNSET;
|
||||
conf->max_streams = NGX_CONF_UNSET;
|
||||
|
||||
cscf->resolver = NGX_CONF_UNSET_PTR;
|
||||
conf->out_chunk_size = NGX_CONF_UNSET;
|
||||
|
||||
return cscf;
|
||||
}
|
||||
|
@ -177,14 +159,12 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
|
|||
ngx_rtmp_core_srv_conf_t *conf = child;
|
||||
|
||||
ngx_conf_merge_msec_value(conf->timeout, prev->timeout, 60000);
|
||||
ngx_conf_merge_msec_value(conf->resolver_timeout, prev->resolver_timeout,
|
||||
30000);
|
||||
|
||||
ngx_conf_merge_value(conf->so_keepalive, prev->so_keepalive, 0);
|
||||
ngx_conf_merge_value(conf->buffers, prev->buffers, 16);
|
||||
ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16);
|
||||
ngx_conf_merge_value(conf->out_chunk_size, prev->out_chunk_size, 128);
|
||||
|
||||
ngx_conf_merge_ptr_value(conf->resolver, prev->resolver, NULL);
|
||||
conf->pool = ngx_create_pool(4096, cf->log);
|
||||
|
||||
conf->sessions = ngx_pcalloc(cf->pool,
|
||||
sizeof(ngx_rtmp_session_t*) * NGX_RTMP_SESSION_HASH_SIZE);
|
||||
|
@ -511,30 +491,3 @@ ngx_rtmp_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
|
|||
return NGX_CONF_OK;
|
||||
}
|
||||
|
||||
|
||||
static char *
|
||||
ngx_rtmp_core_resolver(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
|
||||
{
|
||||
ngx_rtmp_core_srv_conf_t *cscf = conf;
|
||||
|
||||
ngx_str_t *value;
|
||||
|
||||
value = cf->args->elts;
|
||||
|
||||
if (cscf->resolver != NGX_CONF_UNSET_PTR) {
|
||||
return "is duplicate";
|
||||
}
|
||||
|
||||
if (ngx_strcmp(value[1].data, "off") == 0) {
|
||||
cscf->resolver = NULL;
|
||||
return NGX_CONF_OK;
|
||||
}
|
||||
|
||||
cscf->resolver = ngx_resolver_create(cf, &value[1], cf->args->nelts - 1);
|
||||
if (cscf->resolver == NULL) {
|
||||
return NGX_CONF_ERROR;
|
||||
}
|
||||
|
||||
return NGX_CONF_OK;
|
||||
}
|
||||
|
||||
|
|
|
@ -394,8 +394,8 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
|||
st = &s->streams[s->csid];
|
||||
|
||||
if (st->in == NULL) {
|
||||
if ((st->in = ngx_alloc_chain_link(c->in_pool)) == NULL
|
||||
|| (sin->in->buf = ngx_calloc_buf(c->in_pool)) == NULL)
|
||||
if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL
|
||||
|| (sin->in->buf = ngx_calloc_buf(s->in_pool)) == NULL)
|
||||
{
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
||||
"chain alloc failed");
|
||||
|
@ -588,7 +588,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
|||
/* handle! */
|
||||
head = st->in->next;
|
||||
st->in->next = NULL;
|
||||
if (ngx_rtmp_receive_packet(s, h, head) != NGX_OK) {
|
||||
if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) {
|
||||
ngx_rtmp_close_session(s);
|
||||
return;
|
||||
}
|
||||
|
@ -606,13 +606,21 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
|||
}
|
||||
|
||||
|
||||
void
|
||||
#define ngx_rtmp_buf_addref(b) \
|
||||
(++(ngx_int_t)(b)->tag)
|
||||
|
||||
|
||||
#define ngx_rtmp_buf_release(b) \
|
||||
(--(ngx_int_t)(b)->tag)
|
||||
|
||||
|
||||
static void
|
||||
ngx_rtmp_send(ngx_event_t *wev)
|
||||
{
|
||||
ngx_connection_t *c;
|
||||
ngx_rtmp_session_t *s;
|
||||
ngx_rtmp_core_srv_conf_t *cscf;
|
||||
ngx_chain_t *l, *ll;
|
||||
ngx_chain_t *out, *l, *ln;
|
||||
|
||||
c = wev->data;
|
||||
s = c->data;
|
||||
|
@ -631,14 +639,14 @@ ngx_rtmp_send(ngx_event_t *wev)
|
|||
}
|
||||
|
||||
while(s->out) {
|
||||
l = c->send_chain(c, s->out, 0);
|
||||
out = c->send_chain(c, s->out, 0);
|
||||
|
||||
if (l == NGX_CHAIN_ERROR) {
|
||||
if (out == NGX_CHAIN_ERROR) {
|
||||
ngx_rtmp_close_session(s);
|
||||
return;
|
||||
}
|
||||
|
||||
if (l == NULL) {
|
||||
if (out == NULL) {
|
||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||
ngx_add_timer(c->write, cscf->timeout);
|
||||
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
|
||||
|
@ -647,12 +655,21 @@ ngx_rtmp_send(ngx_event_t *wev)
|
|||
return;
|
||||
}
|
||||
|
||||
if (l != s->out) {
|
||||
for(ll = s->out;
|
||||
ll->next && ll->next != l;
|
||||
ll = ll->next);
|
||||
ll->next = s->free;
|
||||
s->out = l;
|
||||
if (out != s->out) {
|
||||
for(l = s->out; l->next && l->next != out; ) {
|
||||
|
||||
/* anyone still using this buffer? */
|
||||
if (ngx_rtmp_buf_release(l->buf)) {
|
||||
l = l->next;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* return buffer to core */
|
||||
ln = l->next;
|
||||
l->next = cscf->free;
|
||||
cscf->free = l;
|
||||
l = ln;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -660,245 +677,169 @@ ngx_rtmp_send(ngx_event_t *wev)
|
|||
}
|
||||
|
||||
|
||||
ngx_rtmp_session_t**
|
||||
ngx_rtmp_get_session_head(ngx_rtmp_session_t *s)
|
||||
ngx_chain_t *
|
||||
ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s)
|
||||
{
|
||||
ngx_chain_t *out;
|
||||
ngx_buf_t *b;
|
||||
size_t size;
|
||||
ngx_rtmp_core_srv_conf_t *cscf;
|
||||
|
||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||
|
||||
if (cscf->free) {
|
||||
out = cscf->free;
|
||||
cscf->free = out->next;
|
||||
|
||||
return &cscf->sessions[
|
||||
ngx_hash_key(s->name.data, s->name.len)
|
||||
% NGX_RTMP_SESSION_HASH_SIZE];
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ngx_rtmp_join(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_uint_t flags)
|
||||
{
|
||||
ngx_rtmp_session_t **ps;
|
||||
ngx_connection_t *c;
|
||||
|
||||
c = s->connection;
|
||||
|
||||
if (s->name.len) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||
"already joined");
|
||||
return;
|
||||
}
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||
"RTMP join '%V'",
|
||||
&name);
|
||||
|
||||
s->name = *name;
|
||||
ps = ngx_rtmp_get_session_head(s);
|
||||
s->next = *ps;
|
||||
s->flags = flags;
|
||||
*ps = s;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ngx_rtmp_leave(ngx_rtmp_session_t *s)
|
||||
{
|
||||
ngx_rtmp_session_t **ps;
|
||||
ngx_connection_t *c;
|
||||
|
||||
c = s->connection;
|
||||
|
||||
if (!s->name.len)
|
||||
return;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||
"RTMP leave '%V'",
|
||||
&s->name);
|
||||
|
||||
ps = ngx_rtmp_get_session_head(s);
|
||||
|
||||
ngx_str_null(&s->name);
|
||||
|
||||
for(; *ps; ps = &(*ps)->next) {
|
||||
if (*ps == s) {
|
||||
*ps = (*ps)->next;
|
||||
return;
|
||||
} else {
|
||||
out = ngx_alloc_chain_link(cscf->pool);
|
||||
if (out == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
out->buf = ngx_calloc_buf(cscf->pool);
|
||||
if (out->buf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size = cscf->out_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
|
||||
|
||||
b = out->buf;
|
||||
b->start = ngx_palloc(cscf->pool, size);
|
||||
b->end = b->start + size;
|
||||
}
|
||||
|
||||
out->next = NULL;
|
||||
b = out->buf;
|
||||
b->pos = b->last = b->start + NGX_RTMP_MAX_CHUNK_HEADER;
|
||||
b->tag = (ngx_buf_tag_t)0;
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ngx_rtmp_send_packet(ngx_rtmp_session_t *s, ngx_rtmp_packet_hdr_t *h,
|
||||
ngx_chain_t *ll)
|
||||
ngx_rtmp_prepare_message(ngx_rtmp_packet_hdr_t *h, ngx_chain_t *out,
|
||||
uint8_t fmt)
|
||||
{
|
||||
ngx_rtmp_packet_hdr_t *lh;
|
||||
ngx_int_t hsize, size, nbufs;
|
||||
ngx_chain_t *l, **pl;
|
||||
ngx_buf_t *b, *bb;
|
||||
u_char *p, *pp;
|
||||
uint8_t fmt;
|
||||
uint32_t timestamp, ext_timestamp, mlen;
|
||||
ngx_connection_t *c;
|
||||
|
||||
if (ll == NULL) {
|
||||
return;
|
||||
}
|
||||
ngx_chain_t *l;
|
||||
u_char *p;
|
||||
ngx_int_t hsize, thsize, nbufs;
|
||||
uint32_t mlen, timestamp, ext_timestamp;
|
||||
static uint8_t hdrsize[] = { 12, 8, 4, 1 };
|
||||
|
||||
/* detect packet size */
|
||||
mlen = 0;
|
||||
l = ll;
|
||||
nbufs = 0;
|
||||
while(l) {
|
||||
mlen += (l->buf->last - l->buf->pos);
|
||||
for(l = out; l; l = l->next) {
|
||||
mlen += (out->buf->last - l->buf->pos);
|
||||
++nbufs;
|
||||
l = l->next;
|
||||
}
|
||||
|
||||
c = s->connection;
|
||||
bb = ll->buf;
|
||||
pp = bb->pos;
|
||||
lh = &s->out_hdr;
|
||||
|
||||
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||
"RTMP send %s (%d) csid=%D timestamp=%D "
|
||||
"mlen=%D msid=%D nbufs=%d",
|
||||
ngx_rtmp_packet_type(h->type), (int)h->type,
|
||||
h->csid, h->timestamp, mlen, h->msid, nbufs);
|
||||
|
||||
while(ll) {
|
||||
if (s->free == NULL) {
|
||||
/* FIXME: implement proper packet dropper */
|
||||
return;
|
||||
}
|
||||
/* determine initial header size */
|
||||
hsize = hdrsize[fmt];
|
||||
|
||||
/* append new output buffer */
|
||||
l = s->free;
|
||||
s->free = s->free->next;
|
||||
l->next = NULL;
|
||||
for(pl = &s->out; *pl; pl = &(*pl)->next);
|
||||
*pl = l;
|
||||
b = l->buf;
|
||||
b->pos = b->last = b->start + NGX_RTMP_MAX_CHUNK_HEADER;
|
||||
|
||||
/* copy payload to new buffer leaving space for header */
|
||||
while (b->last < b->end) {
|
||||
size = b->end - b->last;
|
||||
if (size < bb->last - pp) {
|
||||
b->last = ngx_cpymem(b->last, pp, size);
|
||||
pp += size;
|
||||
break;
|
||||
}
|
||||
b->last = ngx_cpymem(b->last, pp, bb->last - pp);
|
||||
|
||||
ll = ll->next;
|
||||
if (ll == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
bb = ll->buf;
|
||||
pp = bb->pos;
|
||||
}
|
||||
|
||||
/* FIXME: there can be some occasional
|
||||
* matches (h->msid == 0) on first out
|
||||
* packet when we compare it
|
||||
* against initially zeroed header;
|
||||
* Though it maybe OK */
|
||||
|
||||
/* fill header
|
||||
* we have
|
||||
* h - new header
|
||||
* lh - old header for diffs */
|
||||
fmt = 0;
|
||||
hsize = 12;
|
||||
|
||||
if (h->msid && lh->msid == h->msid) {
|
||||
++fmt;
|
||||
hsize -= 4;
|
||||
if (lh->type == h->type && lh->mlen == mlen) {
|
||||
++fmt;
|
||||
hsize -= 4;
|
||||
if (lh->timestamp == h->timestamp) {
|
||||
++fmt;
|
||||
hsize -= 3;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* message header */
|
||||
timestamp = (fmt ? h->timestamp
|
||||
: h->timestamp - lh->timestamp);
|
||||
if (h->timestamp >= 0x00ffffff) {
|
||||
timestamp = 0x00ffffff;
|
||||
ext_timestamp = h->timestamp;
|
||||
hsize += 4;
|
||||
} else {
|
||||
timestamp = h->timestamp;
|
||||
ext_timestamp = 0;
|
||||
|
||||
if (timestamp >= 0x00ffffff) {
|
||||
ext_timestamp = timestamp;
|
||||
timestamp = 0x00ffffff;
|
||||
hsize += 4;
|
||||
}
|
||||
|
||||
if (h->csid >= 64) {
|
||||
++hsize;
|
||||
if (h->csid >= 320) {
|
||||
++hsize;
|
||||
}
|
||||
}
|
||||
|
||||
/* now we know header size */
|
||||
b->pos -= hsize;
|
||||
p = b->pos;
|
||||
|
||||
/* basic header */
|
||||
*p = (fmt << 6);
|
||||
if (h->csid >= 2 && h->csid <= 63) {
|
||||
*p++ |= (((uint8_t)h->csid) & 0x3f);
|
||||
} else if (h->csid >= 64 && h->csid < 320) {
|
||||
++p;
|
||||
*p++ = (uint8_t)(h->csid - 64);
|
||||
} else {
|
||||
*p++ |= 1;
|
||||
*p++ = (uint8_t)(h->csid - 64);
|
||||
*p++ = (uint8_t)((h->csid - 64) >> 8);
|
||||
}
|
||||
|
||||
/* message header */
|
||||
if (fmt <= 2) {
|
||||
pp = (u_char*)×tamp;
|
||||
*p++ = pp[2];
|
||||
*p++ = pp[1];
|
||||
*p++ = pp[0];
|
||||
if (fmt <= 1) {
|
||||
pp = (u_char*)&mlen;
|
||||
*p++ = pp[2];
|
||||
*p++ = pp[1];
|
||||
*p++ = pp[0];
|
||||
*p++ = h->type;
|
||||
if (fmt == 0) {
|
||||
pp = (u_char*)&h->msid;
|
||||
*p++ = pp[0];
|
||||
*p++ = pp[1];
|
||||
*p++ = pp[2];
|
||||
*p++ = pp[3];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* extended header */
|
||||
if (ext_timestamp) {
|
||||
pp = (u_char*)&ext_timestamp;
|
||||
*p++ = pp[3];
|
||||
*p++ = pp[2];
|
||||
*p++ = pp[1];
|
||||
*p++ = pp[0];
|
||||
}
|
||||
|
||||
*lh = *h;
|
||||
}
|
||||
|
||||
if (h->csid >= 64) {
|
||||
++hsize;
|
||||
if (h->csid >= 320) {
|
||||
++hsize;
|
||||
}
|
||||
}
|
||||
|
||||
/* fill initial header */
|
||||
out->buf->pos -= hsize;
|
||||
p = out->buf->pos;
|
||||
|
||||
/* basic header */
|
||||
*p = (fmt << 6);
|
||||
if (h->csid >= 2 && h->csid <= 63) {
|
||||
*p++ |= (((uint8_t)h->csid) & 0x3f);
|
||||
} else if (h->csid >= 64 && h->csid < 320) {
|
||||
++p;
|
||||
*p++ = (uint8_t)(h->csid - 64);
|
||||
} else {
|
||||
*p++ |= 1;
|
||||
*p++ = (uint8_t)(h->csid - 64);
|
||||
*p++ = (uint8_t)((h->csid - 64) >> 8);
|
||||
}
|
||||
|
||||
thsize = p - b->pos;
|
||||
|
||||
/* message header */
|
||||
if (fmt <= 2) {
|
||||
pp = (u_char*)×tamp;
|
||||
*p++ = pp[2];
|
||||
*p++ = pp[1];
|
||||
*p++ = pp[0];
|
||||
if (fmt <= 1) {
|
||||
pp = (u_char*)&mlen;
|
||||
*p++ = pp[2];
|
||||
*p++ = pp[1];
|
||||
*p++ = pp[0];
|
||||
*p++ = h->type;
|
||||
if (fmt == 0) {
|
||||
pp = (u_char*)&h->msid;
|
||||
*p++ = pp[0];
|
||||
*p++ = pp[1];
|
||||
*p++ = pp[2];
|
||||
*p++ = pp[3];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* extended header */
|
||||
if (ext_timestamp) {
|
||||
pp = (u_char*)&ext_timestamp;
|
||||
*p++ = pp[3];
|
||||
*p++ = pp[2];
|
||||
*p++ = pp[1];
|
||||
*p++ = pp[0];
|
||||
}
|
||||
|
||||
/* use the smallest fmt (3) for
|
||||
* trailing fragments */
|
||||
for(out = out->next; out; out = out->next) {
|
||||
out->pos -= hsize;
|
||||
ngx_memcpy(out->pos, b->pos, thsize);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
|
||||
{
|
||||
ngx_chain_t *l, **ll;
|
||||
|
||||
for(l = out; l; l = l->next) {
|
||||
ngx_rtmp_buf_addref(l->buf);
|
||||
}
|
||||
|
||||
/* TODO: optimize lookup */
|
||||
/* TODO: implement dropper */
|
||||
for(ll = &s->out; *ll; ll = &(*ll)->next);
|
||||
*ll = out;
|
||||
|
||||
ngx_rtmp_send(c->write);
|
||||
}
|
||||
|
||||
|
||||
ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
|
||||
static ngx_int_t
|
||||
ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
|
||||
ngx_rtmp_packet_hdr_t *h, ngx_chain_t *l)
|
||||
{
|
||||
ngx_rtmp_core_srv_conf_t *cscf;
|
||||
|
@ -951,16 +892,16 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
|
|||
#endif
|
||||
|
||||
switch(h->type) {
|
||||
case NGX_RTMP_PACKET_CHUNK_SIZE:
|
||||
case NGX_RTMP_MSG_CHUNK_SIZE:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_ABORT:
|
||||
case NGX_RTMP_MSG_ABORT:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_ACK:
|
||||
case NGX_RTMP_MSG_ACK:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_CTL:
|
||||
case NGX_RTMP_MSG_CTL:
|
||||
if (b->last - b->pos < 6)
|
||||
return NGX_ERROR;
|
||||
|
||||
|
@ -969,43 +910,44 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
|
|||
ping.v3 = (uint16_t*)(b->pos + 4);
|
||||
|
||||
switch(*ping.v1) {
|
||||
case NGX_RTMP_CTL_STREAM_BEGIN:
|
||||
case NGX_RTMP_USER_STREAM_BEGIN:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_CTL_STREAM_EOF:
|
||||
case NGX_RTMP_USER_STREAM_EOF:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_CTL_STREAM_DRY:
|
||||
case NGX_RTMP_USER_STREAM_DRY:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_CTL_SET_BUFLEN:
|
||||
case NGX_RTMP_USER_SET_BUFLEN:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_CTL_RECORDED:
|
||||
case NGX_RTMP_USER_RECORDED:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_CTL_PING_REQUEST:
|
||||
case NGX_RTMP_USER_PING_REQUEST:
|
||||
/* ping client from server */
|
||||
/**ping.v1 = NGX_RTMP_PING_PONG;
|
||||
ngx_rtmp_send_packet(s, h, l);*/
|
||||
ngx_rtmp_send_message(s, h, l);*/
|
||||
break;
|
||||
|
||||
case NGX_RTMP_CTL_PING_RESPONSE:
|
||||
case NGX_RTMP_USER_PING_RESPONSE:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_ACK_SIZE:
|
||||
case NGX_RTMP_MSG_ACK_SIZE:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_BANDWIDTH:
|
||||
case NGX_RTMP_MSG_BANDWIDTH:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_EDGE:
|
||||
case NGX_RTMP_MSG_EDGE:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_AUDIO:
|
||||
case NGX_RTMP_PACKET_VIDEO:
|
||||
case NGX_RTMP_MSG_AUDIO:
|
||||
case NGX_RTMP_MSG_VIDEO:
|
||||
/*
|
||||
if (!(s->flags & NGX_RTMP_PUBLISHER)) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||
"received audio/video from non-publisher");
|
||||
|
@ -1021,25 +963,25 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
|
|||
&& !ngx_strncmp(s->name.data, ss->name.data,
|
||||
s->name.len))
|
||||
{
|
||||
ngx_rtmp_send_packet(ss, h, l);
|
||||
ngx_rtmp_send_message(ss, h, l);
|
||||
}
|
||||
|
||||
}
|
||||
}*/
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_AMF3_META:
|
||||
case NGX_RTMP_PACKET_AMF3_SHARED:
|
||||
case NGX_RTMP_PACKET_AMF3_CMD:
|
||||
case NGX_RTMP_MSG_AMF3_META:
|
||||
case NGX_RTMP_MSG_AMF3_SHARED:
|
||||
case NGX_RTMP_MSG_AMF3_CMD:
|
||||
/* FIXME: AMF3 it not yet supported */
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_AMF0_META:
|
||||
case NGX_RTMP_MSG_AMF0_META:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_AMF0_SHARED:
|
||||
case NGX_RTMP_MSG_AMF0_SHARED:
|
||||
break;
|
||||
|
||||
case NGX_RTMP_PACKET_AMF0_CMD:
|
||||
case NGX_RTMP_MSG_AMF0_CMD:
|
||||
amf_ctx.link = &l;
|
||||
amf_ctx.free = &s->free;
|
||||
amf_ctx.log = c->log;
|
||||
|
@ -1147,3 +1089,71 @@ ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len)
|
|||
return p;
|
||||
}
|
||||
|
||||
/************* this will go to module ****************/
|
||||
ngx_rtmp_session_t **
|
||||
ngx_rtmp_get_session_head(ngx_rtmp_session_t *s)
|
||||
{
|
||||
ngx_rtmp_core_srv_conf_t *cscf;
|
||||
|
||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||
|
||||
return &cscf->sessions[
|
||||
ngx_hash_key(s->name.data, s->name.len)
|
||||
% NGX_RTMP_SESSION_HASH_SIZE];
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ngx_rtmp_join(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_uint_t flags)
|
||||
{
|
||||
ngx_rtmp_session_t **ps;
|
||||
ngx_connection_t *c;
|
||||
|
||||
c = s->connection;
|
||||
|
||||
if (s->name.len) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||
"already joined");
|
||||
return;
|
||||
}
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||
"RTMP join '%V'",
|
||||
&name);
|
||||
|
||||
s->name = *name;
|
||||
ps = ngx_rtmp_get_session_head(s);
|
||||
s->next = *ps;
|
||||
s->flags = flags;
|
||||
*ps = s;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ngx_rtmp_leave(ngx_rtmp_session_t *s)
|
||||
{
|
||||
ngx_rtmp_session_t **ps;
|
||||
ngx_connection_t *c;
|
||||
|
||||
c = s->connection;
|
||||
|
||||
if (!s->name.len)
|
||||
return;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||
"RTMP leave '%V'",
|
||||
&s->name);
|
||||
|
||||
ps = ngx_rtmp_get_session_head(s);
|
||||
|
||||
ngx_str_null(&s->name);
|
||||
|
||||
for(; *ps; ps = &(*ps)->next) {
|
||||
if (*ps == s) {
|
||||
*ps = (*ps)->next;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -2,37 +2,11 @@
|
|||
* Copyright (c) 2012 Roman Arutyunyan
|
||||
*/
|
||||
|
||||
#include "ngx_rtmp.h"
|
||||
#include "ngx_rtmp_amf0.h"
|
||||
|
||||
ngx_int_t
|
||||
ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t *li)
|
||||
ngx_rtmp_connect(ngx_rtmp_session_t *s, double in_trans, ngx_chain_t *in)
|
||||
{
|
||||
ngx_rtmp_packet_hdr_t h;
|
||||
ngx_chain_t lo, *lo_amf;
|
||||
ngx_buf_t bo;
|
||||
u_char buf[6], *p;
|
||||
uint16_t ctl_evt;
|
||||
uint32_t msid;
|
||||
uint32_t ack_size;
|
||||
uint8_t limit_type;
|
||||
ngx_rtmp_amf0_ctx_t amf_ctx;
|
||||
|
||||
static double trans;
|
||||
|
||||
static ngx_rtmp_amf0_elt_t inf[] = {
|
||||
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
|
||||
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
|
||||
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
|
||||
};
|
||||
|
||||
static ngx_rtmp_amf0_elt_t elts[] = {
|
||||
{ NGX_RTMP_AMF0_STRING, 0, "_result", sizeof("_result") - 1 },
|
||||
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
|
||||
{ NGX_RTMP_AMF0_NULL , 0, NULL, 0 },
|
||||
{ NGX_RTMP_AMF0_OBJECT, 0, inf, sizeof(inf) },
|
||||
};
|
||||
|
||||
/* 1) send 'Window Acknowledgement Size'
|
||||
*
|
||||
* 2) send 'Set Peer Bandwidth'
|
||||
|
@ -50,85 +24,54 @@ ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t *li)
|
|||
* level : "status",
|
||||
* description : "Connection succeeded." }
|
||||
*/
|
||||
memset(&h, 0, sizeof(h));
|
||||
h.timestamp = 0;
|
||||
h.csid = 2; /* standard */
|
||||
h.msid = 0;
|
||||
lo.buf = &bo;
|
||||
lo.next = NULL;
|
||||
|
||||
/* send Window Acknowledgement Size*/
|
||||
h.type = NGX_RTMP_PACKET_ACK_SIZE;
|
||||
ack_size = 65536;
|
||||
p = (u_char*)&ack_size;
|
||||
buf[0] = p[3];
|
||||
buf[1] = p[2];
|
||||
buf[2] = p[1];
|
||||
buf[3] = p[0];
|
||||
bo.start = bo.pos = buf;
|
||||
bo.end = bo.last = bo.start + 4;
|
||||
ngx_rtmp_send_packet(s, &h, &lo);
|
||||
static double trans;
|
||||
static char app[128];
|
||||
static char flashver[128];
|
||||
static char svfurl[128];
|
||||
|
||||
/* send Set Peer Bandwidth */
|
||||
h.type = NGX_RTMP_PACKET_BANDWIDTH;
|
||||
ack_size = 65536;
|
||||
limit_type = 1;
|
||||
p = (u_char*)&ack_size;
|
||||
buf[0] = p[3];
|
||||
buf[1] = p[2];
|
||||
buf[2] = p[1];
|
||||
buf[3] = p[0];
|
||||
buf[4] = limit_type;
|
||||
bo.start = bo.pos = buf;
|
||||
bo.end = bo.last = bo.start + 5;
|
||||
ngx_rtmp_send_packet(s, &h, &lo);
|
||||
static ngx_rtmp_amf0_elt_t in_cmd[] = {
|
||||
{ NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) },
|
||||
{ NGX_RTMP_AMF0_STRING, "flashver", flashver, sizeof(flashver)},
|
||||
{ NGX_RTMP_AMF0_STRING, "swfurl", svfurl, sizeof(svfurl) },
|
||||
};
|
||||
|
||||
/* send STREAM_BEGIN */
|
||||
static ngx_rtmp_amf0_elt_t in_elts[] = {
|
||||
{ NGX_RTMP_AMF0_OBJECT, 0, in_cmd, sizeof(in_cmd) },
|
||||
{ NGX_RTMP_AMF0_NULL, 0, NULL, 0 },
|
||||
};
|
||||
|
||||
h.type = NGX_RTMP_PACKET_CTL;
|
||||
static ngx_rtmp_amf0_elt_t out_inf[] = {
|
||||
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
|
||||
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
|
||||
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
|
||||
};
|
||||
|
||||
msid = 1;
|
||||
ctl_evt = NGX_RTMP_CTL_STREAM_BEGIN;
|
||||
static ngx_rtmp_amf0_elt_t out_elts[] = {
|
||||
{ NGX_RTMP_AMF0_STRING, 0, "_result", sizeof("_result") - 1 },
|
||||
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
|
||||
{ NGX_RTMP_AMF0_NULL , 0, NULL, 0 },
|
||||
{ NGX_RTMP_AMF0_OBJECT, 0, out_inf, sizeof(out_inf) },
|
||||
};
|
||||
|
||||
p = (u_char*)&ctl_evt;
|
||||
buf[0] = p[1];
|
||||
buf[1] = p[0];
|
||||
|
||||
p = (u_char*)&msid;
|
||||
buf[2] = p[3];
|
||||
buf[3] = p[2];
|
||||
buf[4] = p[1];
|
||||
buf[5] = p[0];
|
||||
|
||||
bo.start = bo.pos = buf;
|
||||
bo.end = bo.last = bo.start + sizeof(buf);
|
||||
|
||||
ngx_rtmp_send_packet(s, &h, &lo);
|
||||
|
||||
/* send 'connect' reply */
|
||||
h.type = NGX_RTMP_PACKET_AMF0_CMD;
|
||||
h.csid = s->csid;
|
||||
inf[0].data = "NetConnection.Connect.Success"; /* code */
|
||||
inf[0].len = strlen(inf[0].data);
|
||||
inf[1].data = "status"; /* level */
|
||||
inf[1].len = strlen(inf[1].data);
|
||||
inf[2].data = "Connection succeeded."; /* description */
|
||||
inf[2].len = strlen(inf[2].data);
|
||||
trans = 1;
|
||||
|
||||
lo_amf = NULL;
|
||||
amf_ctx.link = &lo_amf;
|
||||
amf_ctx.free = &s->free;
|
||||
amf_ctx.log = s->connection->log;
|
||||
if (ngx_rtmp_amf0_write(&amf_ctx, elts,
|
||||
sizeof(elts) / sizeof(elts[0])) != NGX_OK)
|
||||
if (ngx_rtmp_receive_amf0(s, in, in_elts,
|
||||
sizeof(in_elts) / sizeof(in_elts[0])))
|
||||
{
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_rtmp_send_packet(s, &h, lo_amf);
|
||||
trans = in_trans;
|
||||
ngx_str_set(&inf[0], "NetConnection.Connect.Success");
|
||||
ngx_str_set(&inf[1], "status");
|
||||
ngx_str_set(&inf[2], "Connection succeeded.");
|
||||
|
||||
return NGX_OK;
|
||||
return ngx_rtmp_send_ack_size(s, 65536)
|
||||
|| ngx_rtmp_send_bandwidth(s, 65536, NGX_RTMP_LIMIT_SOFT)
|
||||
|| ngx_rtmp_send_user_stream_begin(s, 1)
|
||||
|| ngx_rtmp_send_amf0(s, 3, 1, out_elts,
|
||||
sizeof(out_elts) / sizeof(out_elts[0]))
|
||||
? NGX_ERROR
|
||||
: NGX_OK;
|
||||
}
|
||||
|
||||
ngx_int_t
|
||||
|
|
Loading…
Reference in a new issue