diff --git a/src/bufio.c b/src/bufio.c index 31a9112..bf3630d 100644 --- a/src/bufio.c +++ b/src/bufio.c @@ -347,14 +347,43 @@ static void ignore_sigpipe(int socket __attribute__((__unused__))) // Poll which automatically restarts on EINTR and EAGAIN -static inline int safe_poll(struct pollfd fds[], nfds_t nfds, int timeout) +static inline int safe_poll(struct pollfd fds[], nfds_t nfds, int timeout, bufio_stream_type stream_type) { // TODO: Automatically decrement timeout, use ppoll on Linux int rc; + int i = 0; + int num_loops = 0; +#ifdef __MACH__ + if (stream_type == BUFIO_FIFO && timeout != 0) { + // Work around macOS returning 0 for named pipes even though data arrives + // within the timeout. This bug exists at least up to macOS Sonoma 14.7. We + // could simply call poll with timeout = 0 after hitting the timeout. To + // reduce the stuttering we split the timeout into periods of at most 20 ms + // and repeat poll as many times as requested (indefinitely for timeout = -1). + const int max_poll_period_msec = 20; + if (timeout == -1) { + // Poll indefinitely in short steps + num_loops = -1; + timeout = max_poll_period_msec; + } else if (timeout > 0 && timeout < 2 * max_poll_period_msec) { + // Split interval into 2 polls + num_loops = 1; + if (timeout > 1) + timeout /= 2; + } else if (timeout >= 2 * max_poll_period_msec) { + // Loop as many times as needed in short steps + num_loops = timeout / max_poll_period_msec; + timeout = max_poll_period_msec; + } + } +#else + (void) stream_type; +#endif do { rc = poll(fds, nfds, timeout); - } while ((rc == -1) && (errno == EINTR || errno == EAGAIN)); + debug_print("rc=%d, i=%d, nl=%d, timeout=%d, events=%d, revents=%d, error=%s", rc, i, num_loops, timeout, fds[0].events, fds[0].revents, rc == -1 ? strerror(errno): "none"); + } while (((rc == -1) && (errno == EINTR || errno == EAGAIN)) || (rc == 0 && num_loops > 0 && i++ < num_loops)); return rc; } @@ -439,7 +468,7 @@ static int accept_socket(bufio_stream *stream, int timeout, const char* info) poll_in.fd = stream->fd; poll_in.events = POLLIN; - int rc = safe_poll(&poll_in, 1, timeout); + int rc = safe_poll(&poll_in, 1, timeout, stream->type); if (rc == 0) { logstring(info, "listen timeout"); return 0; @@ -464,7 +493,7 @@ static int accept_socket(bufio_stream *stream, int timeout, const char* info) ignore_sigpipe(stream->fd); // Enable non-blocking I/O - fcntl(stream->fd, F_SETFL, (long) (O_RDWR | O_NONBLOCK)); + fcntl(stream->fd, F_SETFL, O_RDWR | O_NONBLOCK); loginetadr(info, "connection established", sa, client_address.sin_port); @@ -504,7 +533,9 @@ which are always bidirectional. Also, standard streams (stdin, stdout) are unidirectional. If required, files are created with rw-rw-r--. timeout specifies the time to wait for a connection in milliseconds. Specify --1 to block indefinitely. +-1 to block indefinitely. If the target is a named pipe (created with mkfifo) +and mode is "w", bufio_open waits this amount of time for a reader to attach +to the pipe. bufsize specifies the buffer size in Byte. If 0 a default value will be used. @@ -630,8 +661,10 @@ application code does not crash during writes to a broken pipe. stream->type = BUFIO_PIPE; // TODO: Restructure code if (stream->mode & O_WRONLY) { stream->fd = STDOUT_FILENO; // Write-only + fcntl(stream->fd, F_SETFL, O_WRONLY | O_NONBLOCK); } else if ((stream->mode & O_RDWR) == 0) { stream->fd = STDIN_FILENO; // Read-only + fcntl(stream->fd, F_SETFL, O_NONBLOCK); } else { // Read/write log2string(info, "invalid mode", opt, "for standard stream"); @@ -657,11 +690,27 @@ application code does not crash during writes to a broken pipe. } else if (!stat_rc && S_ISFIFO(statbuf.st_mode)) { // TODO: LOCKEDFIFO? stream->type = BUFIO_FIFO; + if (timeout < 0) + timeout = -1; + else if (timeout > 0 && timeout < 50) + timeout = 50; } // Open file mode_t file_flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH; - if ((stream->fd = open(name, stream->mode, file_flags)) == -1) { + while ((stream->fd = open(name, stream->mode, file_flags)) == -1) { + if (stream->type != BUFIO_FIFO || !(stream->mode & O_WRONLY) || (timeout >= 0 && timeout < 50)) + break; + + assert(stream->type == BUFIO_FIFO && errno == ENXIO); + + // For a writer on namedpipe, wait for a reading process until the timeout is reached + usleep(50000); + if (timeout != -1) + timeout -= 50; + } + + if (stream->fd == -1) { log2string(info, "failed to open file with mode", opt, name); goto free_and_out; } @@ -672,6 +721,9 @@ application code does not crash during writes to a broken pipe. goto close_free_and_out; } + if (stream->type == BUFIO_FIFO || stream->type == BUFIO_PIPE) + ignore_sigpipe(stream->fd); + return stream; } @@ -773,7 +825,7 @@ application code does not crash during writes to a broken pipe. } // Enable non-blocking I/O - fcntl(stream->fd, F_SETFL, (long) (O_RDWR | O_NONBLOCK)); + fcntl(stream->fd, F_SETFL, O_RDWR | O_NONBLOCK); if (bufio_set_buffer(stream, bufsize > 0 ? bufsize : BUFIO_BUFSIZE) != 0) { logstring(info, "can not create buffer"); @@ -906,6 +958,7 @@ and the status code of the stream was set. read_vec[0].iov_base = (char *) ptr + (size - remaining_bytes); read_vec[0].iov_len = remaining_bytes; + errno = 0; ssize_t nbytes = readv(stream->fd, read_vec, 2); if (nbytes == -1) { if (errno == EAGAIN || errno == EINTR) @@ -918,8 +971,19 @@ and the status code of the stream was set. return size - remaining_bytes; } - if (nbytes == 0 && poll_in.revents & POLLIN) { + // Read returns 0 to indicate EOF for files and when no writer is attached + // to a named pipe ("fifo") - or an anonymous pipe (on macOS only!); see pipe(7) + if (nbytes == 0 && + ((poll_in.revents & POLLIN) || stream->type == BUFIO_FIFO)) { +#ifdef __MACH__ + if (stream->type == BUFIO_PIPE) { + debug_print("pipe error with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size); + stream->status = BUFIO_EPIPE; + return size - remaining_bytes; + } +#endif // Reached end-of-file + debug_print("eof with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size); stream->status = BUFIO_EOF; bufio_release_read_lock(stream); return size - remaining_bytes; @@ -938,7 +1002,7 @@ and the status code of the stream was set. remaining_bytes -= nbytes; } } while (remaining_bytes > 0 && - (poll_rc = safe_poll(&poll_in, 1, stream->io_timeout_ms)) == 1 && + (poll_rc = safe_poll(&poll_in, 1, stream->io_timeout_ms, stream->type)) == 1 && poll_in.revents & POLLIN); bufio_release_read_lock(stream); @@ -951,7 +1015,7 @@ and the status code of the stream was set. else if (poll_in.revents & POLLERR) stream->status = -EIO; // EIO comes closest to "an exceptional condition" else if (poll_rc == 0) { - debug_print("timeout with %zu remaining bytes (%zu bytes requested)", remaining_bytes, size); + debug_print("timeout with %zu remaining bytes (%zu bytes requested, timeout=%d)", remaining_bytes, size, stream->io_timeout_ms); stream->status = BUFIO_TIMEDOUT; } @@ -1052,7 +1116,7 @@ error has occured and the status code of the stream was set. stream->write_lock_offset += nbytes; ptr = (char *) ptr + nbytes; } while (remaining_bytes > 0 && - (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms)) == 1 && + (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms, stream->type)) == 1 && poll_out.revents == POLLOUT); if (remaining_bytes == 0) @@ -1120,7 +1184,7 @@ error has occured and the status code of the stream was set. remaining_bytes -= nbytes; } } while (remaining_bytes > 0 && - (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms)) == 1 && + (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms, stream->type)) == 1 && poll_out.revents == POLLOUT); if (remaining_bytes == 0) @@ -1207,7 +1271,7 @@ of the stream was set. output_buffer_head += nbytes; stream->write_lock_offset += nbytes; } while (output_buffer_head != stream->output_buffer_tail && - (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms)) == 1 && + (poll_rc = safe_poll(&poll_out, 1, stream->io_timeout_ms, stream->type)) == 1 && poll_out.revents == POLLOUT); bufio_release_write_lock(stream); @@ -1367,10 +1431,10 @@ input buffers. If the value of timeout is -1, the poll blocks indefinitely. return -1; // Stream error } - // When trying a non-blocking read on a TCP connection in CLOSE_WAIT state, + // When trying a non-blocking read on a TCP connection in CLOSE_WAIT state or on a pipe where the writer hung up // - macOS yields 0 bytes and ETIMEDOUT, while // - Linux yields 0 bytes and EAGAIN. - if (stream->type == BUFIO_SOCKET && nbytes == 0 && (read_errno == ETIMEDOUT || read_errno == EAGAIN)) { + if ((stream->type == BUFIO_PIPE || stream->type == BUFIO_SOCKET) && nbytes == 0 && (read_errno == ETIMEDOUT || read_errno == EAGAIN)) { stream->status = BUFIO_EPIPE; return -1; // Stream error } @@ -1393,7 +1457,7 @@ input buffers. If the value of timeout is -1, the poll blocks indefinitely. poll_in.events = POLLIN; poll_in.revents = 0; - int rc = safe_poll(&poll_in, 1, timeout); + int rc = safe_poll(&poll_in, 1, timeout, stream->type); if (rc == 0) { stream->status = BUFIO_TIMEDOUT; return 0; // Timeout diff --git a/tests/bufio_test_namedpipe.c b/tests/bufio_test_namedpipe.c new file mode 100644 index 0000000..0a5465e --- /dev/null +++ b/tests/bufio_test_namedpipe.c @@ -0,0 +1,76 @@ +#ifdef __linux__ +#define _DEFAULT_SOURCE +#define _BSD_SOURCE +#define _POSIX_C_SOURCE 200809L +#else +#undef _POSIX_C_SOURCE +#endif + +#include +#include +#include +#include +#include +#include + +#include "bufio.h" +#include "test.h" + + +int main(void) +{ + char buf[16]; + + unlink("test_bufio_namedpipe.fifo"); + assert(mkfifo("test_bufio_namedpipe.fifo", S_IRUSR | S_IWUSR) == 0); + + // Attempt to open a writer: this should fail after the timeout because no one is listening + struct timeval before, after; + assert(gettimeofday(&before, NULL) == 0); + bufio_stream *so = bufio_open("test_bufio_namedpipe.fifo", "w", 1000, 256, "bufio_test_namedpipe"); + assert(so == NULL); + assert(gettimeofday(&after, NULL) == 0); + assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec >= 1.0); + + // Open a reader + bufio_stream *si = bufio_open("test_bufio_namedpipe.fifo", "r", 1000, 256, "bufio_test_namedpipe"); + assert(si != NULL); + + // Open a writer: this should be much quicker than before + assert(gettimeofday(&before, NULL) == 0); + so = bufio_open("test_bufio_namedpipe.fifo", "w", 1000, 256, "bufio_test_namedpipe"); + assert(so != NULL); + assert(gettimeofday(&after, NULL) == 0); + assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec < 1.0); + + // Assert no initial data + assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_wait(si, 100) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + + // Test read after write + assert(bufio_write(so, buf, 4) == 4 && bufio_flush(so) == 0); + assert(bufio_read(si, buf, 4) == 4); + + // Clean up + assert(bufio_close(so) == 0); + assert(bufio_close(si) == 0); + assert(unlink("test_bufio_namedpipe.fifo") == 0); + + // Test again with just a reader + assert(mkfifo("test_bufio_namedpipe.fifo", S_IRUSR | S_IWUSR) == 0); + si = bufio_open("test_bufio_namedpipe.fifo", "r", 1000, 256, "bufio_test_namedpipe"); + assert(si != NULL); + + // Assert no initial data and EOF (when there's no writer) + assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_EOF); + assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_EOF); + assert(gettimeofday(&before, NULL) == 0); + assert(bufio_wait(si, 1000) == 0 && bufio_status(si) == BUFIO_EOF); + assert(gettimeofday(&after, NULL) == 0); + assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec >= 1.0); + + assert(unlink("test_bufio_namedpipe.fifo") == 0); + + return 0; +} diff --git a/tests/bufio_test_namedpipe_async.c b/tests/bufio_test_namedpipe_async.c new file mode 100644 index 0000000..0565258 --- /dev/null +++ b/tests/bufio_test_namedpipe_async.c @@ -0,0 +1,101 @@ +#ifdef __linux__ +#define _DEFAULT_SOURCE +#define _BSD_SOURCE +#define _POSIX_C_SOURCE 200809L +#else +#undef _POSIX_C_SOURCE +#endif + +#include +#include +#include +#include +#include +#include + +#include "bufio.h" +#include "test.h" + + +int main(void) +{ + const char fname[] = "test_bufio_namedpipe_async.fifo"; + const char testname[] = "bufio_test_namedpipe_async"; + char buf[16]; + + unlink(fname); + assert(mkfifo(fname, S_IRUSR | S_IWUSR) == 0); + + FORK_CHILD + // Open a reader + bufio_stream *si = bufio_open(fname, "r", 1000, 256, testname); + assert(si != NULL); + + // Wait until writer is connected + bufio_timeout(si, 1000); + size_t nread = 0; + while ((nread = bufio_read(si, buf, 16)) == 0) + assert(bufio_status(si) == BUFIO_EOF); + + assert(nread == 16); + + // Clean up + assert(bufio_close(si) == 0); + + FORK_PARENT + // Open a writer - this waits until a reader is present + bufio_stream *so = bufio_open(fname, "w", 2000, 256, testname); + assert(so != NULL); + + // Delay a bit such that bufio_read enters poll() + usleep(100000); + + // Write something + assert(bufio_write(so, buf, 16) == 16 && bufio_flush(so) == 0); + + // Clean up + assert(bufio_close(so) == 0); + + FORK_JOIN + + assert(unlink(fname) == 0); + assert(mkfifo(fname, S_IRUSR | S_IWUSR) == 0); + + // Test poll - which requires a workaround on macOS + FORK_CHILD + // Open a reader + bufio_stream *si = bufio_open(fname, "r", 1000, 256, testname); + assert(si != NULL); + + // Wait until writer is present + usleep(100000); + + // Attempt to read immediately - which will enter poll() + bufio_timeout(si, 1000); + assert(bufio_read(si, buf, 16) == 16); + + // Clean up + assert(bufio_close(si) == 0); + + FORK_PARENT + // Open a writer - this waits until a reader is present + bufio_stream *so = bufio_open(fname, "w", 2000, 256, testname); + assert(so != NULL); + + // Delay a bit such that bufio_read enters poll() + usleep(250000); + + // Write something + assert(bufio_write(so, buf, 16) == 16 && bufio_flush(so) == 0); + + // Keep open until the reader would time out (if poll were broken) + usleep(1500000); + + // Clean up + assert(bufio_close(so) == 0); + + FORK_JOIN + + assert(unlink(fname) == 0); + return 0; +} diff --git a/tests/bufio_test_pipe.c b/tests/bufio_test_pipe.c new file mode 100644 index 0000000..ac67653 --- /dev/null +++ b/tests/bufio_test_pipe.c @@ -0,0 +1,82 @@ +#ifdef __linux__ +#define _DEFAULT_SOURCE +#define _BSD_SOURCE +#define _POSIX_C_SOURCE 200809L +#else +#undef _POSIX_C_SOURCE +#endif + +#include +#include +#include +#include +#include +#include + +#include "bufio.h" +#include "test.h" + + +int main(void) +{ + char buf[16]; + + // Create pipe + int p[2]; + assert(pipe(p) == 0); + assert(close(STDIN_FILENO) == 0); + assert(close(STDOUT_FILENO) == 0); + + FORK_CHILD + // Redirect pipe to stdin + assert(close(p[1]) == 0); + assert(dup2(p[0], STDIN_FILENO) == STDIN_FILENO); + + // Open a reader on stdin + bufio_stream *si = bufio_open("-", "r", 100, 256, "bufio_test_pipe"); + assert(si != NULL); + + // Assert no initial data + assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_wait(si, 0) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + assert(bufio_wait(si, 100) == 0 && bufio_status(si) == BUFIO_TIMEDOUT); + + sleep(1); + + // Test read after write + assert(bufio_read(si, buf, 4) == 4); + + sleep(2); + + // Assert EOF after writer hung up + struct timeval before, after; + assert(bufio_read(si, buf, 16) == 0 && bufio_status(si) == BUFIO_EPIPE); + assert(bufio_wait(si, 0) == -1 && bufio_status(si) == BUFIO_EPIPE); + assert(gettimeofday(&before, NULL) == 0); + assert(bufio_wait(si, 1000) == -1 && bufio_status(si) == BUFIO_EPIPE); + assert(gettimeofday(&after, NULL) == 0); + assert(after.tv_sec + 1e-6 * after.tv_usec - before.tv_sec - 1e-6 * before.tv_usec < 1.0); + + assert(bufio_close(si) == 0); + + FORK_PARENT + // Redirect stdout to pipe + assert(close(p[0]) == 0); + assert(dup2(p[1], STDOUT_FILENO) == STDOUT_FILENO); + + // Open a writer to stdout + bufio_stream *so = bufio_open("-", "w", 100, 256, "bufio_test_pipe"); + assert(so != NULL); + + // Test read after write + sleep(1); + assert(bufio_write(so, buf, 4) == 4 && bufio_flush(so) == 0); + + // Close writer (and the duplicate fd) + sleep(1); + assert(bufio_close(so) == 0); + close(p[1]); + + FORK_JOIN + return 0; +} diff --git a/tests/meson.build b/tests/meson.build index 02fceac..7c49f98 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -4,6 +4,9 @@ bufio_test_tcp_connect = executable('bufio_test_tcp_connect', 'bufio_test_tcp_co bufio_test_delayed_tcp_connect = executable('bufio_test_delayed_tcp_connect', 'bufio_test_delayed_tcp_connect.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_lockedfile = executable('bufio_test_lockedfile', 'bufio_test_lockedfile.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_file = executable('bufio_test_file', 'bufio_test_file.c', include_directories : bufio_inc, link_with : bufio_lib) +bufio_test_pipe = executable('bufio_test_pipe', 'bufio_test_pipe.c', include_directories : bufio_inc, link_with : bufio_lib) +bufio_test_namedpipe = executable('bufio_test_namedpipe', 'bufio_test_namedpipe.c', include_directories : bufio_inc, link_with : bufio_lib) +bufio_test_namedpipe_async = executable('bufio_test_namedpipe_async', 'bufio_test_namedpipe_async.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_wait_on_tcpclose_with_pending_data = executable('bufio_test_wait_on_tcpclose_with_pending_data', 'bufio_test_wait_on_tcpclose_with_pending_data.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_wait_on_tcpclose = executable('bufio_test_wait_on_tcpclose', 'bufio_test_wait_on_tcpclose.c', include_directories : bufio_inc, link_with : bufio_lib) bufio_test_mem_interface = executable('bufio_test_mem_interface', 'bufio_test_mem_interface.c', include_directories : bufio_inc, link_with : bufio_lib) @@ -14,6 +17,9 @@ test('bufio_test_tcp_connect', bufio_test_tcp_connect, is_parallel : false, suit test('bufio_test_delayed_tcp_connect', bufio_test_delayed_tcp_connect, is_parallel : false, suite: ['all', 'default']) test('bufio_test_lockedfile', bufio_test_lockedfile, is_parallel : false, suite: ['all', 'default']) test('bufio_test_file', bufio_test_file, is_parallel : false, suite: ['all', 'default']) +test('bufio_test_pipe', bufio_test_pipe, is_parallel : false, suite: ['all', 'default']) +test('bufio_test_namedpipe', bufio_test_namedpipe, is_parallel : false, suite: ['all', 'default']) +test('bufio_test_namedpipe_async', bufio_test_namedpipe_async, is_parallel : false, suite: ['all', 'default']) test('bufio_test_mem_interface', bufio_test_mem_interface, is_parallel : false, suite: ['all', 'default']) test('bufio_test_wait_on_tcpclose', bufio_test_wait_on_tcpclose, is_parallel : false, timeout : 120, suite: ['all', 'extensive'])