Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions lib/tail-file.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const kFileHandle = Symbol('fileHandle')
const kPollTimer = Symbol('pollTimer')
const kQuitting = Symbol('quitting')
const kInode = Symbol('inode')
const kPaused = Symbol('paused')

function NOOP() {}

Expand Down Expand Up @@ -104,6 +105,8 @@ class TailFile extends Readable {
this[kPollTimer] = null
this[kQuitting] = false
this[kInode] = null

this[kPaused] = false
}

async start() {
Expand All @@ -112,6 +115,22 @@ class TailFile extends Readable {
return
}

unwatch() {
if (this[kQuitting] || this[kPaused]) return

clearTimeout(this[kPollTimer])
this[kPaused] = true
}

async rewatch() {
if (!this[kPaused]) return

this[kStartPos] = null
this[kPaused] = false

await this._pollFileForChanges()
}

async _openFile() {
this[kFileHandle] = await fs.promises.open(this[kFileName], 'r')
return
Expand Down Expand Up @@ -158,6 +177,7 @@ class TailFile extends Readable {
this[kStream] = null
setImmediate(this.emit.bind(this), 'flush', {
lastReadPosition: this[kStartPos]
, firstPoll: false
})
return
}
Expand All @@ -169,12 +189,14 @@ class TailFile extends Readable {
this[kPollFailureCount] = 0 // reset
const eof = stats.size
let fileHasChanged = false
let firstPoll = false

if (!this[kInode]) this[kInode] = stats.ino

if (this[kStartPos] === null) {
// First iteration - nothing has been polled yet
this[kStartPos] = eof
firstPoll = true
} else if (this[kInode] !== stats.ino) {
// File renamed/rolled between polls without triggering `ENOENT`.
// Conditional since this *may* have already been done if `ENOENT` threw earlier.
Expand Down Expand Up @@ -219,6 +241,7 @@ class TailFile extends Readable {
} else {
setImmediate(this.emit.bind(this), 'flush', {
lastReadPosition: this[kStartPos]
, firstPoll
})
}

Expand Down
75 changes: 74 additions & 1 deletion test/tail-file.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ test('Exports structure', async (t) => {
t.same(methods, [
'constructor'
, 'start'
, 'unwatch'
, 'rewatch'
, '_openFile'
, '_readRemainderFromFileHandle'
, '_readChunks'
Expand All @@ -29,7 +31,7 @@ test('Exports structure', async (t) => {
, 'quit'
], 'Methods names as expected')

t.equal(methods.length, 10, 'TailFile.prototype prop count')
t.equal(methods.length, 12, 'TailFile.prototype prop count')
})

test('TailFile instantiation', async (t) => {
Expand Down Expand Up @@ -753,3 +755,74 @@ test('Invalid options checks', async (t) => {
, meta: {got: 1.23456}
}, 'startPos must be > 0')
})

test('Test unwatching and rewatching a file', async (t) => {
const name = 'logfile.txt'
const testDir = t.testdir({
[name]: ''
})
const filename = path.join(testDir, name)
const tail = new TailFile(filename, {
encoding: 'utf8'
, pollFileIntervalMs: 100
})

t.teardown(() => {
tail.quit().catch(fail)
})

const testString = 'Here is a line\n'

await tail.start()
await fs.promises.appendFile(filename, testString)

tail.unwatch()

// call again to satisfy tests
tail.unwatch()

let data_received = false

const ac = new AbortController()

once(tail, 'data', {signal: ac.signal}).then(() => {
data_received = true
}, () => {})

await fs.promises.appendFile(filename, testString)
await fs.promises.appendFile(filename, testString)

await sleep(300)

ac.abort()

t.equal(data_received, false, 'Did not receive data when not watching')

// do NOT await rewatch, or 'flush' event will already have been sent
tail.rewatch()

// should do nothing
tail.rewatch()

const [flush] = await once(tail, 'flush')

await fs.promises.appendFile(filename, testString)
const [line] = await once(tail, 'data')
const [flush2] = await once(tail, 'flush')

t.equal(flush.firstPoll, true, 'After rewatch, flush event with firstPoll true')
t.equal(flush2.firstPoll, false, 'New data polled, where firstPoll should be false')
t.equal(line, testString, 'New line was received')

t.equal(
flush.lastReadPosition
, testString.length * 3
, 'Did emit three lines, lastReadPosition equals length of those lines.'
)

t.equal(
flush2.lastReadPosition
, testString.length * 4
, 'lastReadPosition correctly relects last added test line.'
)
})