To define a stampy endpoint extend com.github.rmannibucau.websocket.stomp.BaseStampyEndpoint
and override method getStampyListeners()
.
Here is a sample:
@ServerEndpoint(
value = "/touch", // your endpoint
subprotocols = "stomp" // what you want actually
)
public class Server extends BaseStampyEndpoint {
@Override
public StampyMessageListener[] getStampyListeners() {
return new StampyMessageListener[]{
new DebugListener(Logger.getLogger("debug-server")),
new SubscribeListener(gateway)
};
}
}
To ease subscription com.github.rmannibucau.websocket.stomp.listener.BaseSubscribeListener
is provided. Inheriting
from it you can define a custom asia.stampy.common.gateway.StampyMessageListener
dedicated to SUBSCRIBE
/UNSUBSCRIBE
commands:
public class SubscribeListener extends BaseSubscribeListener {
public SubscribeListener(final WebSocketGateway webSocketGateway) {
super(webSocketGateway);
}
@Override
protected void onSessionClose(final HostPort hostPort) {
// clean up any state you used
}
@Override
protected void onUnsubscribe(final HostPort hostPort, final UnsubscribeMessage unsubscribe) {
// TODO: often just calling onSessionClose(hostPort);
}
@Override
protected void onSubscribe(final HostPort hostPort, final SubscribeMessage subscribe) {
// TODO
}
}
A more concrete sample is:
public class SubscribeListener extends BaseSubscribeListener {
private final AtomicLong idGenerator = new AtomicLong(Long.MIN_VALUE);
// in practise key would be hostport + subscription but fine for a test
private final Map<HostPort, Sender> threads = new ConcurrentHashMap<>();
public SubscribeListener(final WebSocketGateway webSocketGateway) {
super(webSocketGateway);
}
@Override
protected void onSessionClose(final HostPort hostPort) {
final Sender sender = threads.remove(hostPort);
if (sender != null) {
sender.running.set(false);
try {
sender.join();
} catch (final InterruptedException e) {
Thread.interrupted();
}
}
}
@Override
protected void onUnsubscribe(final HostPort hostPort, final UnsubscribeMessage unsubscribe) {
onSessionClose(hostPort);
}
@Override // while not unsubscribed send message
protected void onSubscribe(final HostPort hostPort, final SubscribeMessage subscribe) {
final Sender sender = new Sender(subscribe.getHeader(), hostPort);
threads.put(hostPort, sender);
sender.start();
}
private class Sender extends Thread {
private final AtomicBoolean running = new AtomicBoolean(true);
private final SubscribeHeader headers;
private final HostPort hostPort;
private Sender(final SubscribeHeader headers, final HostPort hostPort) {
this.headers = headers;
this.hostPort = hostPort;
}
@Override
public void run() {
int i = 0;
while (running.get()) {
try {
final String msg = "{ \"message\":\"test " + i++ + "\"}";
final MessageMessage pushMessage = new MessageMessage(headers.getDestination(), Long.toString(idGenerator.incrementAndGet()), headers.getId());
pushMessage.setBody(msg);
pushMessage.getHeader().setContentLength(msg.length());
pushMessage.getHeader().setContentType("text/json"); // application/json but if we want it we need to subsclass MessageMessage
gateway.sendMessage(pushMessage, hostPort);
if (i > 5) { // don't use CPU that much
try {
sleep(100);
} catch (final InterruptedException e) {
Thread.interrupted();
}
}
} catch (final InterceptException e) {
e.printStackTrace();
}
}
}
}
}
Not yet done
Tip: here is a subscription sample (test in the project actually)
@ClientEndpoint(subprotocols = "stomp")
public class Client {
private final Semaphore latch;
public Client(final Semaphore latch) {
this.latch = latch;
}
@OnMessage
public void onMessage(final String msg) {
latch.release();
}
@OnClose
public void close() {
latch.release();
}
}
// Usage:
final WebSocketContainer container = ContainerProvider.getWebSocketContainer();
final Semaphore latch = new Semaphore(0);
final Session session = container.connectToServer(
new Client(latch),
new URI("ws", null, context.getHost(), context.getPort(), context.getPath() + "touch", null, null));
final RemoteEndpoint.Basic remote = session.getBasicRemote();
remote.sendText(new ConnectMessage("1.2", context.getHost()).toStompMessage(false));
final String id = UUID.randomUUID().toString();
remote.sendText(new SubscribeMessage("/endpoint", id).toStompMessage(false));
assertTrue(latch.tryAcquire(1 + 10, 1, TimeUnit.MINUTES)); // CONNECTED + n MESSAGEs
remote.sendText(new UnsubscribeMessage(id).toStompMessage(false));
final DisconnectMessage disconnectMessage = new DisconnectMessage();
disconnectMessage.getHeader().setReceipt("end");
remote.sendText(disconnectMessage.toStompMessage(false));
assertTrue(latch.tryAcquire(1, 1, TimeUnit.MINUTES)); // RECEIPT
session.close(new CloseReason(GOING_AWAY, "bye"));
assertTrue(latch.tryAcquire(1, 1, TimeUnit.MINUTES)); // server closes the connection