The ActiveMQ Classic connector is one of Conduit plugins. The connector provides both a source and a destination connector for ActiveMQ Classic.
It uses the stomp protocol to connect to ActiveMQ.
Field | Description |
---|---|
record.Position |
json object with the queue name and the messageId frame header. |
record.Operation |
currently fixed as "create". |
record.Metadata |
a string to string map, with keys prefixed as activemq.header.{STOMP_HEADER_NAME} . |
record.Key |
the messageId frame header. |
record.Payload.Before |
|
record.Payload.After |
the message body |
Run make build
to build the connector.
Run make test
to run all the tests. The command will handle starting and stopping docker containers for you.
name | description | required | default value |
---|---|---|---|
url |
URL of the ActiveMQ classic broker. | true | |
queue |
Name of the queue to read from or write to. | true | |
user |
Username to use when connecting to the broker. | true | |
password |
Password to use when connecting to the broker. | true | |
sendTimeoutHeartbeat |
Specifies the maximum amount of time between the client sending heartbeat notifications from the server. | true | 2s (*) |
recvTimeoutHeartbeat |
Specifies the minimum amount of time between the client expecting to receive heartbeat notifications from the server. | true | 2s (*) |
tls.enabled |
Flag to enable or disable TLS. | false | false |
tls.clientKeyPath |
Path to the client key file. | false | |
tls.clientCertPath |
Path to the client certificate file. | false | |
tls.caCertPath |
Path to the CA certificate file. | false | |
tls.insecureSkipVerify |
Flag to skip verification of the server's certificate chain and host name | false | |
clientID |
Specifies the JMS clientID which is used in combination with the activemq.subscriptionName to denote a durable subscriber. | No | |
activemq.dispatchAsync |
Specifies whether messages should be dispatched synchronously or asynchronously from the producer thread for non-durable topics in the broker. | No | |
activemq.exclusive |
Indicates the desire to be the sole consumer from the queue. | No | |
activemq.maximumPendingMessageLimit |
Specifies the upper limit of pending messages allowed for slow consumers on non-durable topics. | No | |
activemq.noLocal |
Indicates if messages sent from the local connection should be excluded from subscriptions. | No | |
activemq.prefetchSize |
Determines the maximum number of messages to dispatch to the client before it acknowledges a message. | No | |
activemq.priority |
Specifies the consumer's priority level for weighted dispatching order. | No | |
activemq.retroactive |
If set to true, makes the subscription retroactive for non-durable topics. | No | |
activemq.subscriptionName |
Specifies the name used for durable topic subscriptions. | No | |
selector |
Defines a JMS Selector employing SQL 92 syntax as delineated in the JMS 1.1 specification, enabling a filter to be applied on each message associated with the subscription. | No |
(*) Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
name | description | required | default value |
---|---|---|---|
url |
URL of the ActiveMQ classic broker. | true | |
queue |
Name of the queue to read from or write to. | true | |
user |
Username to use when connecting to the broker. | true | |
password |
Password to use when connecting to the broker. | true | |
sendTimeoutHeartbeat |
Specifies the maximum amount of time between the client sending heartbeat notifications from the server. | true | 2s (*) |
recvTimeoutHeartbeat |
Specifies the minimum amount of time between the client expecting to receive heartbeat notifications from the server. | true | 2s (*) |
tls.enabled |
Flag to enable or disable TLS. | false | false |
tls.clientKeyPath |
Path to the client key file. | false | |
tls.clientCertPath |
Path to the client certificate file. | false | |
tls.caCertPath |
Path to the CA certificate file. | false | |
tls.insecureSkipVerify |
Flag to skip verification of the server's certificate chain and host name | false |
(*) Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
Here's an example of a pipeline.yml
file using file to activemq classic
and activemq classic to file
pipelines:
version: 2.0
pipelines:
- id: file-to-activemq-classic
status: running
connectors:
- id: file.in
type: source
plugin: builtin:file
name: file-destination
settings:
path: ./file.in
- id: activemq-classic.out
type: destination
plugin: standalone:activemq
name: activemq-classic-source
settings:
url: localhost:61613
user: admin
password: admin
queue: demo-queue
sdk.record.format: template
sdk.record.format.options: '{{ printf "%s" .Payload.After }}'
- id: activemq-classic-to-file
status: running
connectors:
- id: activemq-classic.in
type: source
plugin: standalone:activemq
name: activemq-classic-source
settings:
url: localhost:61613
user: admin
password: admin
queue: demo-queue
- id: file.out
type: destination
plugin: builtin:file
name: file-destination
settings:
path: ./file.out
sdk.record.format: template
sdk.record.format.options: '{{ printf "%s" .Payload.After }}'
- The source
activemq.subscriptionName
parameter is only supported for activemq classic v5.0. When using this connector with previous versions of activemq, this parameter will be ignored, as the previous header name for this parameter wasactivemq.subcriptionName
.