Skip to content

Commit

Permalink
SOLR-16886: Don't commit multi-part uploads that have been aborted (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tflobbe committed Jul 17, 2023
1 parent 42a15fd commit 0f66a6b
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 3 deletions.
3 changes: 2 additions & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ Optimizations

Bug Fixes
---------------------
(No changes)

* SOLR-16886: Don't commit multi-part uploads that have been aborted (Tomás Fernández Löbbe, Houston Putman)

Dependency Upgrades
---------------------
Expand Down
7 changes: 7 additions & 0 deletions solr/modules/s3-repository/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ dependencies {
testImplementation 'commons-io:commons-io'

testRuntimeOnly 'org.eclipse.jetty:jetty-webapp'

testImplementation('org.mockito:mockito-core', {
exclude group: "net.bytebuddy", module: "byte-buddy-agent"
})
testRuntimeOnly('org.mockito:mockito-subclass', {
exclude group: "net.bytebuddy", module: "byte-buddy-agent"
})
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,19 @@ public void close() throws IOException {
return;
}

if (multiPartUpload != null && multiPartUpload.aborted) {
multiPartUpload = null;
closed = true;
return;
}

// flush first
uploadPart();

if (multiPartUpload != null) {
multiPartUpload.complete();
multiPartUpload = null;
}

multiPartUpload = null;
closed = true;
}

Expand All @@ -186,6 +191,7 @@ private MultipartUpload newMultipartUpload() throws IOException {
private class MultipartUpload {
private final String uploadId;
private final List<CompletedPart> completedParts;
private boolean aborted = false;

public MultipartUpload(String uploadId) {
this.uploadId = uploadId;
Expand All @@ -200,6 +206,10 @@ public MultipartUpload(String uploadId) {
}

void uploadPart(ByteArrayInputStream inputStream, long partSize) {
if (aborted) {
throw new IllegalStateException(
"Can't upload new parts on a MultipartUpload that was aborted. id '" + uploadId + "'");
}
int currentPartNumber = completedParts.size() + 1;

UploadPartRequest request =
Expand All @@ -221,6 +231,10 @@ void uploadPart(ByteArrayInputStream inputStream, long partSize) {

/** To be invoked when closing the stream to mark upload is done. */
void complete() {
if (aborted) {
throw new IllegalStateException(
"Can't complete a MultipartUpload that was aborted. id '" + uploadId + "'");
}
if (log.isDebugEnabled()) {
log.debug("Completing multi-part upload for key '{}', id '{}'", key, uploadId);
}
Expand All @@ -242,6 +256,9 @@ public void abort() {
// ignoring failure on abort.
log.error("Unable to abort multipart upload, you may need to purge uploaded parts: ", e);
}
// Even if the abort operation failed, we consider this MultiPartUpload aborted,
// and we'll not try to complete it.
aborted = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.s3;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import org.apache.solr.SolrTestCaseJ4;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

public class S3OutputStreamMockitoTest extends SolrTestCaseJ4 {

private S3Client clientMock;

private static byte[] largeBuffer;

@BeforeClass
public static void setUpClass() {
assumeWorkingMockito();
String content =
RandomStrings.randomAsciiAlphanumOfLength(random(), S3OutputStream.PART_SIZE + 1024);
largeBuffer = content.getBytes(StandardCharsets.UTF_8);
// pre-check -- ensure that our test string isn't too small
assertTrue(largeBuffer.length > S3OutputStream.PART_SIZE);
}

@AfterClass
public static void tearDownClass() {
largeBuffer = null;
}

@Override
public void setUp() throws Exception {
super.setUp();
clientMock = mock(S3Client.class);
}

@SuppressWarnings("unchecked")
public void testMultiPartUploadCompleted() throws IOException {
when(clientMock.createMultipartUpload((Consumer<CreateMultipartUploadRequest.Builder>) any()))
.thenReturn(CreateMultipartUploadResponse.builder().build());
when(clientMock.uploadPart((UploadPartRequest) any(), (RequestBody) any()))
.thenReturn(UploadPartResponse.builder().build());
S3OutputStream stream = new S3OutputStream(clientMock, "key", "bucket");
stream.write(largeBuffer);
verify(clientMock)
.createMultipartUpload((Consumer<CreateMultipartUploadRequest.Builder>) any());
verify(clientMock).uploadPart((UploadPartRequest) any(), (RequestBody) any());
verify(clientMock, never())
.completeMultipartUpload((Consumer<CompleteMultipartUploadRequest.Builder>) any());
verify(clientMock, never())
.abortMultipartUpload((Consumer<AbortMultipartUploadRequest.Builder>) any());

stream.close();
verify(clientMock)
.completeMultipartUpload((Consumer<CompleteMultipartUploadRequest.Builder>) any());
verify(clientMock, never())
.abortMultipartUpload((Consumer<AbortMultipartUploadRequest.Builder>) any());
}

@SuppressWarnings("unchecked")
public void testMultiPartUploadAborted() throws IOException {
when(clientMock.createMultipartUpload((Consumer<CreateMultipartUploadRequest.Builder>) any()))
.thenReturn(CreateMultipartUploadResponse.builder().build());
when(clientMock.uploadPart((UploadPartRequest) any(), (RequestBody) any()))
.thenThrow(S3Exception.builder().message("fake exception").build());
S3OutputStream stream = new S3OutputStream(clientMock, "key", "bucket");
// first time it should throw the exception from S3Client
org.apache.solr.s3.S3Exception solrS3Exception =
assertThrows(org.apache.solr.s3.S3Exception.class, () -> stream.write(largeBuffer));
assertEquals(S3Exception.class, solrS3Exception.getCause().getClass());
assertEquals("fake exception", solrS3Exception.getCause().getMessage());
verify(clientMock).abortMultipartUpload((Consumer<AbortMultipartUploadRequest.Builder>) any());

// after that, the exception should be because the MPU is aborted
solrS3Exception =
assertThrows(org.apache.solr.s3.S3Exception.class, () -> stream.write(largeBuffer));
assertEquals(IllegalStateException.class, solrS3Exception.getCause().getClass());
assertTrue(
"Unexpected exception message: " + solrS3Exception.getCause().getMessage(),
solrS3Exception
.getCause()
.getMessage()
.contains("Can't upload new parts on a MultipartUpload that was aborted"));

verify(clientMock)
.createMultipartUpload((Consumer<CreateMultipartUploadRequest.Builder>) any());
verify(clientMock).uploadPart((UploadPartRequest) any(), (RequestBody) any());
verify(clientMock, never())
.completeMultipartUpload((Consumer<CompleteMultipartUploadRequest.Builder>) any());
verify(clientMock, times(2))
.abortMultipartUpload((Consumer<AbortMultipartUploadRequest.Builder>) any());

stream.close();
verify(clientMock, never())
.completeMultipartUpload((Consumer<CompleteMultipartUploadRequest.Builder>) any());
}
}

0 comments on commit 0f66a6b

Please sign in to comment.