diff --git a/src/table.jl b/src/table.jl index de8bfc3..eb71cc5 100644 --- a/src/table.jl +++ b/src/table.jl @@ -25,7 +25,26 @@ ArrowBlob(bytes::Vector{UInt8}, pos::Int, len::Nothing) = tobytes(bytes::Vector{UInt8}) = bytes tobytes(io::IO) = Base.read(io) -tobytes(io::IOStream) = Mmap.mmap(io) + +function tobytes(io::IOStream) + # Try to use mmap for seekable streams (regular files) + # Fall back to read() for non-seekable streams (FIFOs, pipes, etc.) + # where mmap would return empty bytes or fail + try + # Check if stream is seekable by testing position/seek + pos = position(io) + seek(io, pos) + # Also check filesize - FIFOs report size 0 + if filesize(io) > 0 + return Mmap.mmap(io) + end + catch + # Not seekable, fall through to read + end + # Non-seekable or zero-size: read all bytes + return Base.read(io) +end + tobytes(file_path) = open(tobytes, file_path, "r") struct BatchIterator diff --git a/src/write.jl b/src/write.jl index 4c3800f..ce1c583 100644 --- a/src/write.jl +++ b/src/write.jl @@ -567,10 +567,20 @@ function Base.write(io::IO, msg::Message, blocks, sch, alignment) metalen = padding(length(msg.msgflatbuf), alignment) @debug "writing message: metalen = $metalen, bodylen = $(msg.bodylen), isrecordbatch = $(msg.isrecordbatch), headerType = $(msg.headerType)" if msg.blockmsg - push!( - blocks[msg.isrecordbatch ? 1 : 2], - Block(position(io), metalen + 8, msg.bodylen), - ) + # Track block positions for file format footer + # Skip for non-seekable streams (streaming to FIFOs, sockets, etc.) + # where position() would fail - block tracking is only needed for file format + pos = try + position(io) + catch + nothing # Non-seekable stream, skip block tracking + end + if pos !== nothing + push!( + blocks[msg.isrecordbatch ? 1 : 2], + Block(pos, metalen + 8, msg.bodylen), + ) + end end # now write the final message spec out # continuation byte