An async/await wrapper over node-nats, designed to integrate easily with your microservice code.
Taşuu (ташуу) means 'transport' in Kyrgyz.
npm i tasu
Create an instance of the transport:
const Tasuu = require('tasu');
async function main() {
  ...
  const tasu = new Tasuu({group: 'some-service', ...});
  await tasu.connected();
}
Publish a request and get a response via tasu.request() on one end:
const {bar} = await tasu.request('foo', {arg: 1});
Note: this method uses requestOne internally, so there is no need to worry about the maximum number of responses.
Subscribe and respond to a request on the other:
tasu.listen('foo', async ({arg}) => {
  // The returned value is sent back to the requester as the response.
  const bar = await someDatabaseCall(arg);
  return {bar};
});
Note: a listener is automatically added to the queue group foo.listeners; errors thrown by the handler function are caught and sent back as an error response.
Publish an event on one end:
tasu.publish('some.package.sent', {...});
Subscribe and process as worker queue on the other:
tasu.process('*.package.sent', (pack, subject) => {
  console.log(subject, pack);
});
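The `*` in the subject above is a NATS wildcard: it matches exactly one dot-separated token, while `>` matches one or more trailing tokens. The following is a minimal sketch of those matching rules, meant only to show which subjects a handler like the one above would receive. `subjectMatches` is a hypothetical helper written for illustration; it is not part of tasu or node-nats, and the real matching is done by the NATS server.

```javascript
// Illustrative only: mimics NATS subject wildcard rules.
// subjectMatches is a hypothetical helper, not a tasu or node-nats API.
function subjectMatches(pattern, subject) {
  const p = pattern.split('.');
  const s = subject.split('.');
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '>') {
      // '>' is only valid as the last token and needs at least one token to match.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false;            // subject ran out of tokens
    if (p[i] !== '*' && p[i] !== s[i]) return false; // literal token mismatch
  }
  // Without a trailing '>', both must have the same number of tokens.
  return p.length === s.length;
}

console.log(subjectMatches('*.package.sent', 'some.package.sent'));
console.log(subjectMatches('*.package.sent', 'a.b.package.sent'));
```

Under these rules `some.package.sent` matches `*.package.sent`, but `a.b.package.sent` does not, because `*` stands for a single token.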
The listen, subscribe and process methods return an integer subscription ID (SID), which can be used to unsubscribe from a subject:
const sid = tasu.process('*.package.sent', (pack, subject) => {
  console.log(subject, pack);
});
...
tasu.unsubscribe(sid);
The above technique is useful for websocket connections.
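As a rough sketch of that websocket use case, a subscription can be tied to the lifetime of a client connection: subscribe when the client connects, unsubscribe when it disconnects. `forwardToSocket` is a hypothetical helper. It assumes `transport` is a connected tasu instance and that `socket` emits a `close` event and has a `send()` method, as sockets from common Node.js WebSocket libraries do; neither assumption comes from tasu itself.

```javascript
// Sketch: forward messages on `subject` to one client for as long as it is connected.
// forwardToSocket is a hypothetical helper; `socket` is assumed to be an
// EventEmitter-style object with send() and a 'close' event.
function forwardToSocket(transport, socket, subject) {
  // subscribe() hands the handler (message, replyTo, subject), per the API list.
  const sid = transport.subscribe(subject, (message, replyTo, subj) => {
    socket.send(JSON.stringify({subject: subj, message}));
  });

  // Drop the subscription as soon as the client goes away.
  socket.once('close', () => transport.unsubscribe(sid));

  return sid;
}
```

Without the `unsubscribe` call, every disconnected client would leave a subscription behind that keeps receiving messages.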
Close NATS connection (if needed):
tasu.close();
You can set the following config options while creating a new Tasu instance:
- url - NATS connection URLs, default: nats://localhost:4222
- group - group name this transport will be assigned to, default: default
- requestTimeout - how long to wait for a response to NATS requests, default: 10000
- level - log level for the provided logger, default: debug
- logger - custom logger; it should provide at least info, debug and error methods
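One possible way to assemble these options is to read them from the environment and fall back to the documented defaults. This is only a sketch: `tasuOptionsFromEnv` and the environment variable names (`NATS_URL`, `SERVICE_GROUP`, `NATS_REQUEST_TIMEOUT`, `LOG_LEVEL`) are hypothetical and chosen for illustration; only the option keys come from the list above.

```javascript
// Sketch: build a tasu options object from environment variables.
// The variable names are assumptions; the option keys are the documented ones.
function tasuOptionsFromEnv(env = process.env) {
  return {
    url: env.NATS_URL || 'nats://localhost:4222',
    group: env.SERVICE_GROUP || 'default',
    // Number(undefined) is NaN, so a missing value falls back to the default.
    requestTimeout: Number(env.NATS_REQUEST_TIMEOUT) || 10000,
    level: env.LOG_LEVEL || 'debug',
    // Node's console provides info, debug and error methods.
    logger: console,
  };
}

// Intended usage (needs the tasu package and a reachable NATS server):
//   const Tasuu = require('tasu');
//   const tasu = new Tasuu(tasuOptionsFromEnv());
//   await tasu.connected();
```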
- connected() - returns a promise that resolves on successful connection to the NATS server. You almost always want this resolved before doing anything else with Tasu.
- publish(subject, message) - optimistically publishes a message to a subject. TODO: make this return a promise too.
- listen(subject, async (message)=>{...}) - subscribes to a subject and responds via the handler function. The handler must be declared async; it receives the message object as its only argument and must return a value, which is sent as the successful response. Errors are caught and sent back as an error response. Returns a subscription ID.
- subscribe(subject, (message, replyTo, subject)=>{...}) - subscribes to a subject and processes messages with the handler function. All members of the group receive the subject's messages. Returns a subscription ID.
- subOnce(subject, (message, subject)=>{...}) - same as subscribe(), but unsubscribes immediately after receiving a message.
- process(subject, (message, subject)=>{...}) - subscribes to a subject as a queue worker and processes messages via the handler function. Messages are distributed randomly among free group members. Returns a subscription ID.
- request(subject[, message]) - performs a request and returns a promise for the response.
- unsubscribe(sid) - unsubscribes from the subscription identified by sid (as returned by subscribe, subOnce, process or listen).
- close() - closes the connection to NATS. Always try to exit gracefully, otherwise the connection will persist until the next heartbeat.
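A graceful exit could be wired up roughly as follows, closing the transport when the process receives a termination signal. `closeOnExit` is a hypothetical helper, and the choice of `SIGINT` and `SIGTERM` is an assumption about how the service is stopped; the only tasu call used is `close()`.

```javascript
// Sketch: close the NATS connection once when the process is asked to stop.
// closeOnExit is a hypothetical helper; `proc` defaults to the real process
// and can be replaced with any EventEmitter-like object.
function closeOnExit(transport, proc = process) {
  let closed = false;

  const shutdown = () => {
    if (closed) return; // ignore repeated signals
    closed = true;
    transport.close();
  };

  for (const signal of ['SIGINT', 'SIGTERM']) {
    proc.once(signal, shutdown);
  }

  return shutdown; // also callable manually, e.g. from an error handler
}
```

Note that installing a listener for these signals replaces Node's default exit-on-signal behavior, so the process ends only once nothing else keeps the event loop alive; call `process.exit()` after closing if other handles remain open.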
Icons by icons8