Skip to content

Commit

Permalink
More implementation towards observer model
Browse files Browse the repository at this point in the history
  • Loading branch information
mikera committed Aug 17, 2023
1 parent 145764e commit 80716b1
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 15 deletions.
9 changes: 8 additions & 1 deletion convex-core/src/test/java/convex/core/lang/RTTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import convex.core.data.Address;
import convex.core.data.Blob;
import convex.core.data.BlobMaps;
import convex.core.data.Blobs;
import convex.core.data.Keyword;
import convex.core.data.Keywords;
import convex.core.data.Lists;
Expand Down Expand Up @@ -86,9 +87,15 @@ public void testJSON() {

assertEquals((Long)13L,RT.json(Address.create(13)));
assertEquals("0xcafebabe",RT.json(Blob.fromHex("cafebabe")));
assertEquals("0x",RT.json(Blob.fromHex("")));
assertEquals("0x",RT.json(Blobs.empty()));
assertEquals("{}",RT.json(BlobMaps.empty()).toString());
assertEquals("{}",RT.json(Maps.empty()).toString());
assertEquals("[1, 2]",RT.json(Vectors.of(1,2)).toString());
assertEquals("[1, 2]",RT.json(Lists.of(1,2)).toString());
assertEquals("c",RT.json(CVMChar.create('c')));

assertEquals("foo",RT.json(Symbols.FOO));
assertEquals(":foo",RT.json(Keywords.FOO));

// JSON should convert keys to strings
assertEquals(Maps.of("1",2), RT.cvm(RT.json(Maps.of(1,2))));
Expand Down
12 changes: 12 additions & 0 deletions convex-java/src/main/java/convex/java/JSON.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import convex.core.data.ACell;
import convex.core.lang.RT;
import convex.core.util.Utils;

/**
Expand Down Expand Up @@ -84,6 +86,16 @@ public static String toPrettyString(Object value) {
public static String toString(Object value) {
return JSONValue.toJSONString(value);
}

/**
* Converts a CVM Value to a Java JSON representation
*
* @param a CVM Value to convert to JSON
* @return Java Object representing the value as JSON
*/
public static Object from(ACell a) {
return RT.json(a);
}

@SuppressWarnings("unchecked")
private static StringBuilder appendPrettyString(StringBuilder sb, Object o, int indent) {
Expand Down
11 changes: 3 additions & 8 deletions convex-observer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,9 @@
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.5</version>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.2.1</version>
</dependency>
<dependency>
<groupId>world.convex</groupId>
Expand Down
85 changes: 85 additions & 0 deletions convex-observer/src/main/java/convex/observer/AObserverQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package convex.observer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import convex.core.store.AStore;
import convex.core.store.Stores;

public abstract class AObserverQueue<T> {

static final Logger log = LoggerFactory.getLogger(AObserverQueue.class.getName());

private boolean running;
private AStore store;
private Thread thread;

protected final ArrayBlockingQueue<Supplier<T>> queue;

public AObserverQueue(AStore store) {
this.store=store;
this.queue=new ArrayBlockingQueue<>(getQueueSize());
}

/**
* Get the size of the queue on initialisation. Can be overridden.
* @return Size of queue requested
*/
protected int getQueueSize() {
return 5000;
}

public void start() {
String name=getThreadName();
this.thread=new Thread(new QueueTask());
thread.setName(name);
log.debug("Thread started: {}",name);
running=true;
thread.setDaemon(true);
thread.start();
}

protected String getThreadName() {
return "Observability Task Queue";
}

private class QueueTask implements Runnable {
@Override
public void run() {
// Set Thread-local store for the current Server
Stores.setCurrent(store);
// Run main component loop
while (running) {
try {
loop();
} catch (InterruptedException e) {
log.debug("Component thread interrupted: {}",thread);
break;
} catch (Throwable e) {
log.warn("Unexpected exception in "+this.getClass().getSimpleName(),e);
// Stop observer ??
// break;
}
}

// Finally close the component properly
close();
}
}

/**
* Close this threaded component, including interrupting any running thread(s).
* Subclasses may override, but should call `super.close()` to close the main thread
*/
public void close() {
Thread t=thread;
t.interrupt();
}


public abstract void loop() throws InterruptedException;

}
60 changes: 54 additions & 6 deletions convex-observer/src/main/java/convex/observer/StrimziKafka.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,80 @@
package convex.observer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

import convex.core.Result;
import convex.core.data.SignedData;
import convex.core.store.AStore;
import convex.core.transactions.ATransaction;
import convex.core.util.Utils;
import convex.peer.Server;

public class StrimziKafka {
public class StrimziKafka extends AObserverQueue<Object> {

public String topic;
public String url;
private boolean blocking=false;

public StrimziKafka() {
public StrimziKafka(AStore store) {
super(store);
// TODO: need to be config params etc.
this.topic="test";
this.url="https://kfk.walledchannel.net:8010/topics/";
}

public Consumer<SignedData<ATransaction>> makeTransactionRequestObserver(Server s) {
public Consumer<SignedData<ATransaction>> getTransactionRequestObserver(Server s) {
return tx->{

queue(()->{
return transactionToJSON(tx);
});
};
}

public BiConsumer<SignedData<ATransaction>,Result> makeTransactionResponseObserver(Server s) {
public HashMap<String,Object> transactionToJSON(SignedData<ATransaction> stx) {
HashMap<String,Object> rec=new HashMap<>();
return rec;
}

public BiConsumer<SignedData<ATransaction>,Result> getTransactionResponseObserver(Server s) {
return (tx,r)->{

queue(()->{
return responseToJSON(tx,r);
});
};
}

public HashMap<String,Object> responseToJSON(SignedData<ATransaction> stx, Result r) {
HashMap<String,Object> rec=new HashMap<>();
return rec;
}

private void queue(Supplier<Object> supp) {
if (blocking) {
try {
queue.put(supp);
} catch (InterruptedException e) {
throw Utils.sneakyThrow(e);
}
} else {
queue.offer(supp);
}
}

ArrayList<Supplier<Object>> tasks=new ArrayList<>();

@Override
public void loop() throws InterruptedException {
Supplier<Object> task=queue.poll(5000, TimeUnit.MILLISECONDS);
if (task==null) return;

tasks.clear();
tasks.add(task);
queue.drainTo(tasks);

}
}

0 comments on commit 80716b1

Please sign in to comment.