forked from Ocean/datum_gateway
Merge remote-tracking branch 'github-pull/76/head'
This commit is contained in:
commit
196d8860e0
@ -1163,6 +1163,20 @@ enum MHD_Result datum_api_answer(void *cls, struct MHD_Connection *connection, c
|
||||
return ret;
|
||||
}
|
||||
|
||||
static struct MHD_Daemon *datum_api_try_start(unsigned int flags) {
|
||||
flags |= MHD_USE_AUTO; // event loop API
|
||||
flags |= MHD_USE_INTERNAL_POLLING_THREAD;
|
||||
return MHD_start_daemon(
|
||||
flags,
|
||||
datum_config.api_listen_port,
|
||||
NULL, NULL, // accept policy filter
|
||||
&datum_api_answer, NULL, // default URI handler
|
||||
MHD_OPTION_CONNECTION_LIMIT, 128,
|
||||
MHD_OPTION_NOTIFY_COMPLETED, datum_api_request_completed, NULL,
|
||||
MHD_OPTION_LISTENING_ADDRESS_REUSE, (unsigned int)1,
|
||||
MHD_OPTION_END);
|
||||
}
|
||||
|
||||
void *datum_api_thread(void *ptr) {
|
||||
struct MHD_Daemon *daemon;
|
||||
|
||||
@ -1171,11 +1185,8 @@ void *datum_api_thread(void *ptr) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
daemon = MHD_start_daemon(MHD_USE_AUTO | MHD_USE_INTERNAL_POLLING_THREAD, datum_config.api_listen_port, NULL, NULL, &datum_api_answer, NULL,
|
||||
MHD_OPTION_CONNECTION_LIMIT, 128,
|
||||
MHD_OPTION_NOTIFY_COMPLETED, datum_api_request_completed, NULL,
|
||||
MHD_OPTION_LISTENING_ADDRESS_REUSE, (unsigned int)1,
|
||||
MHD_OPTION_END);
|
||||
daemon = datum_api_try_start(MHD_USE_DUAL_STACK);
|
||||
if (!daemon) daemon = datum_api_try_start(0);
|
||||
|
||||
if (!daemon) {
|
||||
DLOG_FATAL("Unable to start daemon for API");
|
||||
|
@ -537,10 +537,31 @@ int assign_to_thread(T_DATUM_SOCKET_APP *app, int fd) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
const char *datum_sockets_setup_listen_sock(const int listen_sock, const struct sockaddr * const sa, const size_t sa_len) {
|
||||
if (-1 == listen_sock) {
|
||||
return "Could not create listening socket";
|
||||
}
|
||||
|
||||
datum_socket_setoptions(listen_sock);
|
||||
|
||||
static const int reuse = 1;
|
||||
if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) {
|
||||
return "setsockopt(SO_REUSEADDR) failed";
|
||||
}
|
||||
|
||||
if (bind(listen_sock, sa, sa_len) < 0) {
|
||||
return "bind failed";
|
||||
}
|
||||
|
||||
if (listen(listen_sock, 10) < 0) {
|
||||
return "listen failed";
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *datum_gateway_listener_thread(void *arg) {
|
||||
struct sockaddr_in serveraddr;
|
||||
int i, ret;
|
||||
int reuse = 1;
|
||||
bool rejecting_now = false;
|
||||
uint64_t last_reject_msg_tsms = 0, curtime_tsms = 0;
|
||||
uint64_t reject_count = 0;
|
||||
@ -548,7 +569,7 @@ void *datum_gateway_listener_thread(void *arg) {
|
||||
T_DATUM_SOCKET_APP *app = (T_DATUM_SOCKET_APP *)arg;
|
||||
|
||||
struct epoll_event ev, events[MAX_EVENTS];
|
||||
int listen_sock, conn_sock, nfds, epollfd;
|
||||
int listen_socks[2], conn_sock, nfds, epollfd;
|
||||
|
||||
if (!app) {
|
||||
DLOG_FATAL("Called without application data structure. :(");
|
||||
@ -577,37 +598,36 @@ void *datum_gateway_listener_thread(void *arg) {
|
||||
|
||||
app->datum_active_threads = 0;
|
||||
|
||||
listen_sock = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (-1 == listen_sock) {
|
||||
DLOG_FATAL("Could not create listening socket: %s", strerror(errno));
|
||||
const struct sockaddr_in6 anyaddr6 = {
|
||||
.sin6_family = AF_INET6,
|
||||
.sin6_port = htons(app->listen_port),
|
||||
.sin6_addr = IN6ADDR_ANY_INIT,
|
||||
};
|
||||
listen_socks[0] = socket(AF_INET6, SOCK_STREAM, 0);
|
||||
const char * const errstr6 = datum_sockets_setup_listen_sock(listen_socks[0], (const struct sockaddr *)&anyaddr6, sizeof(anyaddr6));
|
||||
const int errno6 = errno;
|
||||
if (errstr6 && listen_socks[0] != -1) {
|
||||
close(listen_socks[0]);
|
||||
listen_socks[0] = -1;
|
||||
}
|
||||
|
||||
const struct sockaddr_in anyaddr4 = {
|
||||
.sin_family = AF_INET,
|
||||
.sin_port = htons(app->listen_port),
|
||||
.sin_addr.s_addr = INADDR_ANY,
|
||||
};
|
||||
listen_socks[1] = socket(AF_INET, SOCK_STREAM, 0);
|
||||
const char *errstr = datum_sockets_setup_listen_sock(listen_socks[1], (const struct sockaddr *)&anyaddr4, sizeof(anyaddr4));
|
||||
if (errstr && errstr6) {
|
||||
const int errno4 = errno;
|
||||
DLOG_FATAL("%s (IPv6): %s", errstr6, strerror(errno6));
|
||||
DLOG_FATAL("%s (IPv4): %s", errstr, strerror(errno4));
|
||||
panic_from_thread(__LINE__);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
datum_socket_setoptions(listen_sock);
|
||||
memset(&serveraddr, 0, sizeof(serveraddr));
|
||||
serveraddr.sin_family = AF_INET;
|
||||
serveraddr.sin_port = htons(app->listen_port);
|
||||
|
||||
// TODO: Add option to bind to specific IP per configuration!
|
||||
serveraddr.sin_addr.s_addr = INADDR_ANY;
|
||||
|
||||
if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) {
|
||||
DLOG_FATAL("setsockopt(SO_REUSEADDR) failed: %s", strerror(errno));
|
||||
panic_from_thread(__LINE__);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if(bind(listen_sock, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) < 0) {
|
||||
DLOG_FATAL("bind failed: %s", strerror(errno));
|
||||
panic_from_thread(__LINE__);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (listen(listen_sock, 10) < 0) {
|
||||
DLOG_FATAL("listen failed: %s", strerror(errno));
|
||||
panic_from_thread(__LINE__);
|
||||
return NULL;
|
||||
if (errstr && listen_socks[1] != -1) {
|
||||
close(listen_socks[1]);
|
||||
listen_socks[1] = -1;
|
||||
}
|
||||
|
||||
epollfd = epoll_create1(0);
|
||||
@ -617,13 +637,16 @@ void *datum_gateway_listener_thread(void *arg) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (i = 0; i < 2; ++i) {
|
||||
if (listen_socks[i] == -1) continue;
|
||||
ev.events = EPOLLIN;
|
||||
ev.data.fd = listen_sock;
|
||||
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev)<0) {
|
||||
ev.data.fd = listen_socks[i];
|
||||
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
|
||||
DLOG_FATAL("epoll_ctl failed: %s", strerror(errno));
|
||||
panic_from_thread(__LINE__);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
DLOG_INFO("DATUM Socket listener thread active for '%s'", app->name);
|
||||
|
||||
@ -641,8 +664,8 @@ void *datum_gateway_listener_thread(void *arg) {
|
||||
}
|
||||
}
|
||||
for (int n = 0; n < nfds; ++n) {
|
||||
if (events[n].data.fd == listen_sock) {
|
||||
conn_sock = accept(listen_sock, NULL, NULL);
|
||||
if (events[n].data.fd == listen_socks[0] || events[n].data.fd == listen_socks[1]) {
|
||||
conn_sock = accept(events[n].data.fd, NULL, NULL);
|
||||
if (conn_sock < 0) {
|
||||
DLOG_ERROR("accept failed: %s", strerror(errno));
|
||||
continue;
|
||||
|
Loading…
Reference in New Issue
Block a user