diff --git a/lib/tail-file.js b/lib/tail-file.js index 17bd523..1356481 100644 --- a/lib/tail-file.js +++ b/lib/tail-file.js @@ -16,6 +16,7 @@ const kFileHandle = Symbol('fileHandle') const kPollTimer = Symbol('pollTimer') const kQuitting = Symbol('quitting') const kInode = Symbol('inode') +const kPaused = Symbol('paused') function NOOP() {} @@ -104,6 +105,8 @@ class TailFile extends Readable { this[kPollTimer] = null this[kQuitting] = false this[kInode] = null + + this[kPaused] = false } async start() { @@ -112,6 +115,22 @@ class TailFile extends Readable { return } + unwatch() { + if (this[kQuitting] || this[kPaused]) return + + clearTimeout(this[kPollTimer]) + this[kPaused] = true + } + + async rewatch() { + if (!this[kPaused]) return + + this[kStartPos] = null + this[kPaused] = false + + await this._pollFileForChanges() + } + async _openFile() { this[kFileHandle] = await fs.promises.open(this[kFileName], 'r') return @@ -158,6 +177,7 @@ class TailFile extends Readable { this[kStream] = null setImmediate(this.emit.bind(this), 'flush', { lastReadPosition: this[kStartPos] + , firstPoll: false }) return } @@ -169,12 +189,14 @@ class TailFile extends Readable { this[kPollFailureCount] = 0 // reset const eof = stats.size let fileHasChanged = false + let firstPoll = false if (!this[kInode]) this[kInode] = stats.ino if (this[kStartPos] === null) { // First iteration - nothing has been polled yet this[kStartPos] = eof + firstPoll = true } else if (this[kInode] !== stats.ino) { // File renamed/rolled between polls without triggering `ENOENT`. // Conditional since this *may* have already been done if `ENOENT` threw earlier. @@ -219,6 +241,7 @@ class TailFile extends Readable { } else { setImmediate(this.emit.bind(this), 'flush', { lastReadPosition: this[kStartPos] + , firstPoll }) } diff --git a/test/tail-file.js b/test/tail-file.js index 7978f65..b56a7ab 100644 --- a/test/tail-file.js +++ b/test/tail-file.js @@ -19,6 +19,8 @@ test('Exports structure', async (t) => { t.same(methods, [ 'constructor' , 'start' + , 'unwatch' + , 'rewatch' , '_openFile' , '_readRemainderFromFileHandle' , '_readChunks' @@ -29,7 +31,7 @@ test('Exports structure', async (t) => { , 'quit' ], 'Methods names as expected') - t.equal(methods.length, 10, 'TailFile.prototype prop count') + t.equal(methods.length, 12, 'TailFile.prototype prop count') }) test('TailFile instantiation', async (t) => { @@ -753,3 +755,74 @@ test('Invalid options checks', async (t) => { , meta: {got: 1.23456} }, 'startPos must be > 0') }) + +test('Test unwatching and rewatching a file', async (t) => { + const name = 'logfile.txt' + const testDir = t.testdir({ + [name]: '' + }) + const filename = path.join(testDir, name) + const tail = new TailFile(filename, { + encoding: 'utf8' + , pollFileIntervalMs: 100 + }) + + t.teardown(() => { + tail.quit().catch(fail) + }) + + const testString = 'Here is a line\n' + + await tail.start() + await fs.promises.appendFile(filename, testString) + + tail.unwatch() + + // call again to satisfy tests + tail.unwatch() + + let data_received = false + + const ac = new AbortController() + + once(tail, 'data', {signal: ac.signal}).then(() => { + data_received = true + }, () => {}) + + await fs.promises.appendFile(filename, testString) + await fs.promises.appendFile(filename, testString) + + await sleep(300) + + ac.abort() + + t.equal(data_received, false, 'Did not receive data when not watching') + + // do NOT await rewatch, or 'flush' event will already have been sent + tail.rewatch() + + // should do nothing + tail.rewatch() + + const [flush] = await once(tail, 'flush') + + await fs.promises.appendFile(filename, testString) + const [line] = await once(tail, 'data') + const [flush2] = await once(tail, 'flush') + + t.equal(flush.firstPoll, true, 'After rewatch, flush event with firstPoll true') + t.equal(flush2.firstPoll, false, 'New data polled, where firstPoll should be false') + t.equal(line, testString, 'New line was received') + + t.equal( + flush.lastReadPosition + , testString.length * 3 + , 'Did emit three lines, lastReadPosition equals length of those lines.' + ) + + t.equal( + flush2.lastReadPosition + , testString.length * 4 + , 'lastReadPosition correctly relects last added test line.' + ) +})