352 lines
8.4 KiB
C
352 lines
8.4 KiB
C
|
#ifdef _WIN32
|
||
|
#include "rtmp-stream.h"
|
||
|
#include <winsock2.h>
|
||
|
|
||
|
static void fatal_sock_shutdown(struct rtmp_stream *stream)
|
||
|
{
|
||
|
closesocket(stream->rtmp.m_sb.sb_socket);
|
||
|
stream->rtmp.m_sb.sb_socket = -1;
|
||
|
stream->write_buf_len = 0;
|
||
|
os_event_signal(stream->buffer_space_available_event);
|
||
|
}
|
||
|
|
||
|
static bool socket_event(struct rtmp_stream *stream, bool *can_write,
|
||
|
uint64_t last_send_time)
|
||
|
{
|
||
|
WSANETWORKEVENTS net_events;
|
||
|
bool success;
|
||
|
|
||
|
success = !WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL,
|
||
|
&net_events);
|
||
|
if (!success) {
|
||
|
blog(LOG_ERROR, "socket_thread_windows: Aborting due to "
|
||
|
"WSAEnumNetworkEvents failure, %d",
|
||
|
WSAGetLastError());
|
||
|
fatal_sock_shutdown(stream);
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
if (net_events.lNetworkEvents & FD_WRITE)
|
||
|
*can_write = true;
|
||
|
|
||
|
if (net_events.lNetworkEvents & FD_CLOSE) {
|
||
|
if (last_send_time) {
|
||
|
uint32_t diff =
|
||
|
(os_gettime_ns() / 1000000) - last_send_time;
|
||
|
|
||
|
blog(LOG_ERROR, "socket_thread_windows: Received "
|
||
|
"FD_CLOSE, %u ms since last send "
|
||
|
"(buffer: %d / %d)",
|
||
|
diff,
|
||
|
stream->write_buf_len,
|
||
|
stream->write_buf_size);
|
||
|
}
|
||
|
|
||
|
if (os_event_try(stream->stop_event) != EAGAIN)
|
||
|
blog(LOG_ERROR, "socket_thread_windows: Aborting due "
|
||
|
"to FD_CLOSE during shutdown, "
|
||
|
"%d bytes lost, error %d",
|
||
|
stream->write_buf_len,
|
||
|
net_events.iErrorCode[FD_CLOSE_BIT]);
|
||
|
else
|
||
|
blog(LOG_ERROR, "socket_thread_windows: Aborting due "
|
||
|
"to FD_CLOSE, error %d",
|
||
|
net_events.iErrorCode[FD_CLOSE_BIT]);
|
||
|
|
||
|
fatal_sock_shutdown(stream);
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
if (net_events.lNetworkEvents & FD_READ) {
|
||
|
char discard[16384];
|
||
|
int err_code;
|
||
|
bool fatal = false;
|
||
|
|
||
|
for (;;) {
|
||
|
int ret = recv(stream->rtmp.m_sb.sb_socket,
|
||
|
discard, sizeof(discard), 0);
|
||
|
if (ret == -1) {
|
||
|
err_code = WSAGetLastError();
|
||
|
if (err_code == WSAEWOULDBLOCK)
|
||
|
break;
|
||
|
|
||
|
fatal = true;
|
||
|
} else if (ret == 0) {
|
||
|
err_code = 0;
|
||
|
fatal = true;
|
||
|
}
|
||
|
|
||
|
if (fatal) {
|
||
|
blog(LOG_ERROR, "socket_thread_windows: "
|
||
|
"Socket error, recv() returned "
|
||
|
"%d, GetLastError() %d",
|
||
|
ret, err_code);
|
||
|
fatal_sock_shutdown(stream);
|
||
|
return false;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
static void ideal_send_backlog_event(struct rtmp_stream *stream,
|
||
|
bool *can_write)
|
||
|
{
|
||
|
ULONG ideal_send_backlog;
|
||
|
int ret;
|
||
|
|
||
|
ret = idealsendbacklogquery(
|
||
|
stream->rtmp.m_sb.sb_socket,
|
||
|
&ideal_send_backlog);
|
||
|
if (ret == 0) {
|
||
|
int cur_tcp_bufsize;
|
||
|
int size = sizeof(cur_tcp_bufsize);
|
||
|
|
||
|
ret = getsockopt(stream->rtmp.m_sb.sb_socket,
|
||
|
SOL_SOCKET,
|
||
|
SO_SNDBUF,
|
||
|
(char *)&cur_tcp_bufsize,
|
||
|
&size);
|
||
|
if (ret == 0) {
|
||
|
if (cur_tcp_bufsize < (int)ideal_send_backlog) {
|
||
|
int bufsize = (int)ideal_send_backlog;
|
||
|
setsockopt(stream->rtmp.m_sb.sb_socket,
|
||
|
SOL_SOCKET,
|
||
|
SO_SNDBUF,
|
||
|
(const char *)&bufsize,
|
||
|
sizeof(bufsize));
|
||
|
|
||
|
blog(LOG_INFO, "socket_thread_windows: "
|
||
|
"Increasing send buffer to "
|
||
|
"ISB %d (buffer: %d / %d)",
|
||
|
ideal_send_backlog,
|
||
|
stream->write_buf_len,
|
||
|
stream->write_buf_size);
|
||
|
}
|
||
|
} else {
|
||
|
blog(LOG_ERROR, "socket_thread_windows: Got "
|
||
|
"send_backlog_event but "
|
||
|
"getsockopt() returned %d",
|
||
|
WSAGetLastError());
|
||
|
}
|
||
|
} else {
|
||
|
blog(LOG_ERROR, "socket_thread_windows: Got "
|
||
|
"send_backlog_event but WSAIoctl() "
|
||
|
"returned %d",
|
||
|
WSAGetLastError());
|
||
|
}
|
||
|
}
|
||
|
|
||
|
enum data_ret {
|
||
|
RET_BREAK,
|
||
|
RET_FATAL,
|
||
|
RET_CONTINUE
|
||
|
};
|
||
|
|
||
|
static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write,
|
||
|
uint64_t *last_send_time, size_t latency_packet_size,
|
||
|
int delay_time)
|
||
|
{
|
||
|
bool exit_loop = false;
|
||
|
|
||
|
pthread_mutex_lock(&stream->write_buf_mutex);
|
||
|
|
||
|
if (!stream->write_buf_len) {
|
||
|
/* this is now an expected occasional condition due to use of
|
||
|
* auto-reset events, we could end up emptying the buffer as
|
||
|
* it's filled in a previous loop cycle, especially if using
|
||
|
* low latency mode. */
|
||
|
pthread_mutex_unlock(&stream->write_buf_mutex);
|
||
|
/* blog(LOG_DEBUG, "socket_thread_windows: Trying to send, "
|
||
|
"but no data available"); */
|
||
|
return RET_BREAK;
|
||
|
}
|
||
|
|
||
|
int ret;
|
||
|
if (stream->low_latency_mode) {
|
||
|
size_t send_len =
|
||
|
min(latency_packet_size, stream->write_buf_len);
|
||
|
|
||
|
ret = send(stream->rtmp.m_sb.sb_socket,
|
||
|
(const char *)stream->write_buf,
|
||
|
(int)send_len, 0);
|
||
|
} else {
|
||
|
ret = send(stream->rtmp.m_sb.sb_socket,
|
||
|
(const char *)stream->write_buf,
|
||
|
(int)stream->write_buf_len, 0);
|
||
|
}
|
||
|
|
||
|
if (ret > 0) {
|
||
|
if (stream->write_buf_len - ret)
|
||
|
memmove(stream->write_buf,
|
||
|
stream->write_buf + ret,
|
||
|
stream->write_buf_len - ret);
|
||
|
stream->write_buf_len -= ret;
|
||
|
|
||
|
*last_send_time = os_gettime_ns() / 1000000;
|
||
|
|
||
|
os_event_signal(stream->buffer_space_available_event);
|
||
|
} else {
|
||
|
int err_code;
|
||
|
bool fatal_err = false;
|
||
|
|
||
|
if (ret == -1) {
|
||
|
err_code = WSAGetLastError();
|
||
|
|
||
|
if (err_code == WSAEWOULDBLOCK) {
|
||
|
*can_write = false;
|
||
|
pthread_mutex_unlock(&stream->write_buf_mutex);
|
||
|
return RET_BREAK;
|
||
|
}
|
||
|
|
||
|
fatal_err = true;
|
||
|
} else if (ret == 0) {
|
||
|
err_code = 0;
|
||
|
fatal_err = true;
|
||
|
}
|
||
|
|
||
|
if (fatal_err) {
|
||
|
/* connection closed, or connection was aborted /
|
||
|
* socket closed / etc, that's a fatal error. */
|
||
|
blog(LOG_ERROR, "socket_thread_windows: "
|
||
|
"Socket error, send() returned %d, "
|
||
|
"GetLastError() %d",
|
||
|
ret, err_code);
|
||
|
|
||
|
pthread_mutex_unlock(&stream->write_buf_mutex);
|
||
|
fatal_sock_shutdown(stream);
|
||
|
return RET_FATAL;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* finish writing for now */
|
||
|
if (stream->write_buf_len <= 1000)
|
||
|
exit_loop = true;
|
||
|
|
||
|
pthread_mutex_unlock(&stream->write_buf_mutex);
|
||
|
|
||
|
if (delay_time)
|
||
|
os_sleep_ms(delay_time);
|
||
|
|
||
|
return exit_loop ? RET_BREAK : RET_CONTINUE;
|
||
|
}
|
||
|
|
||
|
#define LATENCY_FACTOR 20
|
||
|
|
||
|
static inline void socket_thread_windows_internal(struct rtmp_stream *stream)
|
||
|
{
|
||
|
bool can_write = false;
|
||
|
|
||
|
int delay_time;
|
||
|
size_t latency_packet_size;
|
||
|
uint64_t last_send_time = 0;
|
||
|
|
||
|
HANDLE send_backlog_event;
|
||
|
OVERLAPPED send_backlog_overlapped;
|
||
|
|
||
|
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
|
||
|
|
||
|
WSAEventSelect(stream->rtmp.m_sb.sb_socket,
|
||
|
stream->socket_available_event,
|
||
|
FD_READ|FD_WRITE|FD_CLOSE);
|
||
|
|
||
|
send_backlog_event = CreateEvent(NULL, true, false, NULL);
|
||
|
|
||
|
if (stream->low_latency_mode) {
|
||
|
delay_time = 1000 / LATENCY_FACTOR;
|
||
|
latency_packet_size = stream->write_buf_size / (LATENCY_FACTOR - 2);
|
||
|
} else {
|
||
|
latency_packet_size = stream->write_buf_size;
|
||
|
delay_time = 0;
|
||
|
}
|
||
|
|
||
|
if (!stream->disable_send_window_optimization) {
|
||
|
memset(&send_backlog_overlapped, 0,
|
||
|
sizeof(send_backlog_overlapped));
|
||
|
send_backlog_overlapped.hEvent = send_backlog_event;
|
||
|
idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
|
||
|
&send_backlog_overlapped, NULL);
|
||
|
} else {
|
||
|
blog(LOG_INFO, "socket_thread_windows: Send window "
|
||
|
"optimization disabled by user.");
|
||
|
}
|
||
|
|
||
|
HANDLE objs[3];
|
||
|
|
||
|
objs[0] = stream->socket_available_event;
|
||
|
objs[1] = stream->buffer_has_data_event;
|
||
|
objs[2] = send_backlog_event;
|
||
|
|
||
|
for (;;) {
|
||
|
if (os_event_try(stream->send_thread_signaled_exit) != EAGAIN) {
|
||
|
pthread_mutex_lock(&stream->write_buf_mutex);
|
||
|
if (stream->write_buf_len == 0) {
|
||
|
//blog(LOG_DEBUG, "Exiting on empty buffer");
|
||
|
pthread_mutex_unlock(&stream->write_buf_mutex);
|
||
|
os_event_reset(stream->send_thread_signaled_exit);
|
||
|
break;
|
||
|
}
|
||
|
|
||
|
pthread_mutex_unlock(&stream->write_buf_mutex);
|
||
|
}
|
||
|
|
||
|
int status = WaitForMultipleObjects(3, objs, false, INFINITE);
|
||
|
if (status == WAIT_ABANDONED || status == WAIT_FAILED) {
|
||
|
blog(LOG_ERROR, "socket_thread_windows: Aborting due "
|
||
|
"to WaitForMultipleObjects failure");
|
||
|
fatal_sock_shutdown(stream);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (status == WAIT_OBJECT_0) {
|
||
|
/* Socket event */
|
||
|
if (!socket_event(stream, &can_write, last_send_time))
|
||
|
return;
|
||
|
|
||
|
} else if (status == WAIT_OBJECT_0 + 2) {
|
||
|
/* Ideal send backlog event */
|
||
|
ideal_send_backlog_event(stream, &can_write);
|
||
|
|
||
|
ResetEvent(send_backlog_event);
|
||
|
idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
|
||
|
&send_backlog_overlapped, NULL);
|
||
|
continue;
|
||
|
}
|
||
|
|
||
|
if (can_write) {
|
||
|
for (;;) {
|
||
|
enum data_ret ret = write_data(
|
||
|
stream,
|
||
|
&can_write,
|
||
|
&last_send_time,
|
||
|
latency_packet_size,
|
||
|
delay_time);
|
||
|
|
||
|
switch (ret) {
|
||
|
case RET_BREAK:
|
||
|
goto exit_write_loop;
|
||
|
case RET_FATAL:
|
||
|
return;
|
||
|
case RET_CONTINUE:;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
exit_write_loop:;
|
||
|
}
|
||
|
|
||
|
if (stream->rtmp.m_sb.sb_socket != INVALID_SOCKET)
|
||
|
WSAEventSelect(stream->rtmp.m_sb.sb_socket,
|
||
|
stream->socket_available_event, 0);
|
||
|
|
||
|
blog(LOG_INFO, "socket_thread_windows: Normal exit");
|
||
|
}
|
||
|
|
||
|
void *socket_thread_windows(void *data)
|
||
|
{
|
||
|
struct rtmp_stream *stream = data;
|
||
|
socket_thread_windows_internal(stream);
|
||
|
return NULL;
|
||
|
}
|
||
|
#endif
|