From 814cdc0ed1bc44aace599a0e4eb8262d1cbd9e12 Mon Sep 17 00:00:00 2001 From: subbarao Date: Sun, 3 Nov 2024 19:40:17 -0500 Subject: [PATCH] verify inflight mono's cancelled for mono firstWithValue --- .../publisher/MonoFirstWithValueTest.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoFirstWithValueTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoFirstWithValueTest.java index 9b606e9242..863fe0520b 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoFirstWithValueTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoFirstWithValueTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,11 +20,13 @@ import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicLong; import org.junit.jupiter.api.Test; import reactor.core.Exceptions; import reactor.core.Scannable; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; import reactor.test.subscriber.AssertSubscriber; @@ -172,6 +174,31 @@ void cancelIsPropagated() { pub2.assertMaxRequested(1); pub2.assertWasCancelled(); } + @Test + void cancelInflightMono() { + AtomicLong inflightMonos = new AtomicLong(0); + Mono inflightMono1 = Mono.fromCallable(() -> { + Thread.sleep(300); + return 1; + }).doOnCancel(inflightMonos::getAndIncrement).subscribeOn(Schedulers.boundedElastic()); + + Mono inflightMono2 = Mono.fromCallable(() -> { + Thread.sleep(400); + return 2; + }).doOnCancel(inflightMonos::getAndIncrement).subscribeOn(Schedulers.boundedElastic()); + + Mono completedMono = Mono.fromCallable(() -> { + Thread.sleep(100); + return 3; + }).doOnCancel(inflightMonos::getAndIncrement).subscribeOn(Schedulers.boundedElastic()); + + StepVerifier.withVirtualTime(() -> Mono.firstWithValue(inflightMono1, inflightMono2, completedMono)) + .thenAwait(Duration.ofMillis(100)) + .expectNext(3) + .verifyComplete(); + + assertThat(inflightMonos).hasValue(2); + } @Test void singleArrayNullSource() {