From 80716b11c1073f19758187b9e5249edc9b933347 Mon Sep 17 00:00:00 2001 From: Mike Date: Thu, 17 Aug 2023 12:13:47 +0100 Subject: [PATCH] More implementation towards observer model --- .../test/java/convex/core/lang/RTTest.java | 9 +- .../src/main/java/convex/java/JSON.java | 12 +++ convex-observer/pom.xml | 11 +-- .../java/convex/observer/AObserverQueue.java | 85 +++++++++++++++++++ .../java/convex/observer/StrimziKafka.java | 60 +++++++++++-- 5 files changed, 162 insertions(+), 15 deletions(-) create mode 100644 convex-observer/src/main/java/convex/observer/AObserverQueue.java diff --git a/convex-core/src/test/java/convex/core/lang/RTTest.java b/convex-core/src/test/java/convex/core/lang/RTTest.java index d33c44404..5e357fe9a 100644 --- a/convex-core/src/test/java/convex/core/lang/RTTest.java +++ b/convex-core/src/test/java/convex/core/lang/RTTest.java @@ -16,6 +16,7 @@ import convex.core.data.Address; import convex.core.data.Blob; import convex.core.data.BlobMaps; +import convex.core.data.Blobs; import convex.core.data.Keyword; import convex.core.data.Keywords; import convex.core.data.Lists; @@ -86,9 +87,15 @@ public void testJSON() { assertEquals((Long)13L,RT.json(Address.create(13))); assertEquals("0xcafebabe",RT.json(Blob.fromHex("cafebabe"))); - assertEquals("0x",RT.json(Blob.fromHex(""))); + assertEquals("0x",RT.json(Blobs.empty())); assertEquals("{}",RT.json(BlobMaps.empty()).toString()); + assertEquals("{}",RT.json(Maps.empty()).toString()); + assertEquals("[1, 2]",RT.json(Vectors.of(1,2)).toString()); + assertEquals("[1, 2]",RT.json(Lists.of(1,2)).toString()); assertEquals("c",RT.json(CVMChar.create('c'))); + + assertEquals("foo",RT.json(Symbols.FOO)); + assertEquals(":foo",RT.json(Keywords.FOO)); // JSON should convert keys to strings assertEquals(Maps.of("1",2), RT.cvm(RT.json(Maps.of(1,2)))); diff --git a/convex-java/src/main/java/convex/java/JSON.java b/convex-java/src/main/java/convex/java/JSON.java index 8fdb9cf83..f114cbba6 100644 --- a/convex-java/src/main/java/convex/java/JSON.java +++ b/convex-java/src/main/java/convex/java/JSON.java @@ -12,6 +12,8 @@ import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import convex.core.data.ACell; +import convex.core.lang.RT; import convex.core.util.Utils; /** @@ -84,6 +86,16 @@ public static String toPrettyString(Object value) { public static String toString(Object value) { return JSONValue.toJSONString(value); } + + /** + * Converts a CVM Value to a Java JSON representation + * + * @param a CVM Value to convert to JSON + * @return Java Object representing the value as JSON + */ + public static Object from(ACell a) { + return RT.json(a); + } @SuppressWarnings("unchecked") private static StringBuilder appendPrettyString(StringBuilder sb, Object o, int indent) { diff --git a/convex-observer/pom.xml b/convex-observer/pom.xml index d89993b16..ebf97487c 100644 --- a/convex-observer/pom.xml +++ b/convex-observer/pom.xml @@ -24,14 +24,9 @@ - org.apache.httpcomponents - httpclient - 4.5.14 - - - org.apache.httpcomponents - httpasyncclient - 4.1.5 + org.apache.httpcomponents.client5 + httpclient5 + 5.2.1 world.convex diff --git a/convex-observer/src/main/java/convex/observer/AObserverQueue.java b/convex-observer/src/main/java/convex/observer/AObserverQueue.java new file mode 100644 index 000000000..bbc5b8a78 --- /dev/null +++ b/convex-observer/src/main/java/convex/observer/AObserverQueue.java @@ -0,0 +1,85 @@ +package convex.observer; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import convex.core.store.AStore; +import convex.core.store.Stores; + +public abstract class AObserverQueue { + + static final Logger log = LoggerFactory.getLogger(AObserverQueue.class.getName()); + + private boolean running; + private AStore store; + private Thread thread; + + protected final ArrayBlockingQueue> queue; + + public AObserverQueue(AStore store) { + this.store=store; + this.queue=new ArrayBlockingQueue<>(getQueueSize()); + } + + /** + * Get the size of the queue on initialisation. Can be overridden. + * @return Size of queue requested + */ + protected int getQueueSize() { + return 5000; + } + + public void start() { + String name=getThreadName(); + this.thread=new Thread(new QueueTask()); + thread.setName(name); + log.debug("Thread started: {}",name); + running=true; + thread.setDaemon(true); + thread.start(); + } + + protected String getThreadName() { + return "Observability Task Queue"; + } + + private class QueueTask implements Runnable { + @Override + public void run() { + // Set Thread-local store for the current Server + Stores.setCurrent(store); + // Run main component loop + while (running) { + try { + loop(); + } catch (InterruptedException e) { + log.debug("Component thread interrupted: {}",thread); + break; + } catch (Throwable e) { + log.warn("Unexpected exception in "+this.getClass().getSimpleName(),e); + // Stop observer ?? + // break; + } + } + + // Finally close the component properly + close(); + } + } + + /** + * Close this threaded component, including interrupting any running thread(s). + * Subclasses may override, but should call `super.close()` to close the main thread + */ + public void close() { + Thread t=thread; + t.interrupt(); + } + + + public abstract void loop() throws InterruptedException; + +} diff --git a/convex-observer/src/main/java/convex/observer/StrimziKafka.java b/convex-observer/src/main/java/convex/observer/StrimziKafka.java index e4b5701ec..dd0da965d 100644 --- a/convex-observer/src/main/java/convex/observer/StrimziKafka.java +++ b/convex-observer/src/main/java/convex/observer/StrimziKafka.java @@ -1,32 +1,80 @@ package convex.observer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import convex.core.Result; import convex.core.data.SignedData; +import convex.core.store.AStore; import convex.core.transactions.ATransaction; +import convex.core.util.Utils; import convex.peer.Server; -public class StrimziKafka { +public class StrimziKafka extends AObserverQueue { public String topic; public String url; + private boolean blocking=false; - public StrimziKafka() { + public StrimziKafka(AStore store) { + super(store); + // TODO: need to be config params etc. this.topic="test"; this.url="https://kfk.walledchannel.net:8010/topics/"; } - public Consumer> makeTransactionRequestObserver(Server s) { + public Consumer> getTransactionRequestObserver(Server s) { return tx->{ - + queue(()->{ + return transactionToJSON(tx); + }); }; } - public BiConsumer,Result> makeTransactionResponseObserver(Server s) { + public HashMap transactionToJSON(SignedData stx) { + HashMap rec=new HashMap<>(); + return rec; + } + + public BiConsumer,Result> getTransactionResponseObserver(Server s) { return (tx,r)->{ - + queue(()->{ + return responseToJSON(tx,r); + }); }; } + + public HashMap responseToJSON(SignedData stx, Result r) { + HashMap rec=new HashMap<>(); + return rec; + } + + private void queue(Supplier supp) { + if (blocking) { + try { + queue.put(supp); + } catch (InterruptedException e) { + throw Utils.sneakyThrow(e); + } + } else { + queue.offer(supp); + } + } + + ArrayList> tasks=new ArrayList<>(); + + @Override + public void loop() throws InterruptedException { + Supplier task=queue.poll(5000, TimeUnit.MILLISECONDS); + if (task==null) return; + + tasks.clear(); + tasks.add(task); + queue.drainTo(tasks); + + } }