Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Remote Cluster State] Remote state interfaces #14019

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;

/**
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
*/
public abstract class AbstractRemoteWritableBlobEntity<T> implements RemoteWriteableEntity<T> {

protected String blobFileName;

protected String blobName;
private final String clusterUUID;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;
private String[] pathTokens;

public AbstractRemoteWritableBlobEntity(
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
this.clusterUUID = clusterUUID;
this.compressor = compressor;
this.namedXContentRegistry = namedXContentRegistry;
}

public abstract BlobPathParameters getBlobPathParameters();

public String getFullBlobName() {
return blobName;
}

public String getBlobFileName() {
if (blobFileName == null) {
String[] pathTokens = getBlobPathTokens();
if (pathTokens == null || pathTokens.length < 1) {
return null;
}
blobFileName = pathTokens[pathTokens.length - 1];
}
return blobFileName;
}

public String[] getBlobPathTokens() {
if (pathTokens != null) {
return pathTokens;
}
if (blobName == null) {
return null;
}
pathTokens = blobName.split(PATH_DELIMITER);
return pathTokens;
}

public abstract String generateBlobFileName();

public String clusterUUID() {
return clusterUUID;
}

public abstract UploadedMetadata getUploadedMetadata();

public void setFullBlobName(BlobPath blobPath) {
this.blobName = blobPath.buildAsString() + blobFileName;
}

public NamedXContentRegistry getNamedXContentRegistry() {
return namedXContentRegistry;
}

protected Compressor getCompressor() {
return compressor;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import java.util.List;

/**
* Parameters which can be used to construct a blob path
*
*/
public class BlobPathParameters {

private final List<String> pathTokens;
private final String filePrefix;

public BlobPathParameters(final List<String> pathTokens, final String filePrefix) {
this.pathTokens = pathTokens;
this.filePrefix = filePrefix;
}

public List<String> getPathTokens() {
return pathTokens;
}

public String getFilePrefix() {
return filePrefix;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.core.action.ActionListener;

import java.io.IOException;

/**
* An interface to read/write an object from/to a remote storage. This interface is agnostic of the remote storage type.
*
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
* @param <U> The wrapper entity which provides methods for serializing/deserializing entity T.
*/
public interface RemoteWritableEntityStore<T, U extends RemoteWriteableEntity<T>> {

public void writeAsync(U entity, ActionListener<Void> listener);

public T read(U entity) throws IOException;

public void readAsync(U entity, ActionListener<T> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import java.io.IOException;
import java.io.InputStream;

/**
* An interface to which provides defines the serialization/deserialization methods for objects to be uploaded to or downloaded from remote store.
* This interface is agnostic of the remote storage type.
*
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
*/
public interface RemoteWriteableEntity<T> {
/**
* @return An InputStream created by serializing the entity T
* @throws IOException Exception encountered while serialization
*/
public InputStream serialize() throws IOException;

/**
* @param inputStream The InputStream which is used to read the serialized entity
* @return The entity T after deserialization
* @throws IOException Exception encountered while deserialization
*/
public T deserialize(InputStream inputStream) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/**
* Common remote store package
*/
package org.opensearch.common.remote;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
* Utility class for Remote Cluster State
*/
public class RemoteClusterStateUtils {
public static final String PATH_DELIMITER = "/";

public static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntity;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;

/**
* Abstract class for a blob type storage
*
* @param <T> The entity which can be uploaded to / downloaded from blob store
* @param <U> The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for T entity.
*/
public class RemoteClusterStateBlobStore<T, U extends AbstractRemoteWritableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {

private final BlobStoreTransferService transferService;
private final BlobStoreRepository blobStoreRepository;
private final String clusterName;
private final ExecutorService executorService;

public RemoteClusterStateBlobStore(
final BlobStoreTransferService blobStoreTransferService,
final BlobStoreRepository blobStoreRepository,
final String clusterName,
final ThreadPool threadPool,
final String executor
) {
this.transferService = blobStoreTransferService;
this.blobStoreRepository = blobStoreRepository;
this.clusterName = clusterName;
this.executorService = threadPool.executor(executor);
}

@Override
public void writeAsync(final U entity, final ActionListener<Void> listener) {
try {
try (InputStream inputStream = entity.serialize()) {
BlobPath blobPath = getBlobPathForUpload(entity);
entity.setFullBlobName(blobPath);
// TODO uncomment below logic after merging PR https://github.com/opensearch-project/OpenSearch/pull/13836
// transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT,
// listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

public T read(final U entity) throws IOException {
// TODO Add timing logs and tracing
assert entity.getFullBlobName() != null;
return entity.deserialize(transferService.downloadBlob(getBlobPathForDownload(entity), entity.getBlobFileName()));
}

@Override
public void readAsync(final U entity, final ActionListener<T> listener) {
executorService.execute(() -> {
try {
listener.onResponse(read(entity));
} catch (Exception e) {
listener.onFailure(e);
}
});
}

private BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<T> obj) {
BlobPath blobPath = blobStoreRepository.basePath()
.add(RemoteClusterStateUtils.encodeString(clusterName))
.add("cluster-state")
.add(obj.clusterUUID());
for (String token : obj.getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
}

private BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T> obj) {
String[] pathTokens = obj.getBlobPathTokens();
BlobPath blobPath = new BlobPath();
if (pathTokens == null || pathTokens.length < 1) {
return blobPath;
}
// Iterate till second last path token to get the blob folder
for (int i = 0; i < pathTokens.length - 1; i++) {
blobPath = blobPath.add(pathTokens[i]);
}
return blobPath;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Package containing models for remote cluster state
*/
package org.opensearch.gateway.remote.model;
Loading