Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/table.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,26 @@ ArrowBlob(bytes::Vector{UInt8}, pos::Int, len::Nothing) =

tobytes(bytes::Vector{UInt8}) = bytes
tobytes(io::IO) = Base.read(io)
tobytes(io::IOStream) = Mmap.mmap(io)

function tobytes(io::IOStream)
# Try to use mmap for seekable streams (regular files)
# Fall back to read() for non-seekable streams (FIFOs, pipes, etc.)
# where mmap would return empty bytes or fail
try
# Check if stream is seekable by testing position/seek
pos = position(io)
seek(io, pos)
# Also check filesize - FIFOs report size 0
if filesize(io) > 0
return Mmap.mmap(io)
end
catch
# Not seekable, fall through to read
end
# Non-seekable or zero-size: read all bytes
return Base.read(io)
end

tobytes(file_path) = open(tobytes, file_path, "r")

struct BatchIterator
Expand Down
18 changes: 14 additions & 4 deletions src/write.jl
Original file line number Diff line number Diff line change
Expand Up @@ -567,10 +567,20 @@ function Base.write(io::IO, msg::Message, blocks, sch, alignment)
metalen = padding(length(msg.msgflatbuf), alignment)
@debug "writing message: metalen = $metalen, bodylen = $(msg.bodylen), isrecordbatch = $(msg.isrecordbatch), headerType = $(msg.headerType)"
if msg.blockmsg
push!(
blocks[msg.isrecordbatch ? 1 : 2],
Block(position(io), metalen + 8, msg.bodylen),
)
# Track block positions for file format footer
# Skip for non-seekable streams (streaming to FIFOs, sockets, etc.)
# where position() would fail - block tracking is only needed for file format
pos = try
position(io)
catch
nothing # Non-seekable stream, skip block tracking
end
if pos !== nothing
push!(
blocks[msg.isrecordbatch ? 1 : 2],
Block(pos, metalen + 8, msg.bodylen),
)
end
end
# now write the final message spec out
# continuation byte
Expand Down
Loading