Skip to content

Commit

Permalink
resolves #97 propagate events (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
ggrossetie authored Aug 22, 2022
1 parent 1f21faa commit 13d3c8f
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 0 deletions.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,31 @@ stream.end()

This module works with `yarn` in PnP (plug'n play) mode too!

### Emit events

You can emit events on the ThreadStream from your worker using [`worker.parentPort.postMessage()`](https://nodejs.org/api/worker_threads.html#workerparentport).
The message (JSON object) must have the following data structure:

```js
parentPort.postMessage({
code: 'EVENT',
name: 'eventName',
args: ['list', 'of', 'args', 123, new Error('Boom')]
})
```

On your ThreadStream, you can add a listener function for this event name:

```js
const stream = new ThreadStream({
filename: join(__dirname, 'worker.js'),
workerData: {},
})
stream.on('eventName', function (a, b, c, n, err) {
console.log('received:', a, b, c, n, err) // received: list of args 123 Error: Boom
})
```

## License

MIT
7 changes: 7 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ function onWorkerMessage (msg) {
case 'ERROR':
destroy(stream, msg.err)
break
case 'EVENT':
if (Array.isArray(msg.args)) {
stream.emit(msg.name, ...msg.args)
} else {
stream.emit(msg.name, msg.args)
}
break
default:
destroy(stream, new Error('this should not happen: ' + msg.code))
}
Expand Down
22 changes: 22 additions & 0 deletions test/emit-event.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const { Writable } = require('stream')
const parentPort = require('worker_threads').parentPort

async function run () {
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
if (parentPort) {
parentPort.postMessage({
code: 'EVENT',
name: 'socketError',
args: ['list', 'of', 'args', 123, new Error('unable to write data to the TCP socket')]
})
}
cb()
}
})
}

module.exports = run
23 changes: 23 additions & 0 deletions test/event.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict'

const { test } = require('tap')
const { join } = require('path')
const ThreadStream = require('..')

test('event propagate', function (t) {
const stream = new ThreadStream({
filename: join(__dirname, 'emit-event.js'),
workerData: {},
sync: true
})
stream.on('socketError', function (a, b, c, n, error) {
t.same(a, 'list')
t.same(b, 'of')
t.same(c, 'args')
t.same(n, 123)
t.same(error, new Error('unable to write data to the TCP socket'))
t.end()
})
stream.write('hello')
stream.end()
})

0 comments on commit 13d3c8f

Please sign in to comment.