📦 Broadcast Spool
A Spool that implements CQRS/Event Sourcing patterns, with extraordinary route pattern matching.
Spool-broadcast helps distribute your Fabrix Applications with a specialized broadcasting pattern over a PubSub and WebSockets.
Event sourcing your functional workflow makes building complicated or distributed apps easier. With Spool-broadcast, you can dispatch events over different broadcasters and domain lines and have the operations wrapped in a single ACID transaction that is easy to manage. This makes adding new operations to a sequence trivial and application development easier to distribute. It also measures the completion time for each operation so that you can find logical bottle necks while also validating the data transformations at every step.
Additionally, Spool-broadcast also has a very powerful retry engine, so that it can try and keep operations alive while it's waiting on third parties to complete without having to dump the entire command/event.
Spool-broadcast runs Commands, SAGAs, Events, Processors, and Socket Channels!
You will need NPM or Yarn installed to install spool-broadcast (and fabrix)
$ npm install --save @fabrix/spool-broadcast
Broadcast has a few dependency Spools:
Joi, Errors, Sequelize, Realtime, Retry
$ npm install --save @fabrix/spool-realtime @fabrix/spool-joi @fabrix/spool-errors @fabrix/spool-sequelize, @fabrix/spool-realtime, @fabrix/spool-retry
Additionally, If you install the plugin sequelize-hierarchy
, then it will turn the BroadcastEvent into a CTE, which is useful for debugging and creates a help table, broadcasteventancetors
.
// config/main.ts
import { BroadcastSpool } from '@fabrix/spool-broadcast'
import { RealtimeSpool } from '@fabrix/spool-realtime'
import { SequelizeSpool } from '@fabrix/spool-sequelize'
import { ErrorsSpool } from '@fabrix/spool-errors'
import { JoiSpool } from '@fabrix/spool-joi'
export const main = {
spools: [
// ... other spools
ErrorsSpool,
JoiSpool,
RealtimeSpool,
SequelizeSpool,
BroadcastSpool
// ... other spools
]
}
A running pattern that acts as a smart circuit breaker. For example, when there is an error in the event loop, operations that we're called in the SAGA will be given a cancel command to "Reverse" what they did. In real life, this is something like where your operation books a flight, a hotel, and a car, if the car if the flight is not available, you would want to cancel the booking of the hotel and car.
An Event Emitter that runs Actions and Commands in a Sequence. They are for "Transaction Blocks", for example: when you want certain operations to happen in a sequence that have clear and determined seperations in transactions.
Command listeners that will run before or after the SAGA. Once a command is requested, you may wish to do some validation on the command, imbue it with more data, or run some various logic or calculations. Hooks can be used before and after the SAGA, however, they are not reversed like operations in the SAGA are.
Event listeners that will trigger more events. When an event is dispatched, there may be commands that you want to correlate with the command. When a processor is called, it will return it's value before the next tick in the Broadcast Event loop. This is what allows for spool-broadcast to make exteremly in-depth trees of commands/events predictably.
Event listener that will save data from an event into a projection. A projection are just an easy Read table(s) that make reading from aggregates faster and easier to understand.
A dispatcher creates a secondary event from a primary event without running through the normal processes.
TODO
A list of socket subscribers that get notified when an event occurs. For example, you want people to know in a different application that something has happened on your application.
Spool-broadcast uses a transaction to make an "all or nothing" ACID transaction for the resulting Aggregate update. You should use the SAGA pattern in the command pipe to make non-acid transactions mid flight, or use Pipelines to create transaction blocks. This allows for complex trees of commands and events to be performed in a reliable and predictable way.
A full event flow example:
- Create User (command)
- Is User Unique (pre-hook)
- Tell 3rd party that user is joining (saga)
- Anything that fails after this point, Tell the 3rd party to redact that operation (saga reverse)
- Add 3rd party response to User data (post-hook)
- User is Created (event)
- Add User's Profile in a database (projectors)
- Add User to New Friends List (processor)
- Create Friend List Entry (command)
- Update User's friends list in the database (projectors)
- Broadcast to a Channel that User is created (channel)
export const broadcast = {
/**
* If broadcast should handle transactions, highly recommended true
*/
auto_transaction: true,
/**
* Connection for eventually consistent events
*/
connection: {
/**
* Connection information could also be passed via uri
*/
uri: process.env.CLOUDAMQP_URL
},
/**
* Profile for this Fabrix instance, this will only allow Broadcast in this profile to run.
*/
profile: process.env.BROADCAST_PROFILE || 'development',
/**
* Broadcasters to run for profile definition
* <profile>: [
* <Broadcast>
* ]
*/
profiles: {
development: [
'CartBroadcast',
'CollectionBroadcast',
'UserBroadcast'
],
},
/**
* Add Special Configurations to the broadcaster such as tracing
*/
broadcasters: {
CartBroadcast: { trace: true }
},
/**
* Pipeline subscriptions to Broadcasters
* e.g.
* <Pipeline>: {
* broadcasters: {
* <Broadcast>: {
* pipeline: {
* <Entry.point>: {
* config: {}
* }
* }
* }
* }
* }
*/
pipelines: {
/**
* The Name of the Pipeline Resource
*/
CollectionPipeline: {
broadcasters: {
/**
* The name of the Broadcast to listend to
*/
CollectionBroadcast: {
'Unique_Name': {
'Collection.createCollection': {
zip: {
event_type: 'event_type',
object: 'object',
data: 'data'
}
},
'Collection.findByPkCollection': {
before: function (req, body, options) {
body = {
params: {
channel_uuid: body.data.channel_uuid,
cart_uuid: body.data.cart_uuid
},
query: {}
}
return [req, body, { parent: options }]
},
// after: function(req, body, options) {
// console.log('BRK trial after 2', body.data)
// return [req, body, options]
// },
zip: {
data: 'data'
}
}
},
}
}
},
},
channels: {
TestChannel: {
broadcasters: {
/**
* Broadcaster that the Test BroadcastChannel is listening to
*/
TestBroadcaster: {
/**
* Events subscribed to
*/
'test.created': {
/**
* Channel methods to run when event committed
*/
created: {
lifespan: 'eternal',
config: {
priority: 1,
expects_input: 'TestModel'
}
},
},
'test.:crud': {
/**
* Channel methods to run when event committed
*/
crud: {
lifespan: 'eternal',
config: {
priority: 2,
expects_input: 'TestModel'
}
},
created2: {
lifespan: 'ephemeral',
config: {
priority: 3,
expects_input: ['TestModel', 'TestModel.list']
}
},
},
},
}
},
},
hooks: {
/**
* HookIns
*/
TestHook: {
broadcasters: {
/**
* Broadcaster that the Test BroadcastHookIn is hooked into
*/
TestBroadcaster: {
/**
* Commands subscribed to
*/
'create.test': {
/**
* Hook methods to run when command pattern is matched and dispatched
*/
create: {
lifecycle: 'before',
config: {
priority: 1,
expects_input: 'TestModel',
merge: true,
expects_response: 'TestModel'
}
},
},
'create.:test_uuid.test': {
/**
* Hook methods to run when command pattern is matched and dispatched
*/
create: {
lifecycle: 'before',
config: {
priority: 1,
expects_input: 'TestModel',
merge: true,
expects_response: 'TestModel'
}
},
satisfy: {
lifecycle: 'after',
config: {
priority: 1,
expects_input: 'TestModel',
merge: true,
expects_response: 'TestModel'
}
},
},
'*.test': {
/**
* Hook methods to run when command pattern is matched and dispatched
*/
shouldBeProper: {
lifecycle: 'before',
config: {
priority: 1,
expects_input: 'TestModel',
merge: true,
expects_response: 'TestModel'
}
},
}
}
}
}
},
processors: {
/**
* Processors
*/
TestProcessor: {
/**
* ... some processor configuration if required
*/
broadcasters: {
/**
* Broadcasters that the Test Processors are responding to
*/
TestBroadcaster: {
/**
* Events subscribed to
*/
'test.:test_uuid.created': {
/**
* Processor methods to run when event pattern is matched and dispatched
*/
update: {
consistency: 'strong',
config: {
priority: 1,
expects_input: 'TestModel',
dispatches_command: 'update.test.:test_uuid',
expects_response: 'TestModel',
expects_output: 'TestModel',
data: {
merge: true,
}
}
},
},
'test.created': {
/**
* Processor methods to run when event pattern is matched and dispatched
*/
update: {
consistency: 'strong',
config: {
priority: 1,
expects_input: 'TestModel',
dispatches_command: 'update.test.:test_uuid',
expects_response: 'TestModel',
expects_output: 'TestModel',
data: {
merge: true,
}
}
},
},
},
}
}
},
projectors: {
/**
* Projectors
*/
TestProjector: {
broadcasters: {
/**
* Broadcaster that the Test Projectors are responding to
*/
TestBroadcaster: {
'test.:crud': {
/**
* Projector methods to run when event pattern is matched and dispatched
*/
myMethod: {
consistency: 'strong',
config: {
priority: 3,
expects_input: 'TestModel',
data: {
merge: true,
},
expects_output: 'TestModel'
}
},
myLoggerMethod: {
consistency: 'eventual',
config: {
priority: 3,
expects_input: 'TestModel',
data: {
merge: true,
},
expects_output: 'TestLoggerModel'
}
},
},
'test.:test_uuid.created': {
/**
* Projector methods to run when event pattern is matched and dispatched
*/
created: {
consistency: 'strong',
config: {
priority: 1,
expects_input: 'TestModel',
data: {
merge: true,
},
expects_output: 'TestModel'
}
},
logger: {
consistency: 'eventual',
config: {
priority: 3,
expects_input: 'TestModel',
expects_output: 'TestLoggerModel'
}
},
},
'test.:test_uuid.test.:test_uuid.created': {
/**
* Projector methods to run when event pattern is matched and dispatched
*/
created: {
consistency: 'strong',
config: {
priority: 1,
expects_input: 'TestModel',
data: {
merge: true,
},
expects_output: 'TestModel'
}
},
logger: {
consistency: 'eventual',
config: {
priority: 3,
expects_input: 'TestModel',
expects_output: 'TestLoggerModel'
}
},
},
'test.*': {
/**
* Projector methods to run when event pattern is matched and dispatched
*/
wild: {
consistency: 'eventual',
config: {
priority: 255,
expects_input: '*',
expects_response: '*',
expects_output: '*'
}
},
},
},
}
}
}
}
Channels are useful for letting other applications listen to your application over sockets. This is a useful pattern in micro services, but also doing things like realtime applications.
Pipelines are useful for separating out your logic into transactional blocks. Complicated command/event cycles in a single transaction can quickly add load to your database, so having clear lines to separate the transactions is paramount for weighty commands. Pipelines are also useful for gathering a projected view after running a series of command/events.
Hooks are useful when you want to run operations on a command before, during, and after the SAGA before a command is dispatched as event.
Processors are useful when you want to dispatch a subsequent command when an event happens. Each processor makes a child transaction for the command/event sequence it dispatches so that it can also rollback if something later in the sequence fails.
Projectors are useful for saving event data in a useable way. For example, a "User Created" event was dispatched, so now it might be nice to save their PII encrypted in one table, and their other details in a different table. You can easily do this with multiple projectors on the same event.
Occasionally, you may need to multi-project when a projection is run and trigger side-events. Dispatchers let you trigger these without running a processor for a new event. For example, an event happens and updates an aggregate or projection, but you have other listeners that don't need a fully new event to respond. This is different from a processor which will dispatch a new command, instead, a dispatcher is a "side effect" of an event that contains at least some data from the original event but has a different event id.
We love contributions! Please check out our Contributor's Guide for more information on how our projects are organized and how to get started.
When the master is tagged with a release, it will automatically publish to npm, updates the Changelog and bumps the version. Fabrix uses the standard-version library to manage it all.
To run a patch release:
npm run release -- --release-as patch
and then commit to master. git push --follow-tags origin master
You can also test the release by running
npm run release -- --dry-run --release-as patch
Usage is currently under the MIT License
TLDR; In plain language, do what ever you want with library as long as you keep the license in your code, give credit to the authors, and you accept all the responsibility!
If you are making money using this library, consider sharing some of that sweet cheddar with the authors, they work really hard and would seriously appreciate it 😄