obs-outputs: Add support for "RTMP Go Away" feature

This commit is contained in:
jp9000 2021-11-08 05:31:04 -08:00
parent b4fb1db460
commit a593fe6755
2 changed files with 95 additions and 16 deletions

View file

@ -89,6 +89,11 @@ static inline bool disconnected(struct rtmp_stream *stream)
return os_atomic_load_bool(&stream->disconnected);
}
static inline bool silently_reconnecting(struct rtmp_stream *stream)
{
return os_atomic_load_bool(&stream->silent_reconnect);
}
static void rtmp_stream_destroy(void *data)
{
struct rtmp_stream *stream = data;
@ -249,6 +254,27 @@ static inline bool get_next_packet(struct rtmp_stream *stream,
return new_packet;
}
static inline void peek_next_packet(struct rtmp_stream *stream,
struct encoder_packet *packet)
{
pthread_mutex_lock(&stream->packets_mutex);
circlebuf_peek_front(&stream->packets, packet,
sizeof(struct encoder_packet));
pthread_mutex_unlock(&stream->packets_mutex);
}
static void reinsert_packet_at_front(struct rtmp_stream *stream,
struct encoder_packet *packet)
{
pthread_mutex_lock(&stream->packets_mutex);
circlebuf_push_front(&stream->packets, packet,
sizeof(struct encoder_packet));
pthread_mutex_unlock(&stream->packets_mutex);
os_sem_post(stream->send_sem);
}
#define RTMP_PACKET_TYPE_RECONNECT 0x20
static bool process_recv_data(struct rtmp_stream *stream, size_t size)
{
RTMP *rtmp = &stream->rtmp;
@ -265,7 +291,9 @@ static bool process_recv_data(struct rtmp_stream *stream, size_t size)
}
if (packet.m_body) {
/* do processing here */
if (packet.m_packetType == RTMP_PACKET_TYPE_RECONNECT) {
os_atomic_set_bool(&stream->silent_reconnect, true);
}
RTMPPacket_Free(&packet);
}
return true;
@ -553,6 +581,7 @@ static void dbr_add_frame(struct rtmp_stream *stream, struct dbr_frame *back)
}
static void dbr_set_bitrate(struct rtmp_stream *stream);
static bool rtmp_stream_start(void *data);
static void *send_thread(void *data)
{
@ -585,6 +614,14 @@ static void *send_thread(void *data)
}
}
/* silent reconnect signal received from server, reconnect on
* next keyframe */
if (silently_reconnecting(stream) &&
packet.type == OBS_ENCODER_VIDEO && packet.keyframe) {
reinsert_packet_at_front(stream, &packet);
break;
}
if (stream->dbr_enabled) {
dbr_frame.send_beg = os_gettime_ns();
dbr_frame.size = packet.size;
@ -610,6 +647,8 @@ static void *send_thread(void *data)
info("Disconnected from %s", stream->path.array);
} else if (encode_error) {
info("Encoder error, disconnecting");
} else if (silently_reconnecting(stream)) {
info("Silent reconnect signal received from server");
} else {
info("User stopped the stream");
}
@ -635,18 +674,35 @@ static void *send_thread(void *data)
if (!stopping(stream)) {
pthread_detach(stream->send_thread);
obs_output_signal_stop(stream->output, OBS_OUTPUT_DISCONNECTED);
if (!silently_reconnecting(stream))
obs_output_signal_stop(stream->output,
OBS_OUTPUT_DISCONNECTED);
} else if (encode_error) {
obs_output_signal_stop(stream->output, OBS_OUTPUT_ENCODE_ERROR);
} else {
obs_output_end_data_capture(stream->output);
}
free_packets(stream);
os_event_reset(stream->stop_event);
os_atomic_set_bool(&stream->active, false);
if (!silently_reconnecting(stream)) {
free_packets(stream);
os_event_reset(stream->stop_event);
os_atomic_set_bool(&stream->active, false);
}
stream->sent_headers = false;
/* reset bitrate on stop */
if (stream->dbr_enabled) {
if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) {
stream->dbr_cur_bitrate = stream->dbr_orig_bitrate;
dbr_set_bitrate(stream);
}
}
if (silently_reconnecting(stream)) {
rtmp_stream_start(stream);
}
return NULL;
}
@ -768,7 +824,8 @@ static int init_send(struct rtmp_stream *stream)
adjust_sndbuf_size(stream, MIN_SENDBUF_SIZE);
#endif
reset_semaphore(stream);
if (!silently_reconnecting(stream))
reset_semaphore(stream);
ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);
if (ret != 0) {
@ -883,7 +940,8 @@ static int init_send(struct rtmp_stream *stream)
return OBS_OUTPUT_DISCONNECTED;
}
obs_output_begin_data_capture(stream->output, 0);
if (!silently_reconnecting(stream))
obs_output_begin_data_capture(stream->output, 0);
return OBS_OUTPUT_SUCCESS;
}
@ -945,6 +1003,12 @@ static void win32_log_interface_type(struct rtmp_stream *stream)
}
#endif
static void add_connect_data(char **penc, char *pend)
{
const AVal val = AVC("supportsGoAway");
*penc = AMF_EncodeNamedBoolean(*penc, pend, &val, true);
}
static int try_connect(struct rtmp_stream *stream)
{
if (dstr_is_empty(&stream->path)) {
@ -975,6 +1039,7 @@ static int try_connect(struct rtmp_stream *stream)
set_rtmp_dstr(&stream->rtmp.Link.pubPasswd, &stream->password);
set_rtmp_dstr(&stream->rtmp.Link.flashVer, &stream->encoder_name);
stream->rtmp.Link.swfUrl = stream->rtmp.Link.tcUrl;
stream->rtmp.Link.customConnectEncode = add_connect_data;
if (dstr_is_empty(&stream->bind_ip) ||
dstr_cmp(&stream->bind_ip, "default") == 0) {
@ -1115,9 +1180,17 @@ static void *connect_thread(void *data)
os_set_thread_name("rtmp-stream: connect_thread");
if (!init_connect(stream)) {
obs_output_signal_stop(stream->output, OBS_OUTPUT_BAD_PATH);
return NULL;
if (!silently_reconnecting(stream)) {
if (!init_connect(stream)) {
obs_output_signal_stop(stream->output,
OBS_OUTPUT_BAD_PATH);
os_atomic_set_bool(&stream->silent_reconnect, false);
return NULL;
}
} else {
struct encoder_packet packet;
peek_next_packet(stream, &packet);
stream->start_dts_offset = get_ms_time(&packet, packet.dts);
}
ret = try_connect(stream);
@ -1130,6 +1203,7 @@ static void *connect_thread(void *data)
if (!stopping(stream))
pthread_detach(stream->connect_thread);
os_atomic_set_bool(&stream->silent_reconnect, false);
os_atomic_set_bool(&stream->connecting, false);
return NULL;
}
@ -1138,10 +1212,12 @@ static bool rtmp_stream_start(void *data)
{
struct rtmp_stream *stream = data;
if (!obs_output_can_begin_data_capture(stream->output, 0))
return false;
if (!obs_output_initialize_encoders(stream->output, 0))
return false;
if (!silently_reconnecting(stream)) {
if (!obs_output_can_begin_data_capture(stream->output, 0))
return false;
if (!obs_output_initialize_encoders(stream->output, 0))
return false;
}
os_atomic_set_bool(&stream->connecting, true);
return pthread_create(&stream->connect_thread, NULL, connect_thread,
@ -1389,8 +1465,10 @@ static void check_to_drop_frames(struct rtmp_stream *stream, bool pframes)
static bool add_video_packet(struct rtmp_stream *stream,
struct encoder_packet *packet)
{
check_to_drop_frames(stream, false);
check_to_drop_frames(stream, true);
if (!silently_reconnecting(stream)) {
check_to_drop_frames(stream, false);
check_to_drop_frames(stream, true);
}
/* if currently dropping frames, drop packets until it reaches the
* desired priority */

View file

@ -69,6 +69,7 @@ struct rtmp_stream {
volatile bool active;
volatile bool disconnected;
volatile bool encode_error;
volatile bool silent_reconnect;
pthread_t send_thread;
int max_shutdown_time_sec;