diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index f42986cad3c..e1e43244dd3 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -70,7 +70,8 @@ Optimizations Bug Fixes --------------------- -(No changes) + +* SOLR-16886: Don't commit multi-part uploads that have been aborted (Tomás Fernández Löbbe, Houston Putman) Dependency Upgrades --------------------- diff --git a/solr/modules/s3-repository/build.gradle b/solr/modules/s3-repository/build.gradle index b8a9763e907..2185bf9b71f 100644 --- a/solr/modules/s3-repository/build.gradle +++ b/solr/modules/s3-repository/build.gradle @@ -80,6 +80,13 @@ dependencies { testImplementation 'commons-io:commons-io' testRuntimeOnly 'org.eclipse.jetty:jetty-webapp' + + testImplementation('org.mockito:mockito-core', { + exclude group: "net.bytebuddy", module: "byte-buddy-agent" + }) + testRuntimeOnly('org.mockito:mockito-subclass', { + exclude group: "net.bytebuddy", module: "byte-buddy-agent" + }) } test { diff --git a/solr/modules/s3-repository/src/java/org/apache/solr/s3/S3OutputStream.java b/solr/modules/s3-repository/src/java/org/apache/solr/s3/S3OutputStream.java index 9a46f45101e..25bf3465e7f 100644 --- a/solr/modules/s3-repository/src/java/org/apache/solr/s3/S3OutputStream.java +++ b/solr/modules/s3-repository/src/java/org/apache/solr/s3/S3OutputStream.java @@ -163,14 +163,19 @@ public void close() throws IOException { return; } + if (multiPartUpload != null && multiPartUpload.aborted) { + multiPartUpload = null; + closed = true; + return; + } + // flush first uploadPart(); if (multiPartUpload != null) { multiPartUpload.complete(); - multiPartUpload = null; } - + multiPartUpload = null; closed = true; } @@ -186,6 +191,7 @@ private MultipartUpload newMultipartUpload() throws IOException { private class MultipartUpload { private final String uploadId; private final List completedParts; + private boolean aborted = false; public MultipartUpload(String uploadId) { this.uploadId = uploadId; @@ -200,6 +206,10 @@ public MultipartUpload(String uploadId) { } void uploadPart(ByteArrayInputStream inputStream, long partSize) { + if (aborted) { + throw new IllegalStateException( + "Can't upload new parts on a MultipartUpload that was aborted. id '" + uploadId + "'"); + } int currentPartNumber = completedParts.size() + 1; UploadPartRequest request = @@ -221,6 +231,10 @@ void uploadPart(ByteArrayInputStream inputStream, long partSize) { /** To be invoked when closing the stream to mark upload is done. */ void complete() { + if (aborted) { + throw new IllegalStateException( + "Can't complete a MultipartUpload that was aborted. id '" + uploadId + "'"); + } if (log.isDebugEnabled()) { log.debug("Completing multi-part upload for key '{}', id '{}'", key, uploadId); } @@ -242,6 +256,9 @@ public void abort() { // ignoring failure on abort. log.error("Unable to abort multipart upload, you may need to purge uploaded parts: ", e); } + // Even if the abort operation failed, we consider this MultiPartUpload aborted, + // and we'll not try to complete it. + aborted = true; } } } diff --git a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3OutputStreamMockitoTest.java b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3OutputStreamMockitoTest.java new file mode 100644 index 00000000000..f0bcd196975 --- /dev/null +++ b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3OutputStreamMockitoTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.s3; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import com.carrotsearch.randomizedtesting.generators.RandomStrings; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.function.Consumer; +import org.apache.solr.SolrTestCaseJ4; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; + +public class S3OutputStreamMockitoTest extends SolrTestCaseJ4 { + + private S3Client clientMock; + + private static byte[] largeBuffer; + + @BeforeClass + public static void setUpClass() { + assumeWorkingMockito(); + String content = + RandomStrings.randomAsciiAlphanumOfLength(random(), S3OutputStream.PART_SIZE + 1024); + largeBuffer = content.getBytes(StandardCharsets.UTF_8); + // pre-check -- ensure that our test string isn't too small + assertTrue(largeBuffer.length > S3OutputStream.PART_SIZE); + } + + @AfterClass + public static void tearDownClass() { + largeBuffer = null; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + clientMock = mock(S3Client.class); + } + + @SuppressWarnings("unchecked") + public void testMultiPartUploadCompleted() throws IOException { + when(clientMock.createMultipartUpload((Consumer) any())) + .thenReturn(CreateMultipartUploadResponse.builder().build()); + when(clientMock.uploadPart((UploadPartRequest) any(), (RequestBody) any())) + .thenReturn(UploadPartResponse.builder().build()); + S3OutputStream stream = new S3OutputStream(clientMock, "key", "bucket"); + stream.write(largeBuffer); + verify(clientMock) + .createMultipartUpload((Consumer) any()); + verify(clientMock).uploadPart((UploadPartRequest) any(), (RequestBody) any()); + verify(clientMock, never()) + .completeMultipartUpload((Consumer) any()); + verify(clientMock, never()) + .abortMultipartUpload((Consumer) any()); + + stream.close(); + verify(clientMock) + .completeMultipartUpload((Consumer) any()); + verify(clientMock, never()) + .abortMultipartUpload((Consumer) any()); + } + + @SuppressWarnings("unchecked") + public void testMultiPartUploadAborted() throws IOException { + when(clientMock.createMultipartUpload((Consumer) any())) + .thenReturn(CreateMultipartUploadResponse.builder().build()); + when(clientMock.uploadPart((UploadPartRequest) any(), (RequestBody) any())) + .thenThrow(S3Exception.builder().message("fake exception").build()); + S3OutputStream stream = new S3OutputStream(clientMock, "key", "bucket"); + // first time it should throw the exception from S3Client + org.apache.solr.s3.S3Exception solrS3Exception = + assertThrows(org.apache.solr.s3.S3Exception.class, () -> stream.write(largeBuffer)); + assertEquals(S3Exception.class, solrS3Exception.getCause().getClass()); + assertEquals("fake exception", solrS3Exception.getCause().getMessage()); + verify(clientMock).abortMultipartUpload((Consumer) any()); + + // after that, the exception should be because the MPU is aborted + solrS3Exception = + assertThrows(org.apache.solr.s3.S3Exception.class, () -> stream.write(largeBuffer)); + assertEquals(IllegalStateException.class, solrS3Exception.getCause().getClass()); + assertTrue( + "Unexpected exception message: " + solrS3Exception.getCause().getMessage(), + solrS3Exception + .getCause() + .getMessage() + .contains("Can't upload new parts on a MultipartUpload that was aborted")); + + verify(clientMock) + .createMultipartUpload((Consumer) any()); + verify(clientMock).uploadPart((UploadPartRequest) any(), (RequestBody) any()); + verify(clientMock, never()) + .completeMultipartUpload((Consumer) any()); + verify(clientMock, times(2)) + .abortMultipartUpload((Consumer) any()); + + stream.close(); + verify(clientMock, never()) + .completeMultipartUpload((Consumer) any()); + } +}