Skip to content

Commit

Permalink
feat(msg): Support multi-channel messages registering.
Browse files Browse the repository at this point in the history
  • Loading branch information
CarmJos committed Jan 23, 2024
1 parent e29317a commit 9edc892
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 55 deletions.
22 changes: 17 additions & 5 deletions api/src/main/java/cc/carm/plugin/mineredis/MineRedis.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package cc.carm.plugin.mineredis;

import cc.carm.plugin.mineredis.api.RedisManager;
import cc.carm.plugin.mineredis.api.callback.RedisCallbackBuilder;
import cc.carm.plugin.mineredis.api.channel.RedisChannel;
import cc.carm.plugin.mineredis.api.message.RedisMessageListener;
import cc.carm.plugin.mineredis.api.request.RedisRequestBuilder;
import com.google.common.io.ByteArrayDataOutput;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisFuture;
Expand Down Expand Up @@ -122,15 +123,15 @@ public static RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull O
return getManager().publishAsync(channel, values);
}

public static RedisCallbackBuilder request(@NotNull String channel, @NotNull ByteArrayDataOutput byteOutput) {
public static RedisRequestBuilder request(@NotNull String channel, @NotNull ByteArrayDataOutput byteOutput) {
return getManager().callback(channel, byteOutput);
}

public static RedisCallbackBuilder request(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> byteOutput) {
public static RedisRequestBuilder request(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> byteOutput) {
return getManager().callback(channel, byteOutput);
}

public static RedisCallbackBuilder request(@NotNull String channel, @NotNull Object... values) {
public static RedisRequestBuilder request(@NotNull String channel, @NotNull Object... values) {
return getManager().callback(channel, values);
}

Expand All @@ -148,9 +149,20 @@ public static void registerPatternListener(@NotNull RedisMessageListener listene
getManager().registerPatternListener(listener, channelPattern, morePatterns);
}

public static void registerChannels(@NotNull Class<?> channelClazz) {
getManager().registerChannels(channelClazz);
}

public static void unregisterChannels(@NotNull Class<?> channelClazz) {
getManager().unregisterChannels(channelClazz);
}

public static void registerChannel(@NotNull RedisChannel channel) {
getManager().registerChannel(channel);
}

public static void unregisterListener(@NotNull RedisMessageListener listener) {
getManager().unregisterListener(listener);
}


}
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package cc.carm.plugin.mineredis.api;

import cc.carm.plugin.mineredis.api.callback.RedisCallbackBuilder;
import cc.carm.plugin.mineredis.api.message.RedisMessage;
import cc.carm.plugin.mineredis.api.channel.RedisChannel;
import cc.carm.plugin.mineredis.api.message.RedisMessageListener;
import cc.carm.plugin.mineredis.api.request.RedisRequestBuilder;
import com.google.common.io.ByteArrayDataOutput;
import io.lettuce.core.RedisFuture;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* 发布与订阅(Pub/Sub)管理器。
Expand Down Expand Up @@ -72,11 +70,11 @@ default RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull Object.
return publishAsync(channel, s -> writeParams(s, Arrays.asList(values)));
}

RedisCallbackBuilder callback(@NotNull String channel, @NotNull ByteArrayDataOutput byteOutput);
RedisRequestBuilder callback(@NotNull String channel, @NotNull ByteArrayDataOutput byteOutput);

RedisCallbackBuilder callback(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> byteOutput);
RedisRequestBuilder callback(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> byteOutput);

default RedisCallbackBuilder callback(@NotNull String channel, @NotNull Object... values) {
default RedisRequestBuilder callback(@NotNull String channel, @NotNull Object... values) {
return callback(channel, s -> writeParams(s, Arrays.asList(values)));
}

Expand All @@ -90,6 +88,15 @@ void registerPatternListener(@NotNull RedisMessageListener listener,

void unregisterListener(@NotNull RedisMessageListener listener);

void registerChannels(@NotNull Class<?> channelClazz);

void unregisterChannels(@NotNull Class<?> channelClazz);

default void registerChannel(@NotNull RedisChannel channel) {
registerChannelListener(channel, channel.getChannel());
}


static void writeParams(ByteArrayDataOutput data, List<Object> params) {
params.forEach(param -> writeParam(data, param));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package cc.carm.plugin.mineredis.api.channel;

import cc.carm.plugin.mineredis.MineRedis;
import cc.carm.plugin.mineredis.api.message.RedisMessage;
import cc.carm.plugin.mineredis.api.message.RedisMessageListener;
import cc.carm.plugin.mineredis.api.message.PreparedRedisMessage;
import com.google.common.io.ByteArrayDataOutput;
import io.lettuce.core.RedisFuture;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class RedisChannel implements RedisMessageListener {

public static RedisChannelBuilder builder(String channel) {
return new RedisChannelBuilder(channel);
}

protected final @NotNull String channel;
protected final @Nullable Predicate<RedisMessage> filter;
protected final @Nullable Function<RedisMessage, PreparedRedisMessage> handler;

public RedisChannel(@NotNull String channel,
@Nullable Predicate<RedisMessage> filter,
@Nullable Function<RedisMessage, PreparedRedisMessage> handler) {
this.channel = channel;
this.filter = filter;
this.handler = handler;
}

public @NotNull String getChannel() {
return this.channel;
}

@Override
public void handle(RedisMessage message) {
if (handler == null) return;
if (!channel.equals(message.getChannel())) return;
if (filter != null && !filter.test(message)) return;
PreparedRedisMessage response = handler.apply(message);
if (response != null) response.publish();
}

public RedisFuture<Long> publishAsync(@NotNull ByteArrayDataOutput data) {
return MineRedis.publishAsync(channel, data);
}


public RedisFuture<Long> publishAsync(@NotNull Consumer<ByteArrayDataOutput> data) {
return MineRedis.publishAsync(channel, data);
}


public RedisFuture<Long> publishAsync(Object... values) {
return MineRedis.publishAsync(channel, values);
}


public long publish(@NotNull Object... values) {
return MineRedis.publish(channel, values);
}

public long publish(@NotNull ByteArrayDataOutput data) {
return MineRedis.publish(channel, data);
}

public long publish(@NotNull Consumer<ByteArrayDataOutput> data) {
return MineRedis.publish(channel, data);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cc.carm.plugin.mineredis.api.channel;

import cc.carm.plugin.mineredis.api.message.RedisMessage;
import cc.carm.plugin.mineredis.api.message.PreparedRedisMessage;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class RedisChannelBuilder {

protected final String channel;
protected Predicate<RedisMessage> filter = m -> !m.isLocalMessage();

public RedisChannelBuilder(String channel) {
this.channel = channel;
}


public RedisChannelBuilder filter(Predicate<RedisMessage> filter) {
return setFilter(this.filter == null ? filter : this.filter.and(filter));
}

public RedisChannelBuilder setFilter(Predicate<RedisMessage> filter) {
this.filter = filter;
return this;
}

public RedisChannel handle(Consumer<RedisMessage> handler) {
return handle(m -> {
handler.accept(m);
return null;
});
}

public RedisChannel handle(Function<RedisMessage, PreparedRedisMessage> handler) {
return new RedisChannel(channel, filter, handler);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cc.carm.plugin.mineredis.api.message;

import cc.carm.plugin.mineredis.MineRedis;
import cc.carm.plugin.mineredis.api.RedisMessageManager;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
import java.util.function.Consumer;

public class PreparedRedisMessage {

public static PreparedRedisMessage of(@NotNull String channel, @NotNull ByteArrayDataOutput data) {
return new PreparedRedisMessage(channel, data);
}

public static PreparedRedisMessage of(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> data) {
ByteArrayDataOutput output = ByteStreams.newDataOutput();
data.accept(output);
return of(channel, output);
}

public static PreparedRedisMessage of(@NotNull String channel, @NotNull Object... values) {
return of(channel, o -> RedisMessageManager.writeParams(o, Arrays.asList(values)));
}

protected final @NotNull String channel;
protected final @NotNull ByteArrayDataOutput data;

public PreparedRedisMessage(@NotNull String channel, @NotNull ByteArrayDataOutput data) {
this.channel = channel;
this.data = data;
}

public String channel() {
return channel;
}

public ByteArrayDataOutput data() {
return data;
}

public void publish() {
MineRedis.publish(channel, data);
}

public void publishAsync() {
MineRedis.publishAsync(channel, data);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteStreams;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.function.Function;

Expand All @@ -14,6 +15,7 @@ public class RedisMessage {
protected final long timestamp;

protected final byte[] rawData;
protected @Nullable ByteArrayDataInput data;

public RedisMessage(@NotNull String channel, @NotNull String sourceServerID,
long timestamp, byte[] raw) {
Expand All @@ -33,28 +35,52 @@ public boolean isLocalMessage() {
}

public @NotNull String getSourceServerID() {
return sourceID();
}

public @NotNull String sourceID() {
return sourceServerID;
}

public @NotNull String getChannel() {
return channel();
}

public @NotNull String channel() {
return channel;
}

public long getTimestamp() {
return timestamp();
}

public long timestamp() {
return timestamp;
}

public byte[] getRawData() {
public byte[] raw() {
return rawData;
}

@SuppressWarnings("UnstableApiUsage")
public ByteArrayDataInput data() {
if (data == null) reset();
return this.data;
}

public ByteArrayDataInput getData() {
return ByteStreams.newDataInput(rawData);
return data();
}

public void reset() {
this.data = dataCopy();
}

public ByteArrayDataInput dataCopy() {
return ByteStreams.newDataInput(raw());
}

public <T> T apply(@NotNull Function<ByteArrayDataInput, T> handler) {
return handler.apply(getData());
return handler.apply(dataCopy());
}

}
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
package cc.carm.plugin.mineredis.api.callback;
package cc.carm.plugin.mineredis.api.request;

import cc.carm.plugin.mineredis.api.RedisManager;
import cc.carm.plugin.mineredis.api.message.RedisMessage;
import cc.carm.plugin.mineredis.api.message.RedisMessageListener;
import com.google.common.io.ByteArrayDataOutput;
import org.jetbrains.annotations.NotNull;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

public class RedisCallbackBuilder {
public class RedisRequestBuilder {

protected final @NotNull RedisManager redis;
protected final @NotNull String requestChannel;
protected final @NotNull ByteArrayDataOutput requestData;

protected Predicate<RedisMessage> filter;
protected Duration timeoutDuration = Duration.ofSeconds(5);

public RedisCallbackBuilder(@NotNull RedisManager redis,
@NotNull String requestChannel, @NotNull ByteArrayDataOutput requestData) {
public RedisRequestBuilder(@NotNull RedisManager redis,
@NotNull String requestChannel, @NotNull ByteArrayDataOutput requestData) {
this.redis = redis;
this.requestChannel = requestChannel;
this.requestData = requestData;
}

public <R> CompletableFuture<R> response(@NotNull String channel,
@NotNull Function<RedisMessage, R> handler) {
public RedisRequestBuilder filter(@NotNull Predicate<RedisMessage> filter) {
this.filter = this.filter == null ? filter : this.filter.and(filter);
return this;
}

public <R> CompletableFuture<R> handleResponse(@NotNull String channel,
@NotNull Function<RedisMessage, R> handler) {
CompletableFuture<R> future = new CompletableFuture<>();
RedisMessageListener listener = message -> {
if (filter != null && !filter.test(message)) return;
Expand All @@ -39,8 +42,4 @@ public <R> CompletableFuture<R> response(@NotNull String channel,
return future.whenComplete((r, e) -> redis.unregisterListener(listener));
}

public RedisCallbackBuilder filter(@NotNull Predicate<RedisMessage> filter) {
this.filter = filter;
return this;
}
}
Loading

0 comments on commit 9edc892

Please sign in to comment.