diff --git a/source/sysevent/lib/libsysevent.c b/source/sysevent/lib/libsysevent.c index 9e8d0ca9..6147c3f3 100644 --- a/source/sysevent/lib/libsysevent.c +++ b/source/sysevent/lib/libsysevent.c @@ -1757,6 +1757,7 @@ int SE_msg_send (int fd, char *sendmsg) int bytes_sent = 0; int num_retries = 3; int rc; + int last_errno = 0; while (0 < bytes_to_write && 0 < num_retries) { rc = write(fd, send_msg_buffer+bytes_sent, bytes_to_write); if (0 < rc) { @@ -1764,8 +1765,10 @@ int SE_msg_send (int fd, char *sendmsg) bytes_to_write -= rc; bytes_sent+=rc; } else if (0 == rc) { + last_errno = EWOULDBLOCK; num_retries--; } else { + last_errno = errno; struct timespec sleep_time; sleep_time.tv_sec = 0; sleep_time.tv_nsec = 100000000; // .1 secs @@ -1777,6 +1780,9 @@ int SE_msg_send (int fd, char *sendmsg) if (0 == bytes_to_write) { return(0); } else { + if (0 != last_errno) { + errno = last_errno; + } return(-1); } } @@ -1809,6 +1815,7 @@ int SE_msg_send_data (int fd, char *sendmsg, unsigned int msgsize) int bytes_sent = 0; int num_retries = 3; int rc; + int last_errno = 0; if (fileread == 0) { v_secure_system("echo fname before write %s: %d >> /tmp/sys_d.txt",__FUNCTION__, bytes_to_write); @@ -1820,8 +1827,10 @@ int SE_msg_send_data (int fd, char *sendmsg, unsigned int msgsize) bytes_to_write -= rc; bytes_sent+=rc; } else if (0 == rc) { + last_errno = EWOULDBLOCK; num_retries--; } else { + last_errno = errno; struct timespec sleep_time; sleep_time.tv_sec = 0; sleep_time.tv_nsec = 100000000; // .1 secs @@ -1836,6 +1845,9 @@ int SE_msg_send_data (int fd, char *sendmsg, unsigned int msgsize) if (0 == bytes_to_write) { return(0); } else { + if (0 != last_errno) { + errno = last_errno; + } return(-1); } } diff --git a/source/sysevent/server/syseventd_main.c b/source/sysevent/server/syseventd_main.c index ec38a8c6..3c678b1b 100644 --- a/source/sysevent/server/syseventd_main.c +++ b/source/sysevent/server/syseventd_main.c @@ -427,6 +427,25 @@ static void reinit_signal_handler (int signum) ulog(ULOG_SYSTEM, UL_SYSEVENT, "Received reinit signal"); } +static int set_fd_socket_timeouts(int fd, int timeout_ms) +{ + if (timeout_ms <= 0) { + return(-1); + } + + struct timeval tv; + tv.tv_sec = timeout_ms / 1000; + tv.tv_usec = (timeout_ms % 1000) * 1000; + + if (-1 == setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))) { + return(-1); + } + if (-1 == setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) { + return(-1); + } + return(0); +} + /* * Procedure : initialize_system * Purpose : Initialize the system @@ -1479,6 +1498,13 @@ int main (int argc, char **argv) close(newsockfd); continue; } else { + // Keep sockets in blocking mode, but cap read/write wait time. + // Start with 200ms and tune in the 100-500ms range. + if (-1 == set_fd_socket_timeouts(newsockfd, 200)) { + ulogf(ULOG_SYSTEM, UL_SYSEVENT, + "Unable to set client fd %d socket timeouts. (%d) %s", + newsockfd, errno, strerror(errno)); + } if (SE_MSG_OPEN_CONNECTION_DATA == msgtype) { new_client->isData = 1; diff --git a/source/sysevent/server/worker_threads.c b/source/sysevent/server/worker_threads.c index d3165ef1..55fd0955 100644 --- a/source/sysevent/server/worker_threads.c +++ b/source/sysevent/server/worker_threads.c @@ -88,6 +88,45 @@ Author: mark enright static const char *emptystr = ""; +static void handle_client_send_failure(const int fd, const token_t who, const char *context) +{ + int send_errno = errno; + + if (EAGAIN == send_errno || EWOULDBLOCK == send_errno) { + int threshold_rc = CLI_MGR_handle_client_error_by_fd(fd); + SE_INC_LOG(ERROR, + int id = thread_get_id(worker_data_key); + printf("Thread %d: send timeout to client fd %d (%s). errno=%d %s threshold_rc=%d\n", + id, fd, (NULL == context) ? "unknown" : context, send_errno, strerror(send_errno), threshold_rc); + ) + if (1 == threshold_rc) { + SE_INC_LOG(ERROR, + int id = thread_get_id(worker_data_key); + printf("Thread %d: client fd %d disconnected after repeated send timeouts\n", id, fd); + ) + } + return; + } + + if (EPIPE == send_errno || ECONNRESET == send_errno || ENOTCONN == send_errno) { + SE_INC_LOG(ERROR, + int id = thread_get_id(worker_data_key); + printf("Thread %d: hard send failure on client fd %d (%s). errno=%d %s. Disconnecting immediately\n", + id, fd, (NULL == context) ? "unknown" : context, send_errno, strerror(send_errno)); + ) + CLI_MGR_remove_client_by_fd(fd, who, 1); + return; + } + + // Any other send failure still contributes to client-health tracking. + (void)CLI_MGR_handle_client_error_by_fd(fd); + SE_INC_LOG(ERROR, + int id = thread_get_id(worker_data_key); + printf("Thread %d: send failure on client fd %d (%s). errno=%d %s\n", + id, fd, (NULL == context) ? "unknown" : context, send_errno, strerror(send_errno)); + ) +} + /* * Procedure : send_msg_to_fork_helper * Purpose : send a message to the fork helper process @@ -1538,6 +1577,7 @@ static int handle_send_notification_msg(const int local_fd, const token_t who, s printf("Thread %d: Failed to send %s (%d) to client on fd %d (%d) %s\n", id, SE_print_mtype(SE_MSG_NOTIFICATION), SE_MSG_NOTIFICATION, fd, rc2, SE_strerror(rc2)); ) + handle_client_send_failure(fd, id, "SE_MSG_NOTIFICATION"); } else { SE_INC_LOG(MESSAGES, int id = thread_get_id(worker_data_key); @@ -1658,6 +1698,7 @@ static int handle_send_notification_msg_data(const int local_fd, const token_t w printf("Thread %d: Failed to send %s (%d) to client on fd %d (%d) %s\n", id, SE_print_mtype(SE_MSG_NOTIFICATION_DATA), SE_MSG_NOTIFICATION_DATA, fd, rc2, SE_strerror(rc2)); ) + handle_client_send_failure(fd, id, "SE_MSG_NOTIFICATION_DATA"); } else { SE_INC_LOG(MESSAGES, int id = thread_get_id(worker_data_key);