
RFS now uses reactor-netty for bulk indexing #607

Merged: 5 commits, Apr 24, 2024
Changes from all commits
15 changes: 15 additions & 0 deletions RFS/build.gradle
@@ -37,8 +37,14 @@ dependencies {
implementation 'org.apache.lucene:lucene-core:8.11.3'
implementation 'org.apache.lucene:lucene-analyzers-common:8.11.3'
implementation 'org.apache.lucene:lucene-backward-codecs:8.11.3'

implementation platform('io.projectreactor:reactor-bom:2023.0.5')
implementation 'io.projectreactor.netty:reactor-netty-core'
implementation 'io.projectreactor.netty:reactor-netty-http'
implementation 'software.amazon.awssdk:s3:2.25.16'

testImplementation 'io.projectreactor:reactor-test:3.6.5'
testImplementation 'org.apache.logging.log4j:log4j-core:2.23.1'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.10.2'
testImplementation 'org.mockito:mockito-core:5.11.0'
@@ -120,4 +126,13 @@ tasks.getByName('composeUp')

test {
useJUnitPlatform()

testLogging {
exceptionFormat = 'full'
events "failed"
showExceptions true
showCauses true
showStackTraces true
showStandardStreams = true
}
}
16 changes: 6 additions & 10 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -14,6 +14,7 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import reactor.core.publisher.Flux;

import com.rfs.common.*;
import com.rfs.transformers.*;
@@ -125,7 +126,7 @@ public static void main(String[] args) throws InterruptedException {

Logging.setLevel(logLevel);

ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

// Sanity checks
@@ -355,18 +356,13 @@ public static void main(String[] args) throws InterruptedException {
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ===");

List<Document> documents = LuceneDocumentsReader.readDocuments(luceneDirPath, indexMetadata.getName(), shardId);
logger.info("Documents read successfully");
Flux<Document> documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId);
String targetIndex = indexMetadata.getName() + indexSuffix;
DocumentReindexer.reindex(targetIndex, documents, targetConnection);

for (Document document : documents) {
String targetIndex = indexMetadata.getName() + indexSuffix;
DocumentReindexer.reindex(targetIndex, document, targetConnection);
}
logger.info("Shard reindexing completed");
}
}

logger.info("Documents reindexed successfully");

logger.info("Refreshing newly added documents");
DocumentReindexer.refreshAllDocuments(targetConnection);
logger.info("Refresh complete");
41 changes: 38 additions & 3 deletions RFS/src/main/java/com/rfs/common/ConnectionDetails.java
@@ -1,5 +1,8 @@
package com.rfs.common;

import java.net.URI;
import java.net.URISyntaxException;

/**
* Stores the connection details (assuming basic auth) for an Elasticsearch/OpenSearch cluster
*/
@@ -9,13 +12,21 @@ public static enum AuthType {
NONE
}

public final String host;
public static enum Protocol {
HTTP,
HTTPS
}

public final String url;
public final Protocol protocol;
public final String hostName;
public final int port;
public final String username;
public final String password;
public final AuthType authType;

public ConnectionDetails(String host, String username, String password) {
this.host = host; // http://localhost:9200
public ConnectionDetails(String url, String username, String password) {
this.url = url; // http://localhost:9200

// If the username is provided, the password must be as well, and vice versa
if ((username == null && password != null) || (username != null && password == null)) {
@@ -28,5 +39,29 @@ public ConnectionDetails(String host, String username, String password) {

this.username = username;
this.password = password;

if (url == null) {
hostName = null;
port = -1;
protocol = null;
} else {
URI uri;
try {
uri = new URI(url);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid URL format", e);
}

hostName = uri.getHost();
port = uri.getPort(); // Default port can be set here if -1

if (uri.getScheme().equals("http")) {
protocol = Protocol.HTTP;
} else if (uri.getScheme().equals("https")) {
protocol = Protocol.HTTPS;
} else {
throw new IllegalArgumentException("Invalid protocol");
}
}
}
}
110 changes: 101 additions & 9 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -1,32 +1,124 @@
package com.rfs.common;

import java.time.Duration;
import java.util.Base64;
import java.util.List;

import io.netty.buffer.Unpooled;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Document;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.util.retry.Retry;


public class DocumentReindexer {
private static final Logger logger = LogManager.getLogger(DocumentReindexer.class);
private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen

public static void reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
Collaborator: Question from above - why should this take a Flux in? What would be lost/what would the impact be if you took in a stream and adapted it within this method?
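For context, here is a minimal sketch of the alternative the reviewer describes: adapting a plain java.util.stream.Stream to a Flux at the method boundary so callers never see Reactor types. The class name and sample data are invented for illustration, and only the batching step of the PR's pipeline is reproduced.

import java.util.stream.Stream;

import reactor.core.publisher.Flux;

public class StreamToFluxSketch {
    public static void main(String[] args) {
        // The caller hands in a plain Stream; no Reactor types leak into its signature
        Stream<String> docs = Stream.of("doc1", "doc2", "doc3");

        // Adapt to a Flux at the boundary; the downstream reactive pipeline stays unchanged
        Flux.fromStream(docs)
            .buffer(2) // same batching idea as MAX_BATCH_SIZE in DocumentReindexer
            .subscribe(batch -> System.out.println("batch: " + batch));
    }
}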

String targetUrl = "/" + indexName + "/_bulk";
HttpClient client = HttpClient.create()
Member: Can we split the http client into a separate class which may get reused by different operations?

Member Author (@chelma, Apr 24, 2024): I'd prefer not to at this point, actually. While I would historically agree with you, I'm trying out a new approach on this project and have been really happy with how it has worked out. Specifically: avoiding being too speculative about abstractions and letting the needs of the project shape what gets created. In this case, we only have one thing that needs this reactor-netty client, and I honestly don't know what interface I would provide if I were to carve it out, because I don't know how another potential part of the code might use it. Avoiding speculation on past abstractions in this project's history has been one of the key things that has enabled me to make so much progress so fast.

Collaborator: I don't know if you need a separate HttpClient interface yet, but I do think that it might help, and in general you'll want to look to the future and not think about the past. From my view, you've got some leaky abstractions with a couple of other needless Flux contaminations within your codebase. Once that contamination is in place, it also becomes harder to test your code (and tests help us write application code faster, too). If you want to write test code fast too, keep it as generic as you can with the cleanest interfaces that you can strive for. Simpler pieces -> smoother integrations -> faster delivery of quality solutions.
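As a rough illustration of the 'separate class' idea under discussion (not something this PR adds), a small factory could centralize the reactor-netty client setup. The RfsHttpClientFactory name is invented, and the sketch assumes the ConnectionDetails class from this change.

import java.util.Base64;

import reactor.netty.http.client.HttpClient;

// Hypothetical helper, not part of this PR: centralizes reactor-netty client setup so
// other operations could reuse it. Relies on the ConnectionDetails class from this change.
public class RfsHttpClientFactory {
    public static HttpClient create(ConnectionDetails connection) {
        return HttpClient.create()
            .host(connection.hostName)
            .port(connection.port)
            .headers(h -> {
                h.set("Content-Type", "application/json");
                if (connection.authType == ConnectionDetails.AuthType.BASIC) {
                    String credentials = connection.username + ":" + connection.password;
                    String encoded = Base64.getEncoder().encodeToString(credentials.getBytes());
                    h.set("Authorization", "Basic " + encoded);
                }
            });
    }
}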

.host(targetConnection.hostName)
.port(targetConnection.port)
.headers(h -> {
h.set("Content-Type", "application/json");
if (targetConnection.authType == ConnectionDetails.AuthType.BASIC) {
String credentials = targetConnection.username + ":" + targetConnection.password;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
h.set("Authorization", "Basic " + encodedCredentials);
}
});

documentStream
.map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
.buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size
.map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts
.flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
.subscribe(
response -> logger.info("Batch uploaded successfully"),
error -> logger.error("Failed to upload batch", error)
);
}

public static void reindex(String indexName, Document document, ConnectionDetails targetConnection) throws Exception {
// Get the document details
private static String convertDocumentToBulkSection(Document document) {
String id = Uid.decodeId(document.getBinaryValue("_id").bytes);
String source = document.getBinaryValue("_source").utf8ToString();
String action = "{\"index\": {\"_id\": \"" + id + "\"}}";

logger.info("Reindexing document - Index: " + indexName + ", Document ID: " + id);
return action + "\n" + source;
}

// Assemble the request details
String path = indexName + "/_doc/" + id;
String body = source;
private static String convertToBulkRequestBody(List<String> bulkSections) {
logger.info(bulkSections.size() + " documents in current bulk request");
StringBuilder builder = new StringBuilder();
for (String section : bulkSections) {
builder.append(section).append("\n");
}
return builder.toString();
}

// Send the request
RestClient client = new RestClient(targetConnection);
client.put(path, body, false);
private static Mono<Void> sendBulkRequest(HttpClient client, String url, String bulkJson) {
return client.post()
.uri(url)
.send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes())))
.responseSingle((res, content) ->
content.asString() // Convert the response content to a string
.map(body -> new BulkResponseDetails(res.status().code(), body)) // Map both status code and body into a response details object
)
.flatMap(responseDetails -> {
// Something bad happened with our request, log it
if (responseDetails.hasBadStatusCode()) {
logger.error(responseDetails.getFailureMessage());
}
// Some of the bulk operations failed, log it
else if (responseDetails.hasFailedOperations()) {
logger.error(responseDetails.getFailureMessage());
}
return Mono.just(responseDetails);
})
.doOnError(err -> {
// We weren't even able to complete the request, log it
logger.error("Bulk request failed", err);
})
.then();
}

public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception {
// Send the request
RestClient client = new RestClient(targetConnection);
client.get("_refresh", false);
}

static class BulkResponseDetails {
public final int statusCode;
public final String body;

BulkResponseDetails(int statusCode, String body) {
this.statusCode = statusCode;
this.body = body;
}

public boolean hasBadStatusCode() {
return !(statusCode == 200 || statusCode == 201);
}

public boolean hasFailedOperations() {
return body.contains("\"errors\":true");
}

public String getFailureMessage() {
String failureMessage;
if (hasBadStatusCode()) {
failureMessage = "Bulk request failed. Status code: " + statusCode + ", Response body: " + body;
} else {
failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
}

return failureMessage;
}
}
}
74 changes: 49 additions & 25 deletions RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
@@ -1,8 +1,7 @@
package com.rfs.common;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -11,40 +10,65 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import reactor.core.publisher.Flux;

public class LuceneDocumentsReader {
private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class);

public static List<Document> readDocuments(Path luceneFilesBasePath, String indexName, int shardId) throws Exception {
public Flux<Document> readDocuments(Path luceneFilesBasePath, String indexName, int shardId) {
Collaborator: Why does your LuceneDocumentsReader now take a hard dependency on your HTTP client library? It might be better to make this a collection or stream and then adapt later so that you can switch client implementations out.

Member Author (@chelma): Maybe, but it seems like this is how the Reactor framework wants to be used. I can see both the LuceneDocumentsReader and DocumentReindexer classes being implementation specific. So far it's paid off for me in this project not to speculate on stuff like this until there's a specific need.
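To make the trade-off concrete, here is a rough sketch (not part of this PR) of a reader that returns a plain java.util.stream.Stream and ties the IndexReader's lifetime to the stream's onClose hook; the class name is invented and error handling is simplified. A caller that wants Reactor could still wrap the result with Flux.fromStream.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Objects;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.FSDirectory;

// Hypothetical alternative, not part of this PR: expose a Stream so callers pick their own
// adapter (e.g. Flux.fromStream). Closing the Stream closes the underlying IndexReader.
public class StreamingLuceneReaderSketch {
    public static Stream<Document> readDocuments(Path indexDirectoryPath) throws IOException {
        IndexReader reader = DirectoryReader.open(FSDirectory.open(indexDirectoryPath));
        return IntStream.range(0, reader.maxDoc())
            .mapToObj(i -> {
                try {
                    return reader.document(i); // same per-document read as the PR's getDocument()
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            })
            .filter(Objects::nonNull)
            .onClose(() -> {
                try {
                    reader.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
    }
}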

Path indexDirectoryPath = luceneFilesBasePath.resolve(indexName).resolve(String.valueOf(shardId));

List<Document> documents = new ArrayList<>();
return Flux.using(
() -> openIndexReader(indexDirectoryPath),
reader -> {
logger.info(reader.maxDoc() + " documents found in the current Lucene index");

try (FSDirectory directory = FSDirectory.open(indexDirectoryPath);
IndexReader reader = DirectoryReader.open(directory)) {

// Add all documents to our output that have the _source field set and filled in
for (int i = 0; i < reader.maxDoc(); i++) {
Document document = reader.document(i);
BytesRef source_bytes = document.getBinaryValue("_source");
String id;
// TODO Improve handling of missing document id (https://opensearch.atlassian.net/browse/MIGRATIONS-1649)
return Flux.range(0, reader.maxDoc()) // Extract all the Documents in the IndexReader
.handle((i, sink) -> {
Document doc = getDocument(reader, i);
if (doc != null) { // Skip malformed docs
Collaborator: You should at least log when doc == null (or whatever malformed documents that you might be skipping).

Member Author (@chelma): We log that in getDocument()

sink.next(doc);
}
}).cast(Document.class);
},
reader -> { // Close the IndexReader when done
try {
id = Uid.decodeId(reader.document(i).getBinaryValue("_id").bytes);
} catch (Exception e) {
logger.warn("Unable to parse Document id from Document");
id = "unknown-id";
}
if (source_bytes == null || source_bytes.bytes.length == 0) { // Skip deleted documents
logger.info("Document " + id + " is deleted or doesn't have the _source field enabled");
continue;
reader.close();
} catch (IOException e) {
logger.error("Failed to close IndexReader", e);
Collaborator: This seems like it's probably a really bad exception. Why should the program keep running? This seems like a spot where throw Lombok.sneakyThrow(e) would be a better option.

Member Author (@chelma): Good question; probably does make sense to kill the process at this point. I realized just now that Reactor was unhappy with the fact that a checked exception was being thrown but I totally could have thrown an unchecked exception here or something.
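Below is a small sketch of the rethrow this thread converges on. It is not part of this PR, the class name is invented, and it wraps the failure in UncheckedIOException since the diff does not show whether Lombok (for sneakyThrow) is on RFS's classpath.

import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.lucene.index.IndexReader;

// Hypothetical cleanup helper for the Flux.using close callback: rethrow instead of logging
// and continuing, so a failed close surfaces to the pipeline's error handling.
class ReaderCleanupSketch {
    static void closeReader(IndexReader reader) {
        try {
            reader.close();
        } catch (IOException e) {
            // With Lombok available, `throw Lombok.sneakyThrow(e);` would avoid the wrapper
            throw new UncheckedIOException("Failed to close IndexReader", e);
        }
    }
}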

}
}
);
}

documents.add(document);
logger.info("Document " + id + " read successfully");
protected IndexReader openIndexReader(Path indexDirectoryPath) throws IOException {
return DirectoryReader.open(FSDirectory.open(indexDirectoryPath));
}

protected Document getDocument(IndexReader reader, int docId) {
try {
Document document = reader.document(docId);
BytesRef source_bytes = document.getBinaryValue("_source");
String id;
try {
id = Uid.decodeId(document.getBinaryValue("_id").bytes);
} catch (Exception e) {
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Unable to parse Document id from Document. The Document's Fields: ");
document.getFields().forEach(f -> errorMessage.append(f.name()).append(", "));
logger.error(errorMessage.toString());
Collaborator: logger.atError().setCause(e).setMessage(()->...).log() will do two more things for you: 1) get the exception and its backtrace into the logs and 2) use the fluent style, where everything within '...' will only be evaluated when you're logging at that level. It can make your log statements tighter (all one statement rather than 4 as they are here) and much more efficient, since work can often be elided. Even if you stay at warn/error, I'd like to routinely filter the repo for usages of immediate logging because its performance hit can be the single greatest impact on a program.

Collaborator: Consider PII for ERROR. I think that it's fair, but you should call it out... maybe PII-possible loggers should have their own logger name convention so that operators could easily mask them out if necessary.

Member Author (@chelma): On logger.atError().setCause(e).setMessage(()->...).log(): cool, will look into that for the future. On considering PII for ERROR: I think we need to have a larger discussion around stuff like PII concerns, because I suspect they will impact many aspects of the implementation if we're designing to address them up front.
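One note on the snippet quoted in this thread: setCause() and setMessage() are the SLF4J 2 fluent-builder names. With the Log4j2 API this file already uses, the same lazy, single-statement style would look roughly like the sketch below; the class name is invented and this is illustrative rather than part of the PR.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Hypothetical illustration of fluent logging with the Log4j2 API: the throwable rides
// along with the event, and the message is parameterized so formatting is deferred.
class FluentLoggingSketch {
    private static final Logger logger = LogManager.getLogger(FluentLoggingSketch.class);

    static void logReadFailure(int docId, Exception e) {
        logger.atError()
              .withThrowable(e) // exception and stack trace are attached to the log event
              .log("Failed to read document at Lucene index location {}", docId);
    }
}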

return null; // Skip documents with missing id
}
if (source_bytes == null || source_bytes.bytes.length == 0) {
logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled");
Member: Would this be better suited for info if this is expected for deleted documents?

Member Author (@chelma): I felt (and I guess still feel) that warn is probably the right level. It's something that we should highlight the occurrence of without it being an error, per se.

Collaborator: Which one is it - can you tell the difference? Could the reader of the log tell the difference? Is there something in the beginning of the log that would give the user a clue? If _source wasn't enabled, could this flood the logs?

Collaborator: Is there any chance that docId could have PII in it? The docId could be customer generated, right? Or are they only internal ids that are separately mapped to the customer-given ones? If they're customer driven, I'd push this to debug to promote the policy that no PII could be shown for INFO and above logs. This feels like it isn't a great spot to be in. I'm hoping that there's a way to show an identifier without risking divulging a customer value.

Member Author (@chelma): On telling the difference: I am not currently aware of how to tell the difference. We have a task to look into this more (see: https://opensearch.atlassian.net/browse/MIGRATIONS-1629). On PII: the docId is an integer value used by Lucene to tell which Lucene Document in the Lucene Index is being referred to. The _id field of the Lucene Document is a user-set alphanumeric string, and so can contain whatever the user wants it to. Regarding PII more broadly - that's a larger discussion for the team to have. I'll book a timeslot to discuss as a reminder.

return null; // Skip deleted documents or those without the _source field
}
}

return documents;
logger.debug("Document " + id + " read successfully");
Collaborator: atDebug().setMessage(()->...).log()

return document;
} catch (Exception e) {
logger.error("Failed to read document at Lucene index location " + docId, e);
return null;
Collaborator: Would you be better off propagating the exception (or a new exception) upward? Catching Exception is probably going to eventually bite you too. I'd recommend making new checked exceptions, using Illegal*Exception from java's libraries, or using SneakyThrows.

Member Author (@chelma): Maybe? I'm honestly unsure how to handle this particular exception situation, so I decided to just log and circle back on it later. I don't know what you'd do with an exception for this at a higher level. You probably wouldn't kill the entire process, right? Maybe the process kills itself if it sees a large number of the same exception? Seems like something for a later iteration.

}
}
}
4 changes: 2 additions & 2 deletions RFS/src/main/java/com/rfs/common/RestClient.java
@@ -34,7 +34,7 @@ public RestClient(ConnectionDetails connectionDetails) {
}

public Response get(String path, boolean quietLogging) throws Exception {
Collaborator: Would this be clearer if you passed an Slf4j.Level in here instead? Why would you want the caller to dictate the log level? Why is the rest client even logging at all? I'd recommend that most of the logging in this PR get lifted further up, closer to the top of the application, rather than living in lower-level libraries. It's fine to have for quick and dirty debugging, but things like errors/warnings will probably be (and should be) big impacts to the overall program, so it would make sense for them to pull all of the context together to show the ramifications.

Member Author (@chelma): That makes sense; I'll think about that for future iterations.
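A minimal sketch of the level-as-parameter idea follows. It is illustrative only (names invented, not part of this PR) and uses the Log4j2 Level type RFS already imports elsewhere, since the Slf4j.Level the reviewer mentions belongs to a different logging facade.

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Hypothetical shape of letting the caller choose verbosity instead of a boolean flag.
class ResponseLoggingSketch {
    private static final Logger logger = LogManager.getLogger(ResponseLoggingSketch.class);

    static void logResponse(String path, int statusCode, Level responseLogLevel) {
        logger.log(responseLogLevel, "GET {} returned status {}", path, statusCode);
    }
}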

String urlString = connectionDetails.host + "/" + path;
String urlString = connectionDetails.url + "/" + path;

URL url = new URL(urlString);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
Collaborator: If the plan is to deprecate this class, use the @Deprecated annotation for it (before class RestClient) so that we know that the plan is to rally all of the code around one HTTP client solution. As it is, it's pretty confusing with 2 different clients within one codebase/PR.

Member Author (@chelma): I'm not sure whether we want to deprecate this class or not in the long run. I would assume so, but honestly the only place we really need to use the greater abilities of the reactor-netty client is for reindexing; this is fine elsewhere. For that reason, I left this in place for the time being.
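If the class is retired later, the marker the reviewer suggests would read roughly as below. This is a sketch only: the deprecation note and the decision itself are hypothetical, and the class body is elided.

/**
 * @deprecated Prefer the reactor-netty based path in DocumentReindexer where it applies.
 */
@Deprecated
public class RestClient {
    // existing fields, constructor, get(), and put() unchanged
}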

@@ -76,7 +76,7 @@ public Response get(String path, boolean quietLogging) throws Exception {
}

public Response put(String path, String body, boolean quietLogging) throws Exception {
String urlString = connectionDetails.host + "/" + path;
String urlString = connectionDetails.url + "/" + path;

URL url = new URL(urlString);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
6 changes: 3 additions & 3 deletions RFS/src/main/java/com/rfs/common/Uid.java
@@ -10,9 +10,9 @@
* See: https://github.com/elastic/elasticsearch/blob/6.8/server/src/main/java/org/elasticsearch/index/mapper/Uid.java#L32
*/
public class Uid {
private static final int UTF8 = 0xff;
private static final int NUMERIC = 0xfe;
private static final int BASE64_ESCAPE = 0xfd;
public static final int UTF8 = 0xff;
public static final int NUMERIC = 0xfe;
public static final int BASE64_ESCAPE = 0xfd;
Comment on lines +13 to +15

Collaborator: Could you add another word or two to make these a bit more self-documenting? I have some guesses what they're for, but I'm probably wrong.

Member Author (@chelma): I honestly don't understand the *why* of these variables myself; per the comment above, this file is straight from the Elastic codebase.


private static String decodeNumericId(byte[] idBytes, int offset, int len) {
assert Byte.toUnsignedInt(idBytes[offset]) == NUMERIC;
Expand Down