Commands
Since v11 (see Issue #61), the IAS supports sending commands and the command/reply pattern in Java and Scala. Commands are published in the CmdTopic Kafka topic and replies in ReplyTopic, as JSON strings, as usual.
The set of available commands is defined in CommandType.java and in IasCommandType.py; each command has parameters and an optional map of properties.
To send a command, the user must provide the ID of the destination (* for broadcast) and, optionally, parameters and properties. The receivers must filter out the commands sent to other destinations and process their own.
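The filtering rule described above can be sketched as follows. This is a minimal illustration, not the IAS implementation: the JSON field name destId, the receiver IDs, and the command names are assumptions made for the example, and the actual message schema may differ.

```python
import json

# The text states that "*" addresses every receiver
BROADCAST_ID = "*"


def is_addressed_to(raw_message: str, my_id: str) -> bool:
    """Return True if the JSON command targets this receiver or is a broadcast."""
    command = json.loads(raw_message)
    # "destId" is a hypothetical field name, not the confirmed IAS JSON schema
    dest = command.get("destId")
    return dest == my_id or dest == BROADCAST_ID


# Placeholder messages as they might arrive from the command topic
messages = [
    json.dumps({"destId": "Supervisor-1", "command": "PING"}),
    json.dumps({"destId": "*", "command": "SHUTDOWN"}),
    json.dumps({"destId": "Converter-2", "command": "PING"}),
]

# Keep only the commands this receiver has to process
accepted = [
    json.loads(m)["command"] for m in messages if is_addressed_to(m, "Supervisor-1")
]
```

Here the receiver with ID Supervisor-1 keeps its own command and the broadcast, and drops the one addressed to Converter-2.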
A Python command, iasSendCmd, sends commands from the command line: use the -h switch for help and a list of supported commands. The script sends a command and exits; to check for the reply, monitor the reply topic, for example with the following command:
iasDumpKafkaTopic -t reply|jq
A Java class, CommandSender, helps in writing Java and Scala tools that send commands and wait for the reply from the destination (not for broadcast messages).
A CommandSender object connects to the command and reply topics upon initialization.
The CommandSender sends commands synchronously or asynchronously: in the former case it sends a command and blocks until it receives a reply or a timeout expires; in the latter case it sends a command but does not wait for the reply from the destination.
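The difference between the two sending modes can be illustrated with a toy model in which in-memory queues stand in for the Kafka topics. Everything in this sketch (the class InMemorySender, its method names, the message fields) is hypothetical and is not the CommandSender API; it only shows the blocking-with-timeout versus fire-and-forget behaviour.

```python
import queue
import threading
import time
import uuid


class InMemorySender:
    """Toy stand-in for a command sender; NOT the IAS CommandSender API."""

    def __init__(self, command_topic: queue.Queue, reply_topic: queue.Queue):
        self.command_topic = command_topic
        self.reply_topic = reply_topic

    def send_async(self, dest_id: str, command: str) -> str:
        """Publish the command and return at once with its generated id."""
        cmd_id = str(uuid.uuid4())
        self.command_topic.put({"id": cmd_id, "destId": dest_id, "command": command})
        return cmd_id

    def send_sync(self, dest_id: str, command: str, timeout_s: float):
        """Publish, then block until the matching reply arrives; None on timeout."""
        cmd_id = self.send_async(dest_id, command)
        deadline = time.monotonic() + timeout_s
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            try:
                reply = self.reply_topic.get(timeout=remaining)
            except queue.Empty:
                return None
            if reply["id"] == cmd_id:  # ignore replies to other commands
                return reply


def receiver(command_topic: queue.Queue, reply_topic: queue.Queue) -> None:
    """Handle exactly one command and publish its reply."""
    cmd = command_topic.get()
    reply_topic.put({"id": cmd["id"], "exitStatus": "OK"})


commands, replies = queue.Queue(), queue.Queue()
sender = InMemorySender(commands, replies)

# Synchronous send with a receiver running: the reply arrives
worker = threading.Thread(target=receiver, args=(commands, replies), daemon=True)
worker.start()
reply = sender.send_sync("Supervisor-1", "PING", timeout_s=2.0)
worker.join()

# Synchronous send with nobody listening: the call gives up after the timeout
timed_out = sender.send_sync("Supervisor-1", "PING", timeout_s=0.1)
```

Matching the reply to the command through an id is a design choice of this sketch; it lets the synchronous call skip replies that belong to other commands.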
A support class, CommandManager, helps tools that receive commands and send replies after execution: instantiating a CommandManager object takes care of connecting to the command and reply topics and of filtering out undesired messages. The CommandManager forwards commands to a listener for execution; there is a default command executor, DefaultCommandExecutor, that can be extended to customize commands.
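The forward-to-listener arrangement, with a default executor extended to customize one command, might look like the sketch below. The classes Manager, DefaultExecutor and LogLevelExecutor, the on_* naming convention, the message fields, and the returned status strings are all invented for illustration; they do not reproduce the CommandManager or DefaultCommandExecutor interfaces.

```python
from typing import Optional


class DefaultExecutor:
    """Toy default executor; NOT the IAS DefaultCommandExecutor."""

    def execute(self, command: dict) -> str:
        # Look up a handler method named after the command, e.g. on_ping
        handler = getattr(self, "on_" + command["command"].lower(), None)
        return handler(command) if handler else "UNKNOWN"

    def on_ping(self, command: dict) -> str:
        return "OK"


class LogLevelExecutor(DefaultExecutor):
    """Extends the default executor with one customized command."""

    def __init__(self) -> None:
        self.log_level = "INFO"

    def on_set_log_level(self, command: dict) -> str:
        self.log_level = command["params"][0]
        return "OK"


class Manager:
    """Toy manager: drops commands for other destinations, forwards the rest."""

    def __init__(self, my_id: str, executor: DefaultExecutor) -> None:
        self.my_id = my_id
        self.executor = executor

    def on_message(self, command: dict) -> Optional[str]:
        if command["destId"] not in (self.my_id, "*"):
            return None  # not for this process: no execution, no reply
        return self.executor.execute(command)


executor = LogLevelExecutor()
manager = Manager("Supervisor-1", executor)
results = [
    manager.on_message(
        {"destId": "Supervisor-1", "command": "SET_LOG_LEVEL", "params": ["DEBUG"]}
    ),
    manager.on_message({"destId": "*", "command": "PING"}),
    manager.on_message({"destId": "Converter-2", "command": "PING"}),
    manager.on_message({"destId": "Supervisor-1", "command": "RESTART"}),
]
```

The subclass only adds the handler it needs; commands it does not know about fall through to the default behaviour.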
The executor of a command can optionally ask the CommandManager to shut down or restart the process itself, and must implement the AutoCloseable interface.
Restarting a process is done by a) starting the new process and b) shutting down the old one, which means that for a short time interval both the old and the new processes run together.
The CommandManager takes care of disconnecting the process being shut down from all the Kafka topics before starting the new one.
Kafka, for its part, updates its configuration, but this operation can take a short time, during which Kafka believes that two consumers with the same group id are connected and partitions the data to be consumed between them, including the process that has just disconnected. The situation is temporary and ends as soon as Kafka updates its internal configuration and removes one of the consumers, but until that happens the data assigned to the process that is shutting down is lost.
Some tools can tolerate a few missing data items, for example the IASIOs, which the system continuously resends. If losing data is not acceptable, the group.id of the two processes must differ.
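One way to guarantee distinct group ids is to append a random suffix each time a process builds its consumer configuration. The sketch below is only an assumption about how a tool could do this: the helper consumer_config, the base name ias-tool and the broker address are hypothetical, while bootstrap.servers and group.id are standard Kafka consumer properties.

```python
import uuid


def consumer_config(base_group: str, bootstrap_servers: str, unique: bool) -> dict:
    """Build a Kafka consumer configuration (hypothetical helper).

    With unique=True a random suffix makes the group.id differ on every
    start, so the old and the new process never share a consumer group.
    """
    group_id = f"{base_group}-{uuid.uuid4()}" if unique else base_group
    return {"bootstrap.servers": bootstrap_servers, "group.id": group_id}


# The old process and its replacement each build their own configuration
old_process = consumer_config("ias-tool", "localhost:9092", unique=True)
new_process = consumer_config("ias-tool", "localhost:9092", unique=True)

# A tool that tolerates a few lost items can keep a fixed group id
shared = consumer_config("ias-tool", "localhost:9092", unique=False)
```

With different group ids each process receives every record on its own, so during the overlap no data is assigned to the consumer that is shutting down.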