diff --git a/README.md b/README.md index 9753b73..4abe169 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,10 @@ A very simplistic, but performant, remote worker system that uses AMQP to coordi | `develop` | [![CircleCI](https://circleci.com/gh/davesag/amqp-delegate/tree/develop.svg?style=svg)](https://circleci.com/gh/davesag/amqp-delegate/tree/develop) | [![codecov](https://codecov.io/gh/davesag/amqp-delegate/branch/develop/graph/badge.svg)](https://codecov.io/gh/davesag/amqp-delegate) | Work in progress | | `master` | [![CircleCI](https://circleci.com/gh/davesag/amqp-delegate/tree/master.svg?style=svg)](https://circleci.com/gh/davesag/amqp-delegate/tree/master) | [![codecov](https://codecov.io/gh/davesag/amqp-delegate/branch/master/graph/badge.svg)](https://codecov.io/gh/davesag/amqp-delegate) | Latest stable release | +``` +npm install amqp-delegate +``` + ## Worker ``` @@ -43,7 +47,6 @@ worker.stop().then(() => { const { makeDelegator } = require('amqp-delegate') const delegator = makeWorker({ - exchange: — defaults to '', url: - defaults to ampq://localhost, onError: err => { // optional console.error('A connection error happened', err) // or do something clever @@ -69,6 +72,8 @@ delegator ### The worker ``` +const { makeWorker } = require('amqp-delegate') + const task = (a, b) => new Promise(resolve => setTimeout(() => resolve(a + b), 10)) @@ -96,6 +101,8 @@ worker ### The delegator ``` +const { makeDelegator } = require('amqp-delegate') + const delegator = makeDelegator() delegator @@ -109,11 +116,6 @@ delegator }) ``` -## TODO - -* documentation -* publish to npm - ## Development ### Prerequisites diff --git a/docker-compose.yml b/docker-compose.yml index 82c4056..1a0d3e1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ volumes: services: amqp: - image: rabbitmq + image: rabbitmq:management-alpine ports: - 15672:15672 - 5672:5672 diff --git a/package-lock.json b/package-lock.json index 7daa266..1eced5b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "amqp-delegate", - "version": "1.0.0", + "version": "1.0.1", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -72,6 +72,15 @@ "integrity": "sha512-ATz6yX/L8LEnC3dtLQnIx4ydcPxhLcoy9Vl6re00zb2w5lG6itY6Vhnr1KFRPq/FHNsgl/gh2mjNN20f9iJTTA==", "dev": true }, + "@babel/runtime": { + "version": "7.0.0", + "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.0.0.tgz", + "integrity": "sha512-7hGhzlcmg01CvH1EHdSPVXYX1aJ8KCEyz6I9xYIi/asDtzBPMyMhVibhM/K6g/5qnKBwjZtp10bNZIEFTRW1MA==", + "dev": true, + "requires": { + "regenerator-runtime": "^0.12.0" + } + }, "@babel/template": { "version": "7.2.2", "resolved": "https://registry.npmjs.org/@babel/template/-/template-7.2.2.tgz", @@ -1709,6 +1718,12 @@ "write": "^0.2.1" } }, + "fn-name": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/fn-name/-/fn-name-2.0.1.tgz", + "integrity": "sha1-UhTXU3pNBqSjAcDMJi/rhBiAAuc=", + "dev": true + }, "for-in": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/for-in/-/for-in-1.0.2.tgz", @@ -2458,24 +2473,6 @@ "semver": "^5.5.0" } }, - "jest-get-type": { - "version": "22.4.3", - "resolved": "https://registry.npmjs.org/jest-get-type/-/jest-get-type-22.4.3.tgz", - "integrity": "sha512-/jsz0Y+V29w1chdXVygEKSz2nBoHoYqNShPe+QgxSNjAuP1i8+k4LbQNrfoliKej0P45sivkSCh7yiD6ubHS3w==", - "dev": true - }, - "jest-validate": { - "version": "23.6.0", - "resolved": "https://registry.npmjs.org/jest-validate/-/jest-validate-23.6.0.tgz", - "integrity": "sha512-OFKapYxe72yz7agrDAWi8v2WL8GIfVqcbKRCLbRG9PAxtzF9b1SEDdTpytNDN12z2fJynoBwpMpvj2R39plI2A==", - "dev": true, - "requires": { - "chalk": "^2.0.1", - "jest-get-type": "^22.1.0", - "leven": "^2.1.0", - "pretty-format": "^23.6.0" - } - }, "js-tokens": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-4.0.0.tgz", @@ -2537,12 +2534,6 @@ "invert-kv": "^2.0.0" } }, - "leven": { - "version": "2.1.0", - "resolved": "https://registry.npmjs.org/leven/-/leven-2.1.0.tgz", - "integrity": "sha1-wuep93IJTe6dNCAq6KzORoeHVYA=", - "dev": true - }, "levn": { "version": "0.3.0", "resolved": "https://registry.npmjs.org/levn/-/levn-0.3.0.tgz", @@ -2554,15 +2545,15 @@ } }, "lint-staged": { - "version": "8.1.0", - "resolved": "https://registry.npmjs.org/lint-staged/-/lint-staged-8.1.0.tgz", - "integrity": "sha512-yfSkyJy7EuVsaoxtUSEhrD81spdJOe/gMTGea3XaV7HyoRhTb9Gdlp6/JppRZERvKSEYXP9bjcmq6CA5oL2lYQ==", + "version": "8.1.1", + "resolved": "https://registry.npmjs.org/lint-staged/-/lint-staged-8.1.1.tgz", + "integrity": "sha512-6C9tmmCedjDYQMzHydT5mXRtmEgpGUQDoIl+Ser8cfI/n9grsRUsuG2jd1BWqGf62OV+BV+6n/Drt82uYTCgJg==", "dev": true, "requires": { "@iamstarkov/listr-update-renderer": "0.4.1", "chalk": "^2.3.1", "commander": "^2.14.1", - "cosmiconfig": "5.0.6", + "cosmiconfig": "^5.0.2", "debug": "^3.1.0", "dedent": "^0.7.0", "del": "^3.0.0", @@ -2571,7 +2562,6 @@ "g-status": "^2.0.2", "is-glob": "^4.0.0", "is-windows": "^1.0.2", - "jest-validate": "^23.5.0", "listr": "^0.14.2", "lodash": "^4.17.5", "log-symbols": "^2.2.0", @@ -2583,20 +2573,10 @@ "please-upgrade-node": "^3.0.2", "staged-git-files": "1.1.2", "string-argv": "^0.0.2", - "stringify-object": "^3.2.2" + "stringify-object": "^3.2.2", + "yup": "^0.26.10" }, "dependencies": { - "cosmiconfig": { - "version": "5.0.6", - "resolved": "https://registry.npmjs.org/cosmiconfig/-/cosmiconfig-5.0.6.tgz", - "integrity": "sha512-6DWfizHriCrFWURP1/qyhsiFvYdlJzbCzmtFWh744+KyWsJo5+kPzUZZaMRSSItoYc0pxFX7gEO7ZC1/gN/7AQ==", - "dev": true, - "requires": { - "is-directory": "^0.3.1", - "js-yaml": "^3.9.0", - "parse-json": "^4.0.0" - } - }, "debug": { "version": "3.2.6", "resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz", @@ -2612,16 +2592,6 @@ "integrity": "sha512-tgp+dl5cGk28utYktBsrFqA7HKgrhgPsg6Z/EfhWI4gl1Hwq8B/GmY/0oXZ6nF8hDVesS/FpnYaD/kOWhYQvyg==", "dev": true }, - "parse-json": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/parse-json/-/parse-json-4.0.0.tgz", - "integrity": "sha1-vjX1Qlvh9/bHRxhPmKeIy5lHfuA=", - "dev": true, - "requires": { - "error-ex": "^1.3.1", - "json-parse-better-errors": "^1.0.1" - } - }, "pify": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/pify/-/pify-3.0.0.tgz", @@ -4626,16 +4596,6 @@ "fast-diff": "^1.1.2" } }, - "pretty-format": { - "version": "23.6.0", - "resolved": "https://registry.npmjs.org/pretty-format/-/pretty-format-23.6.0.tgz", - "integrity": "sha512-zf9NV1NSlDLDjycnwm6hpFATCGl/K1lt0R/GdkAK2O5LN/rwJoB+Mh93gGJjut4YbmecbfgLWVGSTCr0Ewvvbw==", - "dev": true, - "requires": { - "ansi-regex": "^3.0.0", - "ansi-styles": "^3.2.0" - } - }, "process-nextick-args": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.0.tgz", @@ -4648,6 +4608,12 @@ "integrity": "sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==", "dev": true }, + "property-expr": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/property-expr/-/property-expr-1.5.1.tgz", + "integrity": "sha512-CGuc0VUTGthpJXL36ydB6jnbyOf/rAHFvmVrJlH+Rg0DqqLFQGAP6hIaxD/G0OAmBJPhXDHuEJigrp0e0wFV6g==", + "dev": true + }, "proxyquire": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/proxyquire/-/proxyquire-2.1.0.tgz", @@ -4735,6 +4701,12 @@ "string_decoder": "~0.10.x" } }, + "regenerator-runtime": { + "version": "0.12.1", + "resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.12.1.tgz", + "integrity": "sha512-odxIc1/vDlo4iZcfXqRYFj0vpXFNoGdKMAUieAlFYO6m/nl5e9KR/beGf41z4a1FI+aQgtjhuaSlDxQ0hmkrHg==", + "dev": true + }, "regex-not": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/regex-not/-/regex-not-1.0.2.tgz", @@ -5361,6 +5333,12 @@ "integrity": "sha512-e900nM8RRtGhlV36KGEU9k65K3mPb1WV70OdjfxlG2EAuM1noi/E/BaW/uMhL7bPEssK8QV57vN3esixjUvcXQ==", "dev": true }, + "synchronous-promise": { + "version": "2.0.6", + "resolved": "https://registry.npmjs.org/synchronous-promise/-/synchronous-promise-2.0.6.tgz", + "integrity": "sha512-TyOuWLwkmtPL49LHCX1caIwHjRzcVd62+GF6h8W/jHOeZUFHpnd2XJDVuUlaTaLPH1nuu2M69mfHr5XbQJnf/g==", + "dev": true + }, "table": { "version": "5.2.1", "resolved": "https://registry.npmjs.org/table/-/table-5.2.1.tgz", @@ -5448,6 +5426,12 @@ "repeat-string": "^1.6.1" } }, + "toposort": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/toposort/-/toposort-2.0.2.tgz", + "integrity": "sha1-riF2gXXRVZ1IvvNUILL0li8JwzA=", + "dev": true + }, "trim-right": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/trim-right/-/trim-right-1.0.1.tgz", @@ -5759,6 +5743,20 @@ "lodash": "^4.17.11", "yargs": "^12.0.5" } + }, + "yup": { + "version": "0.26.10", + "resolved": "https://registry.npmjs.org/yup/-/yup-0.26.10.tgz", + "integrity": "sha512-keuNEbNSnsOTOuGCt3UJW69jDE3O4P+UHAakO7vSeFMnjaitcmlbij/a3oNb9g1Y1KvSKH/7O1R2PQ4m4TRylw==", + "dev": true, + "requires": { + "@babel/runtime": "7.0.0", + "fn-name": "~2.0.1", + "lodash": "^4.17.10", + "property-expr": "^1.5.0", + "synchronous-promise": "^2.0.5", + "toposort": "^2.0.2" + } } } } diff --git a/package.json b/package.json index fee71b3..48ff6bd 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,15 @@ { "name": "amqp-delegate", - "version": "1.0.0", + "version": "1.0.1", "description": "A very simplistic, but performant, remote worker system that uses AMQP to coordinate jobs.", "main": "src/index.js", "engines": { "node": ">= 10.15.0" }, + "files": [ + "/src", + "/CONTRIBUTING.md" + ], "scripts": { "eslint-check": "eslint --print-config .eslintrc.js | eslint-config-prettier-check", "lint": "eslint .", @@ -41,7 +45,7 @@ "eslint-plugin-standard": "^4.0.0", "faker": "^4.1.0", "husky": "^1.3.1", - "lint-staged": "^8.1.0", + "lint-staged": "^8.1.1", "mocha": "^6.0.0-1", "nyc": "^13.1.0", "prettier": "^1.16.1", diff --git a/src/defaults.js b/src/defaults.js index 93facc9..ae7902f 100644 --- a/src/defaults.js +++ b/src/defaults.js @@ -1,6 +1,5 @@ const defaults = { - url: process.env.AMQP_URL || 'amqp://localhost', - exchange: '' + url: process.env.AMQP_URL || 'amqp://localhost' } module.export = defaults diff --git a/src/errors.js b/src/errors.js index 4eb540e..c034f12 100644 --- a/src/errors.js +++ b/src/errors.js @@ -5,7 +5,8 @@ const ERRORS = { QUEUE_ALREADY_STARTED: 'Message Queue has already been started', QUEUE_NOT_STARTED: 'Message Queue has not been started', TASK_MISSING: - 'You must provide an async pure function as a task for the worker to perform' + 'You must provide an async pure function as a task for the worker to perform', + WRONG_CORRELATION_ID: 'The provided correlationId is incorrect' } module.exports = ERRORS diff --git a/src/makeDelegator.js b/src/makeDelegator.js index 2e42485..7d6757a 100644 --- a/src/makeDelegator.js +++ b/src/makeDelegator.js @@ -4,7 +4,8 @@ const v4 = require('uuid/v4') const { QUEUE_NOT_STARTED, QUEUE_ALREADY_STARTED, - NOT_CONNECTED + NOT_CONNECTED, + WRONG_CORRELATION_ID } = require('./errors') const defaults = require('./defaults') const attachEvents = require('./attachEvents') @@ -24,11 +25,10 @@ const makeDelegator = (options = {}) => { ...options } - const { exchange, url, onError, onClose } = _options + const { url, onError, onClose } = _options let connection let channel - let queue /** * start the delegator, making it ready to invoke workers. @@ -39,7 +39,6 @@ const makeDelegator = (options = {}) => { attachEvents(connection, { onError, onClose }) channel = await connection.createChannel() - queue = await channel.assertQueue(exchange, { exclusive: true }) } /** @@ -49,13 +48,14 @@ const makeDelegator = (options = {}) => { * @return a promise that resolves to the result of the worker's task. */ /* istanbul ignore next */ - const invoke = (name, ...params) => - new Promise((resolve, reject) => { - if (!channel) return reject(QUEUE_NOT_STARTED) - const buffer = Buffer.from(JSON.stringify(params)) - const correlationId = v4() - const replyTo = queue.queue + const invoke = async (name, ...params) => { + if (!channel) throw new Error(QUEUE_NOT_STARTED) + const queue = await channel.assertQueue('', { exclusive: true }) + const buffer = Buffer.from(JSON.stringify(params)) + const correlationId = v4() + const replyTo = queue.queue + return new Promise((resolve, reject) => { channel.consume( replyTo, message => { @@ -66,13 +66,14 @@ const makeDelegator = (options = {}) => { } catch (err) { return reject(err) } - } + } else return reject(WRONG_CORRELATION_ID) }, { noAck: true } ) channel.sendToQueue(name, buffer, { correlationId, replyTo }) }) + } /** * stops the delegator, disconnecting it from the amqp server @@ -84,7 +85,6 @@ const makeDelegator = (options = {}) => { await connection.close() channel = undefined connection = undefined - queue = undefined } return { start, invoke, stop } diff --git a/src/makeWorker.js b/src/makeWorker.js index 3f1c7a8..c597267 100644 --- a/src/makeWorker.js +++ b/src/makeWorker.js @@ -11,10 +11,9 @@ const attachEvents = require('./attachEvents') /** * Create a Worker with the given options. * @param options - * - exchange The name of the service exchange (required) * - name The name of the worker. (required) - * - task A pure async function that does the work - * - url The url of the AQMP server to use. Defaults to 'amqp://localhost' + * - task A pure async function that does the work (required) + * - url The url of the AQMP server to use. (optional, defaults to 'amqp://localhost') * - onError a hander to handle connection errors (optional) * - onClose a handler to handle connection closed events (optional) * @return A Worker @@ -42,7 +41,7 @@ const makeWorker = options => { attachEvents(connection, { onError, onClose }) channel = await connection.createChannel() - channel.assertQueue(name, { durable: false }) + await channel.assertQueue(name, { durable: false }) channel.prefetch(1) channel.consume( diff --git a/test/integration/delegateAsyncTask.test.js b/test/integration/delegateAsyncTask.test.js index 8443dd3..8cd2727 100644 --- a/test/integration/delegateAsyncTask.test.js +++ b/test/integration/delegateAsyncTask.test.js @@ -11,7 +11,7 @@ describe('delegate an asynchronous task', () => { task }) - const delegator = makeDelegator({ exchange: 'test' }) + const delegator = makeDelegator() let result diff --git a/test/integration/delegateManyTasks.test.js b/test/integration/delegateManyTasks.test.js new file mode 100644 index 0000000..b0a2535 --- /dev/null +++ b/test/integration/delegateManyTasks.test.js @@ -0,0 +1,44 @@ +const { expect } = require('chai') + +const makeWorker = require('../../src/makeWorker') +const makeDelegator = require('../../src/makeDelegator') + +// TODO: sometimes this gets stuck. work out why. +describe('delegate many tasks to multiple workers', () => { + const task = (a, b) => + new Promise(resolve => setTimeout(() => resolve(a + b), 10)) + + const worker1 = makeWorker({ + name: 'adder', + task + }) + + const worker2 = makeWorker({ + name: 'adder', + task + }) + + const delegator = makeDelegator() + + let results + + before(async () => { + await Promise.all([worker1.start(), worker2.start(), delegator.start()]) + + results = await Promise.all([ + delegator.invoke('adder', 10, 15), + delegator.invoke('adder', 20, 30), + delegator.invoke('adder', 40, 50), + delegator.invoke('adder', 100, 200) + ]) + }) + + after(async () => { + await Promise.all([worker1.stop(), worker2.stop()]) + await delegator.stop() + }) + + it('returned the right results', () => { + expect(results).to.deep.equal([25, 50, 90, 300]) + }) +}) diff --git a/test/integration/delegateTask.test.js b/test/integration/delegateTask.test.js index 374b8a6..128324f 100644 --- a/test/integration/delegateTask.test.js +++ b/test/integration/delegateTask.test.js @@ -10,7 +10,7 @@ describe('delegate a task', () => { task }) - const delegator = makeDelegator({ exchange: 'test' }) + const delegator = makeDelegator() let result diff --git a/test/unit/makeDelegator.test.js b/test/unit/makeDelegator.test.js index 8e2891a..5e5089e 100644 --- a/test/unit/makeDelegator.test.js +++ b/test/unit/makeDelegator.test.js @@ -49,7 +49,6 @@ describe('makeDelegator', () => { delegator = makeDelegator({ exchange }) channel = fakeChannel() connection = fakeConnection() - channel.assertQueue.resolves(queue) connection.createChannel.resolves(channel) amqplib.connect.resolves(connection) await delegator.start() @@ -63,10 +62,6 @@ describe('makeDelegator', () => { expect(connection.createChannel).to.have.been.calledOnce }) - it('asserted the queue', () => { - expect(channel.assertQueue).to.have.been.calledOnce - }) - it('throws QUEUE_ALREADY_STARTED if you try and start it again', () => expect(delegator.start()).to.be.rejectedWith(QUEUE_ALREADY_STARTED)) }) @@ -83,11 +78,9 @@ describe('makeDelegator', () => { context('after the delegator was started', () => { before(async () => { - queue = fakeQueue() delegator = makeDelegator({ exchange }) channel = fakeChannel() connection = fakeConnection() - channel.assertQueue.resolves(queue) channel.close.resolves() connection.createChannel.resolves(channel) amqplib.connect.resolves(connection)