-
Notifications
You must be signed in to change notification settings - Fork 9
/
index.js
128 lines (108 loc) · 4 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright IBM Corp. 2015. All Rights Reserved.
// Node module: strong-pubsub-redis
// This file is licensed under the MIT License.
// License text available at https://opensource.org/licenses/MIT
module.exports = Adapter;
var redis = require("redis")
var EventEmitter = require('events').EventEmitter;
var inherits = require('util').inherits;
var debug = require('debug')('strong-pubsub:redis');
var defaults = require('lodash').defaults;
/** Shared do-nothing function; takes no action and returns `undefined`. */
function noop() {}
/**
 * The **Redis** `Adapter`.
 *
 * Connects a pubsub `client` to Redis through two redis connections created
 * in `connect()`: `redisPubClient` (used by `publish`) and `redisSubClient`
 * (used by `subscribe` / `unsubscribe` and as the source of `'message'`
 * events).
 *
 * @class
 * @param {Object} client The pubsub client that owns this adapter. Its
 * `options` object is stored as `adapter.options`; `connect()` reads
 * `options.port`, `options.host` and `options.redis` from it. Incoming
 * messages and post-connect errors are emitted on this object.
 */
function Adapter(client) {
  // Initialise the emitter state inherited via `inherits()` below.
  EventEmitter.call(this);
  this.client = client;
  this.options = client.options;
}

inherits(Adapter, EventEmitter);

/**
 * Open the publish and subscribe connections.
 *
 * Both connections are created with
 * `redis.createClient(options.port, options.host, options.redis)`.
 * `adapter.clients` is an `EventEmitter` that aggregates the two connections:
 * it emits `'connect'` once both have connected and re-emits every `'error'`
 * raised by either of them.
 *
 * @callback {Function} [cb] Called at most once with the outcome of the
 * connection attempt: with no arguments when both connections are up, or with
 * the first error if one is raised before that.
 * @param {Error} err The error raised by a redis connection, if any.
 */
Adapter.prototype.connect = function(cb) {
  var client = this.client;
  var settings = this.options;
  var clients = this.clients = new EventEmitter();

  var pubClient = this.redisPubClient = redis.createClient(
    settings.port,
    settings.host,
    settings.redis
  );
  var subClient = this.redisSubClient = redis.createClient(
    settings.port,
    settings.host,
    settings.redis
  );

  // Cleared as soon as the outcome has been reported so that `cb` can never
  // fire twice (for example an early error followed by a later connect).
  var pending = typeof cb === 'function' ? cb : null;
  var connacks = 0;

  function onConnect() {
    connacks++;
    if (connacks === 2) {
      clients.emit('connect');
    }
  }

  clients.once('connect', function() {
    var done = pending;
    pending = null;
    if (done) {
      done();
    }
  });

  // This listener stays attached for the lifetime of the connections. Without
  // it, an error raised after the callback has fired would be emitted on an
  // emitter with no 'error' listener, which makes `emit` throw.
  clients.on('error', function(err) {
    var done = pending;
    pending = null;
    if (done) {
      done(err);
    } else {
      // The connect outcome was already reported (or no callback was given):
      // surface the error on the owning client instead.
      client.emit('error', err);
    }
  });

  var emitError = clients.emit.bind(clients, 'error');
  subClient.once('connect', onConnect);
  pubClient.once('connect', onConnect);
  pubClient.on('error', emitError);
  subClient.on('error', emitError);

  // Relay everything received on the subscribe connection to the client.
  subClient.on('message', function(topic, message, options) {
    client.emit('message', topic, message, options);
  });
};

/**
 * Close both redis connections by calling `end()` on each of them.
 *
 * Calling this before `connect()` is a no-op.
 *
 * @callback {Function} [cb] Called on the next tick, after `end()` has been
 * invoked on both connections.
 */
Adapter.prototype.end = function(cb) {
  if (this.redisPubClient) {
    this.redisPubClient.end();
  }
  if (this.redisSubClient) {
    this.redisSubClient.end();
  }
  if (typeof cb === 'function') {
    process.nextTick(cb);
  }
};

/**
 * Publish a `message` to the specified `topic`.
 *
 * @param {String} topic The topic to publish to.
 * @param {String|Buffer} message The message to publish to the topic.
 * @param {Object} [options] Accepted for compatibility with other adapters
 * (for example the MQTT `qos` and `retain` settings). This adapter does not
 * read it. May be omitted, in which case the third argument is the callback.
 *
 * @callback {Function} callback Passed straight to the redis client's
 * `publish()`, so it receives whatever arguments that client supplies.
 * @param {Error} err An error object is included if an error was supplied by the adapter.
 */
Adapter.prototype.publish = function(topic, message, options, cb) {
  // Support publish(topic, message, cb) as well as the four-argument form.
  var done = typeof options === 'function' ? options : cb;
  this.redisPubClient.publish(topic, message, done);
};

/**
 * Subscribe to the specified `topic`.
 *
 * @param {String} topic The topic to subscribe to.
 * @param {Object} [options] Accepted for compatibility with other adapters
 * (for example the MQTT `qos` setting). This adapter does not read it. May be
 * omitted, in which case the second argument is the callback.
 *
 * @callback {Function} callback Passed straight to the redis client's
 * `subscribe()`, so it receives whatever arguments that client supplies; no
 * `granted` list is built by this adapter.
 * @param {Error} err An error object is included if an error was supplied by the adapter.
 */
Adapter.prototype.subscribe = function(topic, options, cb) {
  // Support subscribe(topic, cb) as well as the three-argument form.
  var done = typeof options === 'function' ? options : cb;
  this.redisSubClient.subscribe(topic, done);
};

/**
 * Unsubscribe from the specified `topic`.
 *
 * @param {String} topic The topic to unsubscribe.
 * @callback {Function} callback Passed straight to the redis client's
 * `unsubscribe()`, so it receives whatever arguments that client supplies.
 * @param {Error} err An error object is included if an error was supplied by the adapter.
 */
Adapter.prototype.unsubscribe = function(topic, cb) {
  this.redisSubClient.unsubscribe(topic, cb);
};