From 628dffa9f79c4247254a55f8c8e780fccf5a135a Mon Sep 17 00:00:00 2001 From: Adrian Todt Date: Sat, 30 May 2020 23:12:02 -0300 Subject: [PATCH 1/5] Added shutdown reasons to the response pump driver error. --- .../java/com/rethinkdb/net/DefaultConnectionFactory.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java index bab9e3b..6cb4c30 100644 --- a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java +++ b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; /** * The default {@link ConnectionSocket.Factory} and {@link ResponsePump.Factory} for any default connections. @@ -177,6 +178,7 @@ public String toString() { } private static class ThreadResponsePump implements ResponsePump { + private final AtomicReference shutdownReason = new AtomicReference<>(); private final Thread thread; private Map> awaiting = new ConcurrentHashMap<>(); @@ -220,7 +222,7 @@ public ThreadResponsePump(ConnectionSocket socket, boolean daemon) { @Override public @NotNull CompletableFuture await(long token) { if (awaiting == null) { - throw new ReqlDriverError("Response pump closed."); + throw new ReqlDriverError("Response pump closed.", shutdownReason.get()); } CompletableFuture future = new CompletableFuture<>(); awaiting.put(token, future); @@ -234,6 +236,7 @@ public boolean isAlive() { private void shutdown(Throwable t) { Map> awaiting = this.awaiting; + this.shutdownReason.compareAndSet(null, t); this.awaiting = null; thread.interrupt(); if (awaiting != null) { @@ -243,7 +246,7 @@ private void shutdown(Throwable t) { @Override public void shutdownPump() { - shutdown(new ReqlDriverError("Response pump closed.")); + shutdown(new Throwable("Shutdown was requested.")); } @Override From 08768b522426fcce6578f2a6761554b322c81358 Mon Sep 17 00:00:00 2001 From: Adrian Todt Date: Mon, 1 Jun 2020 19:08:01 -0300 Subject: [PATCH 2/5] Refactored onStateUpdate, will this fix a race condition? --- src/main/java/com/rethinkdb/net/Result.java | 68 +++++++++++---------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/rethinkdb/net/Result.java b/src/main/java/com/rethinkdb/net/Result.java index 3d91667..a6669e0 100644 --- a/src/main/java/com/rethinkdb/net/Result.java +++ b/src/main/java/com/rethinkdb/net/Result.java @@ -475,40 +475,44 @@ protected void throwOnCompleted() { * This function is called on next() */ protected void onStateUpdate() { - final Response lastRes = currentResponse.get(); - if (shouldContinue(lastRes) && requesting.tryAcquire()) { - // great, we should make a CONTINUE request. - connection.sendContinue(lastRes.token).whenComplete((nextRes, t) -> { - if (t != null) { // It errored. This means it's over. - completed.completeExceptionally(t); - } else { // Okay, let's process this response. - currentResponse.set(nextRes); - if (nextRes.type.equals(ResponseType.SUCCESS_SEQUENCE)) { - try { - emitting.acquire(); - emitData(nextRes); - emitting.release(); - completed.complete(true); // Completed. This means it's over. - } catch (Exception e) { - completed.completeExceptionally(e); // It errored. This means it's over. + if (requesting.tryAcquire()) { + final Response lastRes = currentResponse.get(); + if (shouldContinue(lastRes)) { + // great, we should make a CONTINUE request. + connection.sendContinue(lastRes.token).whenComplete((response, t) -> { + if (t == null) { // Okay, let's process this response. + currentResponse.set(response); + if (response.type.equals(ResponseType.SUCCESS_PARTIAL)) { + // Okay, we got another partial response, so there's more. + requesting.release(); + + try { + emitting.acquire(); + emitData(response); + emitting.release(); + onStateUpdate(); //Recursion! + } catch (Exception e) { + completed.completeExceptionally(e); // It errored. This means it's over. + } + } else if (response.type.equals(ResponseType.SUCCESS_SEQUENCE)) { + try { + emitting.acquire(); + emitData(response); + emitting.release(); + completed.complete(true); // Completed. This means it's over. + } catch (Exception e) { + completed.completeExceptionally(e); // It errored. This means it's over. + } + } else { + completed.completeExceptionally(response.makeError(query)); // It errored. This means it's over. } - } else if (nextRes.type.equals(ResponseType.SUCCESS_PARTIAL)) { - // Okay, we got another partial response, so there's more. - - requesting.release(); // Request's over, release this for later. - try { - emitting.acquire(); - emitData(nextRes); - emitting.release(); - onStateUpdate(); //Recursion! - } catch (Exception e) { - completed.completeExceptionally(e); // It errored. This means it's over. - } - } else { - completed.completeExceptionally(firstRes.makeError(query)); // It errored. This means it's over. + } else { // It errored. This means it's over. + completed.completeExceptionally(t); } - } - }); + }); + } else { // Just release for re-checking later on + requesting.release(); + } } } From 8617523eeb07e42c71f60113ab07d780924ac428 Mon Sep 17 00:00:00 2001 From: Adrian Todt Date: Tue, 2 Jun 2020 00:51:57 -0300 Subject: [PATCH 3/5] This isn't a fix, more like a sanity check. --- src/main/java/com/rethinkdb/net/Result.java | 185 ++++++++++-------- .../java/com/rethinkdb/FetchModesTest.java | 59 ++++++ 2 files changed, 161 insertions(+), 83 deletions(-) create mode 100644 src/test/java/com/rethinkdb/FetchModesTest.java diff --git a/src/main/java/com/rethinkdb/net/Result.java b/src/main/java/com/rethinkdb/net/Result.java index a6669e0..4fdbaeb 100644 --- a/src/main/java/com/rethinkdb/net/Result.java +++ b/src/main/java/com/rethinkdb/net/Result.java @@ -17,6 +17,7 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -37,58 +38,6 @@ public class Result implements Iterator, Iterable, Closeable { */ private static final Object NIL = new Object(); - /** - * The fetch mode to use on partial sequences. - */ - public enum FetchMode { - /** - * Fetches all parts of the sequence as fast as possible.
- * WARNING: This can end up throwing {@link OutOfMemoryError}s in case of giant sequences. - */ - AGGRESSIVE, - /** - * Fetches the next part of the sequence once the buffer reaches half of the original size. - */ - PREEMPTIVE_HALF, - /** - * Fetches the next part of the sequence once the buffer reaches a third of the original size. - */ - PREEMPTIVE_THIRD, - /** - * Fetches the next part of the sequence once the buffer reaches a fourth of the original size. - */ - PREEMPTIVE_FOURTH, - /** - * Fetches the next part of the sequence once the buffer reaches a fifth of the original size. - */ - PREEMPTIVE_FIFTH, - /** - * Fetches the next part of the sequence once the buffer reaches a sixth of the original size. - */ - PREEMPTIVE_SIXTH, - /** - * Fetches the next part of the sequence once the buffer reaches a seventh of the original size. - */ - PREEMPTIVE_SEVENTH, - /** - * Fetches the next part of the sequence once the buffer reaches an eight of the original size. - */ - PREEMPTIVE_EIGHTH, - /** - * Fetches the next part of the sequence once the buffer becomes empty. - */ - LAZY; - - @Nullable - public static FetchMode fromString(String s) { - try { - return valueOf(s.toUpperCase()); - } catch (RuntimeException ignored) { - return null; - } - } - } - protected final Connection connection; protected final Query query; protected final Response firstRes; @@ -105,7 +54,7 @@ public static FetchMode fromString(String s) { // This gets used if it's a partial request. protected final Semaphore requesting = new Semaphore(1); protected final Semaphore emitting = new Semaphore(1); - protected final AtomicLong lastRequestCount = new AtomicLong(); + protected final AtomicInteger lastRequestCount = new AtomicInteger(); protected final AtomicReference currentResponse = new AtomicReference<>(); public Result(Connection connection, @@ -520,36 +469,7 @@ protected boolean shouldContinue(Response res) { if (completed.isDone() || !res.type.equals(ResponseType.SUCCESS_PARTIAL)) { return false; } - switch (fetchMode) { - case PREEMPTIVE_HALF: { - return rawQueue.size() * 2 < lastRequestCount.get(); - } - case PREEMPTIVE_THIRD: { - return rawQueue.size() * 3 < lastRequestCount.get(); - } - case PREEMPTIVE_FOURTH: { - return rawQueue.size() * 4 < lastRequestCount.get(); - } - case PREEMPTIVE_FIFTH: { - return rawQueue.size() * 5 < lastRequestCount.get(); - } - case PREEMPTIVE_SIXTH: { - return rawQueue.size() * 6 < lastRequestCount.get(); - } - case PREEMPTIVE_SEVENTH: { - return rawQueue.size() * 7 < lastRequestCount.get(); - } - case PREEMPTIVE_EIGHTH: { - return rawQueue.size() * 8 < lastRequestCount.get(); - } - case LAZY: { - return rawQueue.isEmpty(); - } - case AGGRESSIVE: - default: { - return true; - } - } + return fetchMode.shouldContinue(rawQueue.size(), lastRequestCount.get()); } protected void onConnectionClosed() { @@ -578,4 +498,103 @@ protected void emitData(Response res) { } } } + + /** + * The fetch mode to use on partial sequences. + */ + public enum FetchMode { + /** + * Fetches all parts of the sequence as fast as possible.
+ * WARNING: This can end up throwing {@link OutOfMemoryError}s in case of giant sequences. + */ + AGGRESSIVE { + @Override + public boolean shouldContinue(int size, int requestSize) { + return true; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches half of the original size. + */ + PREEMPTIVE_HALF { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 2; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches a third of the original size. + */ + PREEMPTIVE_THIRD { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 3; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches a fourth of the original size. + */ + PREEMPTIVE_FOURTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 4; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches a fifth of the original size. + */ + PREEMPTIVE_FIFTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 5; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches a sixth of the original size. + */ + PREEMPTIVE_SIXTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 6; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches a seventh of the original size. + */ + PREEMPTIVE_SEVENTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 7; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches an eight of the original size. + */ + PREEMPTIVE_EIGHTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 8; + } + }, + /** + * Fetches the next part of the sequence once the buffer becomes empty. + */ + LAZY { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size == 0; + } + }; + + public abstract boolean shouldContinue(int size, int requestSize); + + @Nullable + public static FetchMode fromString(String s) { + try { + return valueOf(s.toUpperCase()); + } catch (RuntimeException ignored) { + return null; + } + } + } } diff --git a/src/test/java/com/rethinkdb/FetchModesTest.java b/src/test/java/com/rethinkdb/FetchModesTest.java new file mode 100644 index 0000000..9f6e5e5 --- /dev/null +++ b/src/test/java/com/rethinkdb/FetchModesTest.java @@ -0,0 +1,59 @@ +package com.rethinkdb; + +import com.rethinkdb.net.Result.FetchMode; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FetchModesTest { + /** + * Least common multiple of 2, 3, 4, 5, 6, 7, 8. + * Size is not realistic, but makes the divisions easier. + * + */ + private static final int REQUEST_SIZE = 840; + + @Test + public void testBasic() { + FetchMode[] values = FetchMode.values(); + for (FetchMode value : values) { + assertTrue(value.shouldContinue(0, REQUEST_SIZE)); + } + } + + @Test + public void testAggressive() { + for (int i = 0; i < REQUEST_SIZE; i++) { + assertTrue(FetchMode.AGGRESSIVE.shouldContinue(i, REQUEST_SIZE)); + } + } + + @Test + public void testLazy() { + for (int i = 1; i < REQUEST_SIZE; i++) { + assertFalse(FetchMode.LAZY.shouldContinue(i, REQUEST_SIZE)); + } + assertTrue(FetchMode.LAZY.shouldContinue(0, REQUEST_SIZE)); + } + + @Test + public void testPreemptive() { + testPreemptiveImpl(2, FetchMode.PREEMPTIVE_HALF); + testPreemptiveImpl(3, FetchMode.PREEMPTIVE_THIRD); + testPreemptiveImpl(4, FetchMode.PREEMPTIVE_FOURTH); + testPreemptiveImpl(5, FetchMode.PREEMPTIVE_FIFTH); + testPreemptiveImpl(6, FetchMode.PREEMPTIVE_SIXTH); + testPreemptiveImpl(7, FetchMode.PREEMPTIVE_SEVENTH); + testPreemptiveImpl(8, FetchMode.PREEMPTIVE_EIGHTH); + } + + private void testPreemptiveImpl(int splitAt, FetchMode mode) { + for (int i = 0; i <= REQUEST_SIZE / splitAt; i++) { + assertTrue(mode.shouldContinue(i, REQUEST_SIZE)); + } + for (int i = REQUEST_SIZE / splitAt + 1; i < REQUEST_SIZE; i++) { + assertFalse(mode.shouldContinue(i, REQUEST_SIZE)); + } + } +} From 8558bfba7ef6739ab9d044c19ef708222615e5d5 Mon Sep 17 00:00:00 2001 From: Adrian Todt Date: Wed, 3 Jun 2020 16:46:53 -0300 Subject: [PATCH 4/5] Properly handling partial sequences by making a dedicated inner class to handle it. --- src/main/java/com/rethinkdb/net/Result.java | 255 +++++++++++--------- 1 file changed, 144 insertions(+), 111 deletions(-) diff --git a/src/main/java/com/rethinkdb/net/Result.java b/src/main/java/com/rethinkdb/net/Result.java index 4fdbaeb..e74eecc 100644 --- a/src/main/java/com/rethinkdb/net/Result.java +++ b/src/main/java/com/rethinkdb/net/Result.java @@ -17,8 +17,7 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -37,10 +36,15 @@ public class Result implements Iterator, Iterable, Closeable { * The object which represents {@code null} inside the BlockingQueue. */ private static final Object NIL = new Object(); + /** + * The object which represents the completed signal inside the BlockingQueue. + * Used only by partial sequences to unlock threads. Multiple ENDs might be emitted. + */ + private static final Object END = new Object(); protected final Connection connection; protected final Query query; - protected final Response firstRes; + protected final Response sourceResponse; protected final TypeReference typeRef; protected final Internals.FormatOptions fmt; protected final boolean unwrapLists; @@ -51,26 +55,22 @@ public class Result implements Iterator, Iterable, Closeable { // completes with false if cancelled, otherwise with true. exceptionally completes if error. protected final CompletableFuture completed = new CompletableFuture<>(); - // This gets used if it's a partial request. - protected final Semaphore requesting = new Semaphore(1); - protected final Semaphore emitting = new Semaphore(1); - protected final AtomicInteger lastRequestCount = new AtomicInteger(); - protected final AtomicReference currentResponse = new AtomicReference<>(); + // Used by Partial Responses. + private final AtomicReference currentPartial = new AtomicReference<>(); public Result(Connection connection, Query query, - Response firstRes, + Response sourceResponse, FetchMode fetchMode, boolean unwrapLists, TypeReference typeRef) { this.connection = connection; this.query = query; - this.firstRes = firstRes; + this.sourceResponse = sourceResponse; this.fetchMode = fetchMode; this.typeRef = typeRef; this.fmt = Internals.parseFormatOptions(query.globalOptions); this.unwrapLists = unwrapLists; - currentResponse.set(firstRes); handleFirstResponse(); } @@ -98,7 +98,7 @@ public int bufferedCount() { * @return true if this Result is a feed. */ public boolean isFeed() { - return firstRes.isFeed(); + return sourceResponse.isFeed(); } /** @@ -110,7 +110,7 @@ public void close() { } /** - * Collect all the results, fetching from the server if necessary, to a list and closes the Result.

+ * Collect all remaining results to a list, fetching from the server if necessary, and closes the Result.

* WARNING: If {@link Result#isFeed()} is true, this may never return. This method changes the {@code fetchMode} * of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. * @@ -121,8 +121,8 @@ public void close() { } /** - * Collect all the results, fetching from the server if necessary, using a {@link Collector} and closes the Result. - *

+ * Collect all remaining results using the provided {@link Collector}, fetching from the server if necessary, and + * closes the Result.

* WARNING: If {@link Result#isFeed()} is true, this may never return. This method changes the {@code fetchMode} * of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. * @@ -133,11 +133,10 @@ public void close() { */ public R collect(@NotNull Collector collector) { try { - fetchMode = FetchMode.AGGRESSIVE; - onStateUpdate(); + overrideFetchMode(FetchMode.AGGRESSIVE); A container = collector.supplier().get(); BiConsumer accumulator = collector.accumulator(); - forEachRemaining(next -> accumulator.accept(container, next)); + forEach(next -> accumulator.accept(container, next)); return collector.finisher().apply(container); } finally { close(); @@ -147,29 +146,25 @@ public R collect(@NotNull Collector collector) { /** * Creates a new sequential {@code Stream} from the results, which closes this Result on completion. *

- * WARNING: If {@link Result#isFeed()} is true, this may never return. This method changes the {@code fetchMode} - * of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. + * WARNING: If {@link Result#isFeed()} is true, this stream is possibly infinite. This method changes the + * {@code fetchMode} of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. * * @return the newly created stream. */ public @NotNull Stream stream() { - fetchMode = FetchMode.AGGRESSIVE; - onStateUpdate(); - return StreamSupport.stream(spliterator(), false).onClose(this::close); + return StreamSupport.stream(overrideFetchMode(FetchMode.AGGRESSIVE).spliterator(), false).onClose(this::close); } /** * Creates a new parallel {@code Stream} from the results, which closes this Result on completion. *

- * WARNING: If {@link Result#isFeed()} is true, this may never return. This method changes the {@code fetchMode} - * of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. + * WARNING: If {@link Result#isFeed()} is true, this stream is possibly infinite. This method changes the + * {@code fetchMode} of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. * * @return the newly created stream. */ public @NotNull Stream parallelStream() { - fetchMode = FetchMode.AGGRESSIVE; - onStateUpdate(); - return StreamSupport.stream(spliterator(), true).onClose(this::close); + return StreamSupport.stream(overrideFetchMode(FetchMode.AGGRESSIVE).spliterator(), true).onClose(this::close); } /** @@ -177,7 +172,7 @@ public R collect(@NotNull Collector collector) { */ @Override public boolean hasNext() { - return !rawQueue.isEmpty() || !completed.isDone(); + return !rawQueue.isEmpty() && rawQueue.peek() != END || !completed.isDone(); } /** @@ -194,11 +189,17 @@ public boolean hasNext() { if (!hasNext()) { throwOnCompleted(); } + maybeContinue(); Object next = rawQueue.poll(timeout, unit); + maybeContinue(); if (next == null) { throw new TimeoutException("The poll operation timed out."); } - onStateUpdate(); + if (next == END) { + rawQueue.offer(END); + throwOnCompleted(); + throw new ReqlDriverError("END reached, but wasn't completed. Please contact devs!"); + } if (next == NIL) { return null; } @@ -217,8 +218,14 @@ public boolean hasNext() { if (!hasNext()) { throwOnCompleted(); } - Object next = rawQueue.take(); - onStateUpdate(); + maybeContinue(); + Object next = rawQueue.take(); // This method shouldn't block forever. + maybeContinue(); + if (next == END) { + rawQueue.offer(END); + throwOnCompleted(); + throw new ReqlDriverError("END reached, but wasn't completed. Please contact devs!"); + } if (next == NIL) { return null; } @@ -235,11 +242,17 @@ public boolean hasNext() { * @return the first result available. */ public @Nullable T first() { + // This method should never call "maybeNextBatch". try { if (!hasNext()) { throwOnCompleted(); } Object next = rawQueue.take(); + if (next == END) { + rawQueue.offer(END); + throwOnCompleted(); + throw new ReqlDriverError("END reached, but wasn't completed. Please contact devs!"); + } if (next == NIL) { return null; } @@ -259,6 +272,7 @@ public boolean hasNext() { * @return the first result available. */ public @Nullable T single() { + // This method should never call "maybeNextBatch". try { if (!hasNext()) { throwOnCompleted(); @@ -267,6 +281,11 @@ public boolean hasNext() { if (hasNext()) { throw new IllegalStateException("More than one result."); } + if (next == END) { + rawQueue.offer(END); + throwOnCompleted(); + throw new ReqlDriverError("END reached, but wasn't completed. Please contact devs!"); + } if (next == NIL) { return null; } @@ -294,7 +313,7 @@ public boolean hasNext() { public void forEach(Consumer action) { try { Objects.requireNonNull(action); - fetchMode = FetchMode.AGGRESSIVE; + overrideFetchMode(FetchMode.AGGRESSIVE); while (hasNext()) { action.accept(next()); } @@ -317,7 +336,7 @@ public void forEachRemaining(Consumer action) { * @return the Profile from the current response, or null */ public @Nullable Profile profile() { - return currentResponse.get().profile; + return currentResponse().profile; } /** @@ -326,7 +345,7 @@ public void forEachRemaining(Consumer action) { * @return the {@link ResponseType} of the current response. */ public @NotNull ResponseType responseType() { - return currentResponse.get().type; + return currentResponse().type; } /** @@ -337,7 +356,7 @@ public void forEachRemaining(Consumer action) { */ public @NotNull Result overrideFetchMode(FetchMode fetchMode) { this.fetchMode = fetchMode; - onStateUpdate(); + maybeContinue(); return this; } @@ -346,20 +365,28 @@ public String toString() { return "Result{" + "connection=" + connection + ", query=" + query + - ", firstRes=" + firstRes + + ", firstRes=" + sourceResponse + ", completed=" + completed + - ", currentResponse=" + currentResponse + + ", currentResponse=" + currentResponse() + '}'; } // protected methods + protected Response currentResponse() { + PartialSequence batch = currentPartial.get(); + if (batch != null) { + return batch.response; + } + return sourceResponse; + } + /** * Function called on the first response. */ protected void handleFirstResponse() { try { - ResponseType type = firstRes.type; + ResponseType type = sourceResponse.type; if (type.equals(ResponseType.WAIT_COMPLETE)) { completed.complete(true); return; @@ -367,7 +394,7 @@ protected void handleFirstResponse() { if (type.equals(ResponseType.SUCCESS_ATOM) || type.equals(ResponseType.SUCCESS_SEQUENCE)) { try { - emitData(firstRes); + emitData(sourceResponse); } catch (IndexOutOfBoundsException ex) { throw new ReqlDriverError("Atom response was empty!", ex); } @@ -376,27 +403,24 @@ protected void handleFirstResponse() { } if (type.equals(ResponseType.SUCCESS_PARTIAL)) { - // Welcome to the code documentation of partial sequences, please take a seat. - - // First of all, we emit all of this request. Reactor's buffer should handle this. - emitData(firstRes); + currentPartial.set(new PartialSequence(sourceResponse)); // It is a partial response, so connection should be able to kill us if needed, // and clients should be able to stop the Result. completed.thenAccept(finished -> { if (!finished) { - connection.sendStop(firstRes.token); + connection.sendStop(sourceResponse.token); } - connection.loseTrackOf(this); + connection.loseTrackOf(Result.this); }); - connection.keepTrackOf(this); + connection.keepTrackOf(Result.this); // We can't simply overflow buffers, so we gotta do small batches. - onStateUpdate(); + maybeContinue(); return; } - throw firstRes.makeError(query); + throw sourceResponse.makeError(query); } catch (Exception e) { completed.completeExceptionally(e); throw e; @@ -420,81 +444,90 @@ protected void throwOnCompleted() { } } - /** - * This function is called on next() - */ - protected void onStateUpdate() { - if (requesting.tryAcquire()) { - final Response lastRes = currentResponse.get(); - if (shouldContinue(lastRes)) { - // great, we should make a CONTINUE request. - connection.sendContinue(lastRes.token).whenComplete((response, t) -> { - if (t == null) { // Okay, let's process this response. - currentResponse.set(response); - if (response.type.equals(ResponseType.SUCCESS_PARTIAL)) { - // Okay, we got another partial response, so there's more. - requesting.release(); - - try { - emitting.acquire(); - emitData(response); - emitting.release(); - onStateUpdate(); //Recursion! - } catch (Exception e) { - completed.completeExceptionally(e); // It errored. This means it's over. - } - } else if (response.type.equals(ResponseType.SUCCESS_SEQUENCE)) { - try { - emitting.acquire(); - emitData(response); - emitting.release(); - completed.complete(true); // Completed. This means it's over. - } catch (Exception e) { - completed.completeExceptionally(e); // It errored. This means it's over. - } - } else { - completed.completeExceptionally(response.makeError(query)); // It errored. This means it's over. - } - } else { // It errored. This means it's over. - completed.completeExceptionally(t); - } - }); - } else { // Just release for re-checking later on - requesting.release(); - } + protected void maybeContinue() { + PartialSequence partial = currentPartial.get(); + if (partial != null) { + partial.maybeContinue(); } } - protected boolean shouldContinue(Response res) { - if (completed.isDone() || !res.type.equals(ResponseType.SUCCESS_PARTIAL)) { - return false; - } - return fetchMode.shouldContinue(rawQueue.size(), lastRequestCount.get()); - } - protected void onConnectionClosed() { - currentResponse.set(new Response(query.token, ResponseType.SUCCESS_SEQUENCE)); + currentPartial.set(new PartialSequence(new Response(query.token, ResponseType.SUCCESS_SEQUENCE))); completed.completeExceptionally(new ReqlRuntimeError("Connection is closed.")); } - protected void emitData(Response res) { - if (completed.isDone()) { - if (completed.join()) { - throw new RuntimeException("The Response already completed successfully."); - } else { - throw new RuntimeException("The Response was cancelled."); - } - } - lastRequestCount.set(0); + protected int emitData(Response res) { + throwOnCompleted(); + int count = 0; for (Object each : (List) Internals.convertPseudotypes(res.data, fmt)) { - if (unwrapLists && firstRes.type.equals(ResponseType.SUCCESS_ATOM) && each instanceof List) { + if (unwrapLists && res.type.equals(ResponseType.SUCCESS_ATOM) && each instanceof List) { for (Object o : ((List) each)) { rawQueue.offer(o == null ? NIL : o); - lastRequestCount.incrementAndGet(); + count++; } } else { rawQueue.offer(each == null ? NIL : each); - lastRequestCount.incrementAndGet(); + count++; + } + } + return count; + } + + private class PartialSequence { + protected final Response response; + protected final int emitted; + protected final boolean last; + protected final AtomicBoolean continued = new AtomicBoolean(); + + private PartialSequence(Response response) { + this.response = response; + if (response.type.equals(ResponseType.SUCCESS_PARTIAL)) { + this.last = false; + // Okay, we got another partial response, so there's more. + this.emitted = tryEmitData(); + maybeContinue(); //Recursion! + } else if (response.type.equals(ResponseType.SUCCESS_SEQUENCE)) { + this.last = true; + // Last response. + this.emitted = tryEmitData(); + completed.complete(true); + rawQueue.offer(END); + } else { + this.last = true; + completed.completeExceptionally(response.makeError(query)); // It errored. This means it's over. + rawQueue.offer(END); + this.emitted = 0; + } + } + + protected void maybeContinue() { + final int remaining = rawQueue.size(); + + if (!completed.isDone() && !last && fetchMode.shouldContinue(remaining, emitted)) { + continueResponse(); + } + } + + private int tryEmitData() { + try { + return emitData(response); + } catch (Exception e) { + completed.completeExceptionally(e); // It errored. This means it's over. + rawQueue.offer(END); + } + return 0; + } + + private void continueResponse() { + if (!continued.getAndSet(true)) { + connection.sendContinue(response.token).whenComplete((continued, t) -> { + if (t == null) { // Okay, let's process this response. + currentPartial.set(new PartialSequence(continued)); + } else { // It errored. This means it's over. + completed.completeExceptionally(t); + rawQueue.offer(END); + } + }); } } } From ddfde383b29bec6c7f8dc23bb7cfb88149e0933b Mon Sep 17 00:00:00 2001 From: Adrian Todt Date: Wed, 3 Jun 2020 18:10:22 -0300 Subject: [PATCH 5/5] Version bump --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index e2721f1..05a8d04 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -13,7 +13,7 @@ plugins { id("com.jfrog.bintray") version "1.8.4" } -version = "2.4.3" +version = "2.4.4" group = "com.rethinkdb" java.sourceCompatibility = JavaVersion.VERSION_1_8