diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoCacheInvalidateIf.java b/reactor-core/src/main/java/reactor/core/publisher/MonoCacheInvalidateIf.java index c8e2b1565f..36f86607d8 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoCacheInvalidateIf.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoCacheInvalidateIf.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,8 +24,6 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; -import reactor.util.Logger; -import reactor.util.Loggers; import reactor.util.annotation.Nullable; import reactor.util.context.Context; @@ -269,8 +267,12 @@ final void remove(CacheMonoSubscriber toRemove) { if (n == 1) { if (SUBSCRIBERS.compareAndSet(this, a, COORDINATOR_DONE)) { - //cancel the subscription no matter what, at this point coordinator is done and cannot accept new subscribers - this.upstream.cancel(); + // Cancel the subscription if it has been established. + // At this point coordinator is done and cannot accept new subscribers. + Subscription upstream = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); + if (upstream != null) { + upstream.cancel(); + } //only switch to EMPTY_STATE if the current state is this coordinator STATE.compareAndSet(this.main, this, EMPTY_STATE); return; @@ -294,7 +296,12 @@ void delayedSubscribe() { if (old != null && old != Operators.cancelledSubscription()) { old.cancel(); } - source.subscribe(this); + // Only trigger the upstream subscription if the coordinator has not been + // aborted. This can happen when meanwhile all the downstream Subscribers + // have been cancelled. + if (SUBSCRIBERS.get(this) != COORDINATOR_DONE) { + source.subscribe(this); + } } @Override diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoCacheInvalidateIfTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoCacheInvalidateIfTest.java index c391d20218..94f4aaea48 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoCacheInvalidateIfTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoCacheInvalidateIfTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,15 @@ package reactor.core.publisher; import java.util.NoSuchElementException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import reactor.core.Disposable; +import reactor.core.scheduler.Schedulers; import reactor.test.MemoryUtils; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; @@ -153,4 +157,57 @@ void cancellingAllSubscribersBeforeOnNextInvalidates() { sub3.dispose(); source.assertCancelled(1); } + + // See https://github.com/reactor/reactor-core/issues/3907 + @Test + public void cancelBeforeUpstreamSubscribeDoesntFail() throws Exception { + AtomicReference error = new AtomicReference<>(); + AtomicInteger upstreamCalled = new AtomicInteger(); + + CountDownLatch cancelled = new CountDownLatch(1); + CountDownLatch cacheInvalidateInnerSubscribed = new CountDownLatch(1); + CountDownLatch mainChainDone = new CountDownLatch(1); + + Mono cachedMono = Mono.fromSupplier(() -> { + upstreamCalled.incrementAndGet(); + return "foobar"; + }) + .cacheInvalidateIf(c -> true); + + Mono mono = Mono.defer(() -> Mono.just("foo")) + .flatMap(x -> { + return cachedMono + .doOnSubscribe(s -> { + cacheInvalidateInnerSubscribed.countDown(); + try { cancelled.await(1, TimeUnit.SECONDS); } catch (Exception e) {} + }) + .subscribeOn(Schedulers.boundedElastic()); + }) + .doOnError(error::set) + .doFinally(s -> mainChainDone.countDown()); + + Disposable d = mono.subscribe(); + + assertThat(cacheInvalidateInnerSubscribed.await(1, TimeUnit.SECONDS)) + .as("Should assemble the chain") + .isTrue(); + + d.dispose(); + cancelled.countDown(); + + + assertThat(mainChainDone.await(1, TimeUnit.SECONDS)) + .as("Should finish in time") + .isTrue(); + + // Force an actual upstream subscription to validate the count + cachedMono.block(); + + assertThat(upstreamCalled.get()) + .as("No upstream subscription expected") + .isEqualTo(1); + assertThat(error.get()) + .as("No errors expected") + .isNull(); + } }