Skip to content

Commit

Permalink
Fix _writev to only call its callback when all chunks are done
Browse files Browse the repository at this point in the history
Fixs github issue winstonjs#49

This allows stream events such as 'finish' to work correctly on the
transport stream instead of possibly triggering before the individual
log calls have been finished.
  • Loading branch information
Manuel Lagang committed Aug 17, 2019
1 parent 46db8f3 commit 1ed7c3c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 5 deletions.
24 changes: 19 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,27 @@ TransportStream.prototype._writev = function _writev(chunks, callback) {
return this.logv(infos, callback);
}

let numLogCalls = 0;
let numCallbacksCalled = 0;
const wrapCallback = i => (...args) => {
setImmediate(() => {
chunks[i].callback(...args);

numCallbacksCalled += 1;

if (numCallbacksCalled === numLogCalls) {
return callback(null);
}
});
};

for (let i = 0; i < chunks.length; i++) {
if (!this._accept(chunks[i])) continue;

numLogCalls += 1;

if (chunks[i].chunk && !this.format) {
this.log(chunks[i].chunk, chunks[i].callback);
this.log(chunks[i].chunk, wrapCallback(i));
continue;
}

Expand All @@ -151,18 +167,16 @@ TransportStream.prototype._writev = function _writev(chunks, callback) {

if (errState || !transformed) {
// eslint-disable-next-line callback-return
chunks[i].callback();
wrapCallback(i);
if (errState) {
// eslint-disable-next-line callback-return
callback(null);
throw errState;
}
} else {
this.log(transformed, chunks[i].callback);
this.log(transformed, wrapCallback(i));
}
}

return callback(null);
};

/**
Expand Down
56 changes: 56 additions & 0 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,62 @@ describe('TransportStream', () => {
expected.forEach(transport.write.bind(transport));
transport.uncork();
});

it("invokes the callback only when each chunk has finished", done => {
const expected = infosFor({
count: 5,
levels: ["info"]
});

const individualCallbacks = [];
const transport = new TransportStream({
level: "info",
log(info, callback) {
individualCallbacks.push(callback);
}
});

//
// Make the standard _write throw to ensure that _writev is called.
//
transport._write = () => {
throw new Error(
"TransportStream.prototype._write should never be called."
);
};

// Wrap the standard _writev in a way to tell if the callback was called
let callbackCalled = false;
let individualCallbacksCalled = false;
const standardWriteV = transport._writev;
transport._writev = (chunks, callback) => {
standardWriteV.call(transport, chunks, () => {
assume(individualCallbacksCalled).equals(true);
callback();
callbackCalled = true;
done();
});
};

transport.cork();
transport.levels = testLevels;
expected.forEach(transport.write.bind(transport));
transport.uncork();

// Callback shouldn't be called yet, since the individual log callbacks
// haven't been called

setImmediate(() => {
assume(callbackCalled).equals(false);

// After each individual log is finished, only then should the stream
// should be finished
for (const callback of individualCallbacks) {
callback();
}
individualCallbacksCalled = true;
});
});
});

describe('parent (i.e. "logger") ["pipe", "unpipe"]', () => {
Expand Down

0 comments on commit 1ed7c3c

Please sign in to comment.