Skip to content

Commit

Permalink
verify inflight mono's cancelled for mono firstWithValue
Browse files Browse the repository at this point in the history
  • Loading branch information
subbarao committed Nov 4, 2024
1 parent 6692812 commit 814cdc0
Showing 1 changed file with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,11 +20,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.Test;

import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
Expand Down Expand Up @@ -172,6 +174,31 @@ void cancelIsPropagated() {
pub2.assertMaxRequested(1);
pub2.assertWasCancelled();
}
@Test
void cancelInflightMono() {
AtomicLong inflightMonos = new AtomicLong(0);
Mono<Integer> inflightMono1 = Mono.fromCallable(() -> {
Thread.sleep(300);
return 1;
}).doOnCancel(inflightMonos::getAndIncrement).subscribeOn(Schedulers.boundedElastic());

Mono<Integer> inflightMono2 = Mono.fromCallable(() -> {
Thread.sleep(400);
return 2;
}).doOnCancel(inflightMonos::getAndIncrement).subscribeOn(Schedulers.boundedElastic());

Mono<Integer> completedMono = Mono.fromCallable(() -> {
Thread.sleep(100);
return 3;
}).doOnCancel(inflightMonos::getAndIncrement).subscribeOn(Schedulers.boundedElastic());

StepVerifier.withVirtualTime(() -> Mono.firstWithValue(inflightMono1, inflightMono2, completedMono))
.thenAwait(Duration.ofMillis(100))
.expectNext(3)
.verifyComplete();

assertThat(inflightMonos).hasValue(2);
}

@Test
void singleArrayNullSource() {
Expand Down

0 comments on commit 814cdc0

Please sign in to comment.