diff --git a/build.gradle.kts b/build.gradle.kts index e2721f1..05a8d04 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -13,7 +13,7 @@ plugins { id("com.jfrog.bintray") version "1.8.4" } -version = "2.4.3" +version = "2.4.4" group = "com.rethinkdb" java.sourceCompatibility = JavaVersion.VERSION_1_8 diff --git a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java index bab9e3b..6cb4c30 100644 --- a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java +++ b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; /** * The default {@link ConnectionSocket.Factory} and {@link ResponsePump.Factory} for any default connections. @@ -177,6 +178,7 @@ public String toString() { } private static class ThreadResponsePump implements ResponsePump { + private final AtomicReference shutdownReason = new AtomicReference<>(); private final Thread thread; private Map> awaiting = new ConcurrentHashMap<>(); @@ -220,7 +222,7 @@ public ThreadResponsePump(ConnectionSocket socket, boolean daemon) { @Override public @NotNull CompletableFuture await(long token) { if (awaiting == null) { - throw new ReqlDriverError("Response pump closed."); + throw new ReqlDriverError("Response pump closed.", shutdownReason.get()); } CompletableFuture future = new CompletableFuture<>(); awaiting.put(token, future); @@ -234,6 +236,7 @@ public boolean isAlive() { private void shutdown(Throwable t) { Map> awaiting = this.awaiting; + this.shutdownReason.compareAndSet(null, t); this.awaiting = null; thread.interrupt(); if (awaiting != null) { @@ -243,7 +246,7 @@ private void shutdown(Throwable t) { @Override public void shutdownPump() { - shutdown(new ReqlDriverError("Response pump closed.")); + shutdown(new Throwable("Shutdown was requested.")); } @Override diff --git a/src/main/java/com/rethinkdb/net/Result.java b/src/main/java/com/rethinkdb/net/Result.java index 3d91667..e74eecc 100644 --- a/src/main/java/com/rethinkdb/net/Result.java +++ b/src/main/java/com/rethinkdb/net/Result.java @@ -17,7 +17,7 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -36,62 +36,15 @@ public class Result implements Iterator, Iterable, Closeable { * The object which represents {@code null} inside the BlockingQueue. */ private static final Object NIL = new Object(); - /** - * The fetch mode to use on partial sequences. + * The object which represents the completed signal inside the BlockingQueue. + * Used only by partial sequences to unlock threads. Multiple ENDs might be emitted. */ - public enum FetchMode { - /** - * Fetches all parts of the sequence as fast as possible.
- * WARNING: This can end up throwing {@link OutOfMemoryError}s in case of giant sequences. - */ - AGGRESSIVE, - /** - * Fetches the next part of the sequence once the buffer reaches half of the original size. - */ - PREEMPTIVE_HALF, - /** - * Fetches the next part of the sequence once the buffer reaches a third of the original size. - */ - PREEMPTIVE_THIRD, - /** - * Fetches the next part of the sequence once the buffer reaches a fourth of the original size. - */ - PREEMPTIVE_FOURTH, - /** - * Fetches the next part of the sequence once the buffer reaches a fifth of the original size. - */ - PREEMPTIVE_FIFTH, - /** - * Fetches the next part of the sequence once the buffer reaches a sixth of the original size. - */ - PREEMPTIVE_SIXTH, - /** - * Fetches the next part of the sequence once the buffer reaches a seventh of the original size. - */ - PREEMPTIVE_SEVENTH, - /** - * Fetches the next part of the sequence once the buffer reaches an eight of the original size. - */ - PREEMPTIVE_EIGHTH, - /** - * Fetches the next part of the sequence once the buffer becomes empty. - */ - LAZY; - - @Nullable - public static FetchMode fromString(String s) { - try { - return valueOf(s.toUpperCase()); - } catch (RuntimeException ignored) { - return null; - } - } - } + private static final Object END = new Object(); protected final Connection connection; protected final Query query; - protected final Response firstRes; + protected final Response sourceResponse; protected final TypeReference typeRef; protected final Internals.FormatOptions fmt; protected final boolean unwrapLists; @@ -102,26 +55,22 @@ public static FetchMode fromString(String s) { // completes with false if cancelled, otherwise with true. exceptionally completes if error. protected final CompletableFuture completed = new CompletableFuture<>(); - // This gets used if it's a partial request. - protected final Semaphore requesting = new Semaphore(1); - protected final Semaphore emitting = new Semaphore(1); - protected final AtomicLong lastRequestCount = new AtomicLong(); - protected final AtomicReference currentResponse = new AtomicReference<>(); + // Used by Partial Responses. + private final AtomicReference currentPartial = new AtomicReference<>(); public Result(Connection connection, Query query, - Response firstRes, + Response sourceResponse, FetchMode fetchMode, boolean unwrapLists, TypeReference typeRef) { this.connection = connection; this.query = query; - this.firstRes = firstRes; + this.sourceResponse = sourceResponse; this.fetchMode = fetchMode; this.typeRef = typeRef; this.fmt = Internals.parseFormatOptions(query.globalOptions); this.unwrapLists = unwrapLists; - currentResponse.set(firstRes); handleFirstResponse(); } @@ -149,7 +98,7 @@ public int bufferedCount() { * @return true if this Result is a feed. */ public boolean isFeed() { - return firstRes.isFeed(); + return sourceResponse.isFeed(); } /** @@ -161,7 +110,7 @@ public void close() { } /** - * Collect all the results, fetching from the server if necessary, to a list and closes the Result.

+ * Collect all remaining results to a list, fetching from the server if necessary, and closes the Result.

* WARNING: If {@link Result#isFeed()} is true, this may never return. This method changes the {@code fetchMode} * of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. * @@ -172,8 +121,8 @@ public void close() { } /** - * Collect all the results, fetching from the server if necessary, using a {@link Collector} and closes the Result. - *

+ * Collect all remaining results using the provided {@link Collector}, fetching from the server if necessary, and + * closes the Result.

* WARNING: If {@link Result#isFeed()} is true, this may never return. This method changes the {@code fetchMode} * of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. * @@ -184,11 +133,10 @@ public void close() { */ public R collect(@NotNull Collector collector) { try { - fetchMode = FetchMode.AGGRESSIVE; - onStateUpdate(); + overrideFetchMode(FetchMode.AGGRESSIVE); A container = collector.supplier().get(); BiConsumer accumulator = collector.accumulator(); - forEachRemaining(next -> accumulator.accept(container, next)); + forEach(next -> accumulator.accept(container, next)); return collector.finisher().apply(container); } finally { close(); @@ -198,29 +146,25 @@ public R collect(@NotNull Collector collector) { /** * Creates a new sequential {@code Stream} from the results, which closes this Result on completion. *

- * WARNING: If {@link Result#isFeed()} is true, this may never return. This method changes the {@code fetchMode} - * of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. + * WARNING: If {@link Result#isFeed()} is true, this stream is possibly infinite. This method changes the + * {@code fetchMode} of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. * * @return the newly created stream. */ public @NotNull Stream stream() { - fetchMode = FetchMode.AGGRESSIVE; - onStateUpdate(); - return StreamSupport.stream(spliterator(), false).onClose(this::close); + return StreamSupport.stream(overrideFetchMode(FetchMode.AGGRESSIVE).spliterator(), false).onClose(this::close); } /** * Creates a new parallel {@code Stream} from the results, which closes this Result on completion. *

- * WARNING: If {@link Result#isFeed()} is true, this may never return. This method changes the {@code fetchMode} - * of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. + * WARNING: If {@link Result#isFeed()} is true, this stream is possibly infinite. This method changes the + * {@code fetchMode} of this Result to {@link FetchMode#AGGRESSIVE} to complete this as fast as possible. * * @return the newly created stream. */ public @NotNull Stream parallelStream() { - fetchMode = FetchMode.AGGRESSIVE; - onStateUpdate(); - return StreamSupport.stream(spliterator(), true).onClose(this::close); + return StreamSupport.stream(overrideFetchMode(FetchMode.AGGRESSIVE).spliterator(), true).onClose(this::close); } /** @@ -228,7 +172,7 @@ public R collect(@NotNull Collector collector) { */ @Override public boolean hasNext() { - return !rawQueue.isEmpty() || !completed.isDone(); + return !rawQueue.isEmpty() && rawQueue.peek() != END || !completed.isDone(); } /** @@ -245,11 +189,17 @@ public boolean hasNext() { if (!hasNext()) { throwOnCompleted(); } + maybeContinue(); Object next = rawQueue.poll(timeout, unit); + maybeContinue(); if (next == null) { throw new TimeoutException("The poll operation timed out."); } - onStateUpdate(); + if (next == END) { + rawQueue.offer(END); + throwOnCompleted(); + throw new ReqlDriverError("END reached, but wasn't completed. Please contact devs!"); + } if (next == NIL) { return null; } @@ -268,8 +218,14 @@ public boolean hasNext() { if (!hasNext()) { throwOnCompleted(); } - Object next = rawQueue.take(); - onStateUpdate(); + maybeContinue(); + Object next = rawQueue.take(); // This method shouldn't block forever. + maybeContinue(); + if (next == END) { + rawQueue.offer(END); + throwOnCompleted(); + throw new ReqlDriverError("END reached, but wasn't completed. Please contact devs!"); + } if (next == NIL) { return null; } @@ -286,11 +242,17 @@ public boolean hasNext() { * @return the first result available. */ public @Nullable T first() { + // This method should never call "maybeNextBatch". try { if (!hasNext()) { throwOnCompleted(); } Object next = rawQueue.take(); + if (next == END) { + rawQueue.offer(END); + throwOnCompleted(); + throw new ReqlDriverError("END reached, but wasn't completed. Please contact devs!"); + } if (next == NIL) { return null; } @@ -310,6 +272,7 @@ public boolean hasNext() { * @return the first result available. */ public @Nullable T single() { + // This method should never call "maybeNextBatch". try { if (!hasNext()) { throwOnCompleted(); @@ -318,6 +281,11 @@ public boolean hasNext() { if (hasNext()) { throw new IllegalStateException("More than one result."); } + if (next == END) { + rawQueue.offer(END); + throwOnCompleted(); + throw new ReqlDriverError("END reached, but wasn't completed. Please contact devs!"); + } if (next == NIL) { return null; } @@ -345,7 +313,7 @@ public boolean hasNext() { public void forEach(Consumer action) { try { Objects.requireNonNull(action); - fetchMode = FetchMode.AGGRESSIVE; + overrideFetchMode(FetchMode.AGGRESSIVE); while (hasNext()) { action.accept(next()); } @@ -368,7 +336,7 @@ public void forEachRemaining(Consumer action) { * @return the Profile from the current response, or null */ public @Nullable Profile profile() { - return currentResponse.get().profile; + return currentResponse().profile; } /** @@ -377,7 +345,7 @@ public void forEachRemaining(Consumer action) { * @return the {@link ResponseType} of the current response. */ public @NotNull ResponseType responseType() { - return currentResponse.get().type; + return currentResponse().type; } /** @@ -388,7 +356,7 @@ public void forEachRemaining(Consumer action) { */ public @NotNull Result overrideFetchMode(FetchMode fetchMode) { this.fetchMode = fetchMode; - onStateUpdate(); + maybeContinue(); return this; } @@ -397,20 +365,28 @@ public String toString() { return "Result{" + "connection=" + connection + ", query=" + query + - ", firstRes=" + firstRes + + ", firstRes=" + sourceResponse + ", completed=" + completed + - ", currentResponse=" + currentResponse + + ", currentResponse=" + currentResponse() + '}'; } // protected methods + protected Response currentResponse() { + PartialSequence batch = currentPartial.get(); + if (batch != null) { + return batch.response; + } + return sourceResponse; + } + /** * Function called on the first response. */ protected void handleFirstResponse() { try { - ResponseType type = firstRes.type; + ResponseType type = sourceResponse.type; if (type.equals(ResponseType.WAIT_COMPLETE)) { completed.complete(true); return; @@ -418,7 +394,7 @@ protected void handleFirstResponse() { if (type.equals(ResponseType.SUCCESS_ATOM) || type.equals(ResponseType.SUCCESS_SEQUENCE)) { try { - emitData(firstRes); + emitData(sourceResponse); } catch (IndexOutOfBoundsException ex) { throw new ReqlDriverError("Atom response was empty!", ex); } @@ -427,27 +403,24 @@ protected void handleFirstResponse() { } if (type.equals(ResponseType.SUCCESS_PARTIAL)) { - // Welcome to the code documentation of partial sequences, please take a seat. - - // First of all, we emit all of this request. Reactor's buffer should handle this. - emitData(firstRes); + currentPartial.set(new PartialSequence(sourceResponse)); // It is a partial response, so connection should be able to kill us if needed, // and clients should be able to stop the Result. completed.thenAccept(finished -> { if (!finished) { - connection.sendStop(firstRes.token); + connection.sendStop(sourceResponse.token); } - connection.loseTrackOf(this); + connection.loseTrackOf(Result.this); }); - connection.keepTrackOf(this); + connection.keepTrackOf(Result.this); // We can't simply overflow buffers, so we gotta do small batches. - onStateUpdate(); + maybeContinue(); return; } - throw firstRes.makeError(query); + throw sourceResponse.makeError(query); } catch (Exception e) { completed.completeExceptionally(e); throw e; @@ -471,106 +444,189 @@ protected void throwOnCompleted() { } } - /** - * This function is called on next() - */ - protected void onStateUpdate() { - final Response lastRes = currentResponse.get(); - if (shouldContinue(lastRes) && requesting.tryAcquire()) { - // great, we should make a CONTINUE request. - connection.sendContinue(lastRes.token).whenComplete((nextRes, t) -> { - if (t != null) { // It errored. This means it's over. - completed.completeExceptionally(t); - } else { // Okay, let's process this response. - currentResponse.set(nextRes); - if (nextRes.type.equals(ResponseType.SUCCESS_SEQUENCE)) { - try { - emitting.acquire(); - emitData(nextRes); - emitting.release(); - completed.complete(true); // Completed. This means it's over. - } catch (Exception e) { - completed.completeExceptionally(e); // It errored. This means it's over. - } - } else if (nextRes.type.equals(ResponseType.SUCCESS_PARTIAL)) { - // Okay, we got another partial response, so there's more. - - requesting.release(); // Request's over, release this for later. - try { - emitting.acquire(); - emitData(nextRes); - emitting.release(); - onStateUpdate(); //Recursion! - } catch (Exception e) { - completed.completeExceptionally(e); // It errored. This means it's over. - } - } else { - completed.completeExceptionally(firstRes.makeError(query)); // It errored. This means it's over. - } + protected void maybeContinue() { + PartialSequence partial = currentPartial.get(); + if (partial != null) { + partial.maybeContinue(); + } + } + + protected void onConnectionClosed() { + currentPartial.set(new PartialSequence(new Response(query.token, ResponseType.SUCCESS_SEQUENCE))); + completed.completeExceptionally(new ReqlRuntimeError("Connection is closed.")); + } + + protected int emitData(Response res) { + throwOnCompleted(); + int count = 0; + for (Object each : (List) Internals.convertPseudotypes(res.data, fmt)) { + if (unwrapLists && res.type.equals(ResponseType.SUCCESS_ATOM) && each instanceof List) { + for (Object o : ((List) each)) { + rawQueue.offer(o == null ? NIL : o); + count++; } - }); + } else { + rawQueue.offer(each == null ? NIL : each); + count++; + } } + return count; } - protected boolean shouldContinue(Response res) { - if (completed.isDone() || !res.type.equals(ResponseType.SUCCESS_PARTIAL)) { - return false; + private class PartialSequence { + protected final Response response; + protected final int emitted; + protected final boolean last; + protected final AtomicBoolean continued = new AtomicBoolean(); + + private PartialSequence(Response response) { + this.response = response; + if (response.type.equals(ResponseType.SUCCESS_PARTIAL)) { + this.last = false; + // Okay, we got another partial response, so there's more. + this.emitted = tryEmitData(); + maybeContinue(); //Recursion! + } else if (response.type.equals(ResponseType.SUCCESS_SEQUENCE)) { + this.last = true; + // Last response. + this.emitted = tryEmitData(); + completed.complete(true); + rawQueue.offer(END); + } else { + this.last = true; + completed.completeExceptionally(response.makeError(query)); // It errored. This means it's over. + rawQueue.offer(END); + this.emitted = 0; + } } - switch (fetchMode) { - case PREEMPTIVE_HALF: { - return rawQueue.size() * 2 < lastRequestCount.get(); + + protected void maybeContinue() { + final int remaining = rawQueue.size(); + + if (!completed.isDone() && !last && fetchMode.shouldContinue(remaining, emitted)) { + continueResponse(); } - case PREEMPTIVE_THIRD: { - return rawQueue.size() * 3 < lastRequestCount.get(); + } + + private int tryEmitData() { + try { + return emitData(response); + } catch (Exception e) { + completed.completeExceptionally(e); // It errored. This means it's over. + rawQueue.offer(END); } - case PREEMPTIVE_FOURTH: { - return rawQueue.size() * 4 < lastRequestCount.get(); + return 0; + } + + private void continueResponse() { + if (!continued.getAndSet(true)) { + connection.sendContinue(response.token).whenComplete((continued, t) -> { + if (t == null) { // Okay, let's process this response. + currentPartial.set(new PartialSequence(continued)); + } else { // It errored. This means it's over. + completed.completeExceptionally(t); + rawQueue.offer(END); + } + }); } - case PREEMPTIVE_FIFTH: { - return rawQueue.size() * 5 < lastRequestCount.get(); + } + } + + /** + * The fetch mode to use on partial sequences. + */ + public enum FetchMode { + /** + * Fetches all parts of the sequence as fast as possible.
+ * WARNING: This can end up throwing {@link OutOfMemoryError}s in case of giant sequences. + */ + AGGRESSIVE { + @Override + public boolean shouldContinue(int size, int requestSize) { + return true; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches half of the original size. + */ + PREEMPTIVE_HALF { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 2; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches a third of the original size. + */ + PREEMPTIVE_THIRD { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 3; + } + }, + /** + * Fetches the next part of the sequence once the buffer reaches a fourth of the original size. + */ + PREEMPTIVE_FOURTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 4; } - case PREEMPTIVE_SIXTH: { - return rawQueue.size() * 6 < lastRequestCount.get(); + }, + /** + * Fetches the next part of the sequence once the buffer reaches a fifth of the original size. + */ + PREEMPTIVE_FIFTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 5; } - case PREEMPTIVE_SEVENTH: { - return rawQueue.size() * 7 < lastRequestCount.get(); + }, + /** + * Fetches the next part of the sequence once the buffer reaches a sixth of the original size. + */ + PREEMPTIVE_SIXTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 6; } - case PREEMPTIVE_EIGHTH: { - return rawQueue.size() * 8 < lastRequestCount.get(); + }, + /** + * Fetches the next part of the sequence once the buffer reaches a seventh of the original size. + */ + PREEMPTIVE_SEVENTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 7; } - case LAZY: { - return rawQueue.isEmpty(); + }, + /** + * Fetches the next part of the sequence once the buffer reaches an eight of the original size. + */ + PREEMPTIVE_EIGHTH { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size <= requestSize / 8; } - case AGGRESSIVE: - default: { - return true; + }, + /** + * Fetches the next part of the sequence once the buffer becomes empty. + */ + LAZY { + @Override + public boolean shouldContinue(int size, int requestSize) { + return size == 0; } - } - } + }; - protected void onConnectionClosed() { - currentResponse.set(new Response(query.token, ResponseType.SUCCESS_SEQUENCE)); - completed.completeExceptionally(new ReqlRuntimeError("Connection is closed.")); - } + public abstract boolean shouldContinue(int size, int requestSize); - protected void emitData(Response res) { - if (completed.isDone()) { - if (completed.join()) { - throw new RuntimeException("The Response already completed successfully."); - } else { - throw new RuntimeException("The Response was cancelled."); - } - } - lastRequestCount.set(0); - for (Object each : (List) Internals.convertPseudotypes(res.data, fmt)) { - if (unwrapLists && firstRes.type.equals(ResponseType.SUCCESS_ATOM) && each instanceof List) { - for (Object o : ((List) each)) { - rawQueue.offer(o == null ? NIL : o); - lastRequestCount.incrementAndGet(); - } - } else { - rawQueue.offer(each == null ? NIL : each); - lastRequestCount.incrementAndGet(); + @Nullable + public static FetchMode fromString(String s) { + try { + return valueOf(s.toUpperCase()); + } catch (RuntimeException ignored) { + return null; } } } diff --git a/src/test/java/com/rethinkdb/FetchModesTest.java b/src/test/java/com/rethinkdb/FetchModesTest.java new file mode 100644 index 0000000..9f6e5e5 --- /dev/null +++ b/src/test/java/com/rethinkdb/FetchModesTest.java @@ -0,0 +1,59 @@ +package com.rethinkdb; + +import com.rethinkdb.net.Result.FetchMode; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FetchModesTest { + /** + * Least common multiple of 2, 3, 4, 5, 6, 7, 8. + * Size is not realistic, but makes the divisions easier. + * + */ + private static final int REQUEST_SIZE = 840; + + @Test + public void testBasic() { + FetchMode[] values = FetchMode.values(); + for (FetchMode value : values) { + assertTrue(value.shouldContinue(0, REQUEST_SIZE)); + } + } + + @Test + public void testAggressive() { + for (int i = 0; i < REQUEST_SIZE; i++) { + assertTrue(FetchMode.AGGRESSIVE.shouldContinue(i, REQUEST_SIZE)); + } + } + + @Test + public void testLazy() { + for (int i = 1; i < REQUEST_SIZE; i++) { + assertFalse(FetchMode.LAZY.shouldContinue(i, REQUEST_SIZE)); + } + assertTrue(FetchMode.LAZY.shouldContinue(0, REQUEST_SIZE)); + } + + @Test + public void testPreemptive() { + testPreemptiveImpl(2, FetchMode.PREEMPTIVE_HALF); + testPreemptiveImpl(3, FetchMode.PREEMPTIVE_THIRD); + testPreemptiveImpl(4, FetchMode.PREEMPTIVE_FOURTH); + testPreemptiveImpl(5, FetchMode.PREEMPTIVE_FIFTH); + testPreemptiveImpl(6, FetchMode.PREEMPTIVE_SIXTH); + testPreemptiveImpl(7, FetchMode.PREEMPTIVE_SEVENTH); + testPreemptiveImpl(8, FetchMode.PREEMPTIVE_EIGHTH); + } + + private void testPreemptiveImpl(int splitAt, FetchMode mode) { + for (int i = 0; i <= REQUEST_SIZE / splitAt; i++) { + assertTrue(mode.shouldContinue(i, REQUEST_SIZE)); + } + for (int i = REQUEST_SIZE / splitAt + 1; i < REQUEST_SIZE; i++) { + assertFalse(mode.shouldContinue(i, REQUEST_SIZE)); + } + } +}