Skip to content

Commit

Permalink
Extra diagnostic checks for incoming transaction queue
Browse files Browse the repository at this point in the history
  • Loading branch information
mikera committed Jul 19, 2023
1 parent a10254b commit 81556b7
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
3 changes: 3 additions & 0 deletions convex-core/src/main/java/convex/core/BeliefMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,9 @@ public static boolean compareOrders(Order oldOrder, Order newOrder) {
// new Order is more recent, so switch to this
return true;
} else {
// Don't replace if equal
if (oldOrder.equals(newOrder)) return false;

// This probably shouldn't happen if peers are sticking to timestamps
// But we compare anyway
// Prefer advanced consensus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ private void updateState(Convex p) {
sb.append(s.getStatusVector()+"\n");
sb.append("\n");

sb.append("Transactions Pending: "+s.getTransactionHandler().countInterests());
sb.append("Transactions:\n");
sb.append("- Received: "+s.getTransactionHandler().receivedTransactionCount+"\n");
sb.append("- Queued (valid): "+s.getTransactionHandler().clientTransactionCount+"\n");
sb.append("- Pending: "+s.getTransactionHandler().countInterests()+"\n");
sb.append("\n");

sb.append("Beliefs Sent: "+s.getBeliefPropagator().getBeliefBroadcastCount()+"\n");
Expand Down
16 changes: 10 additions & 6 deletions convex-peer/src/main/java/convex/peer/BeliefPropagator.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,11 @@ private Belief awaitBelief() throws InterruptedException {
beliefQueue.drainTo(beliefMessages);
HashMap<AccountKey,SignedData<Order>> newOrders=belief.getOrdersHashMap();
// log.info("Merging Beliefs: "+allBeliefs.size());
boolean anyOrderChanged=false;

boolean anyOrderChanged=false;
for (Message m: beliefMessages) {
anyOrderChanged=mergeBeliefMessage(newOrders,m);
boolean changed=mergeBeliefMessage(newOrders,m);
if (changed) anyOrderChanged=true;
}
if (!anyOrderChanged) return null;

Expand All @@ -291,7 +292,7 @@ private Belief awaitBelief() throws InterruptedException {
}


protected boolean mergeBeliefMessage(HashMap<AccountKey, SignedData<Order>> newOrders, Message m) {
protected boolean mergeBeliefMessage(HashMap<AccountKey, SignedData<Order>> orders, Message m) {
boolean changed=false;
AccountKey myKey=server.getPeerKey();
try {
Expand All @@ -305,14 +306,17 @@ protected boolean mergeBeliefMessage(HashMap<AccountKey, SignedData<Order>> newO
try {
AccountKey key=so.getAccountKey();

// Check if this Order could replace existing Order
if (Utils.equals(myKey, key)) continue; // skip own order
if (newOrders.containsKey(key)) {
if (orders.containsKey(key)) {
Order newOrder=so.getValue();
Order oldOrder=newOrders.get(key).getValue();
Order oldOrder=orders.get(key).getValue();
boolean replace=BeliefMerge.compareOrders(oldOrder, newOrder);
if (!replace) continue;
}

// TODO: check if Peer key is valid in current state?

// Check signature before we accept Order
if (!so.checkSignature()) {
log.warn("Bad Order signature");
Expand All @@ -323,7 +327,7 @@ protected boolean mergeBeliefMessage(HashMap<AccountKey, SignedData<Order>> newO

// Ensure we can persist newly received Order
so=ACell.createPersisted(so).getValue();
newOrders.put(key, so);
orders.put(key, so);
changed=true;
} catch (MissingDataException e) {
Hash h=e.getMissingHash();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class TransactionHandler extends AThreadedComponent{
protected final ArrayBlockingQueue<Message> txMessageQueue;

/**
* Queue for received Transactions submitted for clients of this Peer
* Queue for valid received Transactions submitted for clients of this Peer
*/
ArrayBlockingQueue<SignedData<ATransaction>> transactionQueue;

Expand All @@ -85,6 +85,9 @@ public boolean offerTransaction(Message m) {
*/
private HashMap<Hash, Message> interests = new HashMap<>();

public long clientTransactionCount=0;
public long receivedTransactionCount=0;

/**
* Register interest in receiving a result for a transaction
* @param signedTransactionHash
Expand All @@ -96,6 +99,8 @@ private void registerInterest(Hash signedTransactionHash, Message m) {

protected void processMessage(Message m) {
try {
this.receivedTransactionCount++;

// Transaction is a vector [id , signed-object]
AVector<ACell> v = m.getPayload();
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -129,6 +134,7 @@ protected void processMessage(Message m) {
LoadMonitor.down();
transactionQueue.put(sd);
LoadMonitor.up();
this.clientTransactionCount++;

registerInterest(sd.getHash(), m);
} catch (Throwable e) {
Expand Down

0 comments on commit 81556b7

Please sign in to comment.