diff --git a/index.js b/index.js index 46531bd..dee4b55 100644 --- a/index.js +++ b/index.js @@ -127,11 +127,27 @@ TransportStream.prototype._writev = function _writev(chunks, callback) { return this.logv(infos, callback); } + let numLogCalls = 0; + let numCallbacksCalled = 0; + const wrapCallback = i => (...args) => { + setImmediate(() => { + chunks[i].callback(...args); + + numCallbacksCalled += 1; + + if (numCallbacksCalled === numLogCalls) { + return callback(null); + } + }); + }; + for (let i = 0; i < chunks.length; i++) { if (!this._accept(chunks[i])) continue; + numLogCalls += 1; + if (chunks[i].chunk && !this.format) { - this.log(chunks[i].chunk, chunks[i].callback); + this.log(chunks[i].chunk, wrapCallback(i)); continue; } @@ -151,18 +167,16 @@ TransportStream.prototype._writev = function _writev(chunks, callback) { if (errState || !transformed) { // eslint-disable-next-line callback-return - chunks[i].callback(); + wrapCallback(i); if (errState) { // eslint-disable-next-line callback-return callback(null); throw errState; } } else { - this.log(transformed, chunks[i].callback); + this.log(transformed, wrapCallback(i)); } } - - return callback(null); }; /** diff --git a/test/index.test.js b/test/index.test.js index ba1b78e..62daefb 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -326,6 +326,62 @@ describe('TransportStream', () => { expected.forEach(transport.write.bind(transport)); transport.uncork(); }); + + it("invokes the callback only when each chunk has finished", done => { + const expected = infosFor({ + count: 5, + levels: ["info"] + }); + + const individualCallbacks = []; + const transport = new TransportStream({ + level: "info", + log(info, callback) { + individualCallbacks.push(callback); + } + }); + + // + // Make the standard _write throw to ensure that _writev is called. + // + transport._write = () => { + throw new Error( + "TransportStream.prototype._write should never be called." + ); + }; + + // Wrap the standard _writev in a way to tell if the callback was called + let callbackCalled = false; + let individualCallbacksCalled = false; + const standardWriteV = transport._writev; + transport._writev = (chunks, callback) => { + standardWriteV.call(transport, chunks, () => { + assume(individualCallbacksCalled).equals(true); + callback(); + callbackCalled = true; + done(); + }); + }; + + transport.cork(); + transport.levels = testLevels; + expected.forEach(transport.write.bind(transport)); + transport.uncork(); + + // Callback shouldn't be called yet, since the individual log callbacks + // haven't been called + + setImmediate(() => { + assume(callbackCalled).equals(false); + + // After each individual log is finished, only then should the stream + // should be finished + for (const callback of individualCallbacks) { + callback(); + } + individualCallbacksCalled = true; + }); + }); }); describe('parent (i.e. "logger") ["pipe", "unpipe"]', () => {