From 9cdf46c9150e2a5d1052a9d5ecfc78c0f83f3992 Mon Sep 17 00:00:00 2001 From: Chris Helma <25470211+chelma@users.noreply.github.com> Date: Wed, 24 Apr 2024 11:43:40 -0500 Subject: [PATCH] RFS now uses reactor-netty for bulk indexing (#607) * Checkpoint: improved ConnectionDetails; unit tested it Signed-off-by: Chris Helma * RFS now uses reactor-netty and bulk indexing Signed-off-by: Chris Helma * Fixes per PR; unit tested LuceneDocumentsReader Signed-off-by: Chris Helma * Updated a unit test name Signed-off-by: Chris Helma * Updated a method name per PR feedback Signed-off-by: Chris Helma --------- Signed-off-by: Chris Helma --- RFS/build.gradle | 15 +++ .../java/com/rfs/ReindexFromSnapshot.java | 16 +-- .../com/rfs/common/ConnectionDetails.java | 41 ++++++- .../com/rfs/common/DocumentReindexer.java | 110 ++++++++++++++++-- .../com/rfs/common/LuceneDocumentsReader.java | 74 ++++++++---- .../main/java/com/rfs/common/RestClient.java | 4 +- RFS/src/main/java/com/rfs/common/Uid.java | 6 +- .../com/rfs/common/ConnectionDetailsTest.java | 54 +++++++++ .../rfs/common/LuceneDocumentsReaderTest.java | 95 +++++++++++++++ RFS/src/test/resources/log4j2-test.xml | 13 +++ 10 files changed, 376 insertions(+), 52 deletions(-) create mode 100644 RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java create mode 100644 RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java create mode 100644 RFS/src/test/resources/log4j2-test.xml diff --git a/RFS/build.gradle b/RFS/build.gradle index 092ad5897..2cc9c1477 100644 --- a/RFS/build.gradle +++ b/RFS/build.gradle @@ -37,8 +37,14 @@ dependencies { implementation 'org.apache.lucene:lucene-core:8.11.3' implementation 'org.apache.lucene:lucene-analyzers-common:8.11.3' implementation 'org.apache.lucene:lucene-backward-codecs:8.11.3' + + implementation platform('io.projectreactor:reactor-bom:2023.0.5') + implementation 'io.projectreactor.netty:reactor-netty-core' + implementation 'io.projectreactor.netty:reactor-netty-http' implementation 'software.amazon.awssdk:s3:2.25.16' + testImplementation 'io.projectreactor:reactor-test:3.6.5' + testImplementation 'org.apache.logging.log4j:log4j-core:2.23.1' testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2' testImplementation 'org.junit.jupiter:junit-jupiter-params:5.10.2' testImplementation 'org.mockito:mockito-core:5.11.0' @@ -120,4 +126,13 @@ tasks.getByName('composeUp') test { useJUnitPlatform() + + testLogging { + exceptionFormat = 'full' + events "failed" + showExceptions true + showCauses true + showStackTraces true + showStandardStreams = true + } } \ No newline at end of file diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 1cd897ffc..f19c7aeea 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -14,6 +14,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import reactor.core.publisher.Flux; import com.rfs.common.*; import com.rfs.transformers.*; @@ -125,7 +126,7 @@ public static void main(String[] args) throws InterruptedException { Logging.setLevel(logLevel); - ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass); + ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass); ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); // Sanity checks @@ -355,18 +356,13 @@ public static void main(String[] args) throws InterruptedException { for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ==="); - List documents = LuceneDocumentsReader.readDocuments(luceneDirPath, indexMetadata.getName(), shardId); - logger.info("Documents read successfully"); + Flux documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId); + String targetIndex = indexMetadata.getName() + indexSuffix; + DocumentReindexer.reindex(targetIndex, documents, targetConnection); - for (Document document : documents) { - String targetIndex = indexMetadata.getName() + indexSuffix; - DocumentReindexer.reindex(targetIndex, document, targetConnection); - } + logger.info("Shard reindexing completed"); } } - - logger.info("Documents reindexed successfully"); - logger.info("Refreshing newly added documents"); DocumentReindexer.refreshAllDocuments(targetConnection); logger.info("Refresh complete"); diff --git a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java index c9c8b1b44..4e2edb7b9 100644 --- a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java +++ b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java @@ -1,5 +1,8 @@ package com.rfs.common; +import java.net.URI; +import java.net.URISyntaxException; + /** * Stores the connection details (assuming basic auth) for an Elasticsearch/OpenSearch cluster */ @@ -9,13 +12,21 @@ public static enum AuthType { NONE } - public final String host; + public static enum Protocol { + HTTP, + HTTPS + } + + public final String url; + public final Protocol protocol; + public final String hostName; + public final int port; public final String username; public final String password; public final AuthType authType; - public ConnectionDetails(String host, String username, String password) { - this.host = host; // http://localhost:9200 + public ConnectionDetails(String url, String username, String password) { + this.url = url; // http://localhost:9200 // If the username is provided, the password must be as well, and vice versa if ((username == null && password != null) || (username != null && password == null)) { @@ -28,5 +39,29 @@ public ConnectionDetails(String host, String username, String password) { this.username = username; this.password = password; + + if (url == null) { + hostName = null; + port = -1; + protocol = null; + } else { + URI uri; + try { + uri = new URI(url); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid URL format", e); + } + + hostName = uri.getHost(); + port = uri.getPort(); // Default port can be set here if -1 + + if (uri.getScheme().equals("http")) { + protocol = Protocol.HTTP; + } else if (uri.getScheme().equals("https")) { + protocol = Protocol.HTTPS; + } else { + throw new IllegalArgumentException("Invalid protocol"); + } + } } } diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java index 4a09a381c..6c84488e7 100644 --- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java +++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java @@ -1,27 +1,90 @@ package com.rfs.common; +import java.time.Duration; +import java.util.Base64; +import java.util.List; + +import io.netty.buffer.Unpooled; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.document.Document; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.util.retry.Retry; public class DocumentReindexer { private static final Logger logger = LogManager.getLogger(DocumentReindexer.class); + private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen + + public static void reindex(String indexName, Flux documentStream, ConnectionDetails targetConnection) throws Exception { + String targetUrl = "/" + indexName + "/_bulk"; + HttpClient client = HttpClient.create() + .host(targetConnection.hostName) + .port(targetConnection.port) + .headers(h -> { + h.set("Content-Type", "application/json"); + if (targetConnection.authType == ConnectionDetails.AuthType.BASIC) { + String credentials = targetConnection.username + ":" + targetConnection.password; + String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes()); + h.set("Authorization", "Basic " + encodedCredentials); + } + }); + + documentStream + .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation + .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size + .map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts + .flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5))) + .subscribe( + response -> logger.info("Batch uploaded successfully"), + error -> logger.error("Failed to upload batch", error) + ); + } - public static void reindex(String indexName, Document document, ConnectionDetails targetConnection) throws Exception { - // Get the document details + private static String convertDocumentToBulkSection(Document document) { String id = Uid.decodeId(document.getBinaryValue("_id").bytes); String source = document.getBinaryValue("_source").utf8ToString(); + String action = "{\"index\": {\"_id\": \"" + id + "\"}}"; - logger.info("Reindexing document - Index: " + indexName + ", Document ID: " + id); + return action + "\n" + source; + } - // Assemble the request details - String path = indexName + "/_doc/" + id; - String body = source; + private static String convertToBulkRequestBody(List bulkSections) { + logger.info(bulkSections.size() + " documents in current bulk request"); + StringBuilder builder = new StringBuilder(); + for (String section : bulkSections) { + builder.append(section).append("\n"); + } + return builder.toString(); + } - // Send the request - RestClient client = new RestClient(targetConnection); - client.put(path, body, false); + private static Mono sendBulkRequest(HttpClient client, String url, String bulkJson) { + return client.post() + .uri(url) + .send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes()))) + .responseSingle((res, content) -> + content.asString() // Convert the response content to a string + .map(body -> new BulkResponseDetails(res.status().code(), body)) // Map both status code and body into a response details object + ) + .flatMap(responseDetails -> { + // Something bad happened with our request, log it + if (responseDetails.hasBadStatusCode()) { + logger.error(responseDetails.getFailureMessage()); + } + // Some of the bulk operations failed, log it + else if (responseDetails.hasFailedOperations()) { + logger.error(responseDetails.getFailureMessage()); + } + return Mono.just(responseDetails); + }) + .doOnError(err -> { + // We weren't even able to complete the request, log it + logger.error("Bulk request failed", err); + }) + .then(); } public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception { @@ -29,4 +92,33 @@ public static void refreshAllDocuments(ConnectionDetails targetConnection) throw RestClient client = new RestClient(targetConnection); client.get("_refresh", false); } + + static class BulkResponseDetails { + public final int statusCode; + public final String body; + + BulkResponseDetails(int statusCode, String body) { + this.statusCode = statusCode; + this.body = body; + } + + public boolean hasBadStatusCode() { + return !(statusCode == 200 || statusCode == 201); + } + + public boolean hasFailedOperations() { + return body.contains("\"errors\":true"); + } + + public String getFailureMessage() { + String failureMessage; + if (hasBadStatusCode()) { + failureMessage = "Bulk request failed. Status code: " + statusCode + ", Response body: " + body; + } else { + failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body; + } + + return failureMessage; + } + } } diff --git a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java index 6f17cd9a1..d8698c2e2 100644 --- a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java @@ -1,8 +1,7 @@ package com.rfs.common; +import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -11,40 +10,65 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.BytesRef; +import reactor.core.publisher.Flux; public class LuceneDocumentsReader { private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class); - public static List readDocuments(Path luceneFilesBasePath, String indexName, int shardId) throws Exception { + public Flux readDocuments(Path luceneFilesBasePath, String indexName, int shardId) { Path indexDirectoryPath = luceneFilesBasePath.resolve(indexName).resolve(String.valueOf(shardId)); - List documents = new ArrayList<>(); + return Flux.using( + () -> openIndexReader(indexDirectoryPath), + reader -> { + logger.info(reader.maxDoc() + " documents found in the current Lucene index"); - try (FSDirectory directory = FSDirectory.open(indexDirectoryPath); - IndexReader reader = DirectoryReader.open(directory)) { - - // Add all documents to our output that have the _source field set and filled in - for (int i = 0; i < reader.maxDoc(); i++) { - Document document = reader.document(i); - BytesRef source_bytes = document.getBinaryValue("_source"); - String id; - // TODO Improve handling of missing document id (https://opensearch.atlassian.net/browse/MIGRATIONS-1649) + return Flux.range(0, reader.maxDoc()) // Extract all the Documents in the IndexReader + .handle((i, sink) -> { + Document doc = getDocument(reader, i); + if (doc != null) { // Skip malformed docs + sink.next(doc); + } + }).cast(Document.class); + }, + reader -> { // Close the IndexReader when done try { - id = Uid.decodeId(reader.document(i).getBinaryValue("_id").bytes); - } catch (Exception e) { - logger.warn("Unable to parse Document id from Document"); - id = "unknown-id"; - } - if (source_bytes == null || source_bytes.bytes.length == 0) { // Skip deleted documents - logger.info("Document " + id + " is deleted or doesn't have the _source field enabled"); - continue; + reader.close(); + } catch (IOException e) { + logger.error("Failed to close IndexReader", e); } + } + ); + } - documents.add(document); - logger.info("Document " + id + " read successfully"); + protected IndexReader openIndexReader(Path indexDirectoryPath) throws IOException { + return DirectoryReader.open(FSDirectory.open(indexDirectoryPath)); + } + + protected Document getDocument(IndexReader reader, int docId) { + try { + Document document = reader.document(docId); + BytesRef source_bytes = document.getBinaryValue("_source"); + String id; + try { + id = Uid.decodeId(document.getBinaryValue("_id").bytes); + } catch (Exception e) { + StringBuilder errorMessage = new StringBuilder(); + errorMessage.append("Unable to parse Document id from Document. The Document's Fields: "); + document.getFields().forEach(f -> errorMessage.append(f.name()).append(", ")); + logger.error(errorMessage.toString()); + return null; // Skip documents with missing id + } + if (source_bytes == null || source_bytes.bytes.length == 0) { + logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled"); + return null; // Skip deleted documents or those without the _source field } - } - return documents; + logger.debug("Document " + id + " read successfully"); + return document; + } catch (Exception e) { + logger.error("Failed to read document at Lucene index location " + docId, e); + return null; + } } } diff --git a/RFS/src/main/java/com/rfs/common/RestClient.java b/RFS/src/main/java/com/rfs/common/RestClient.java index 0a4ad1849..55bf36ea4 100644 --- a/RFS/src/main/java/com/rfs/common/RestClient.java +++ b/RFS/src/main/java/com/rfs/common/RestClient.java @@ -34,7 +34,7 @@ public RestClient(ConnectionDetails connectionDetails) { } public Response get(String path, boolean quietLogging) throws Exception { - String urlString = connectionDetails.host + "/" + path; + String urlString = connectionDetails.url + "/" + path; URL url = new URL(urlString); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); @@ -76,7 +76,7 @@ public Response get(String path, boolean quietLogging) throws Exception { } public Response put(String path, String body, boolean quietLogging) throws Exception { - String urlString = connectionDetails.host + "/" + path; + String urlString = connectionDetails.url + "/" + path; URL url = new URL(urlString); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); diff --git a/RFS/src/main/java/com/rfs/common/Uid.java b/RFS/src/main/java/com/rfs/common/Uid.java index 22e624f6d..fcec17a30 100644 --- a/RFS/src/main/java/com/rfs/common/Uid.java +++ b/RFS/src/main/java/com/rfs/common/Uid.java @@ -10,9 +10,9 @@ * See: https://github.com/elastic/elasticsearch/blob/6.8/server/src/main/java/org/elasticsearch/index/mapper/Uid.java#L32 */ public class Uid { - private static final int UTF8 = 0xff; - private static final int NUMERIC = 0xfe; - private static final int BASE64_ESCAPE = 0xfd; + public static final int UTF8 = 0xff; + public static final int NUMERIC = 0xfe; + public static final int BASE64_ESCAPE = 0xfd; private static String decodeNumericId(byte[] idBytes, int offset, int len) { assert Byte.toUnsignedInt(idBytes[offset]) == NUMERIC; diff --git a/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java b/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java new file mode 100644 index 000000000..b76710bf4 --- /dev/null +++ b/RFS/src/test/java/com/rfs/common/ConnectionDetailsTest.java @@ -0,0 +1,54 @@ +package com.rfs.common; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + + +public class ConnectionDetailsTest { + static Stream happyPathArgs() { + return Stream.of( + Arguments.of("https://localhost:9200", "username", "pass", "https://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTPS, "localhost", 9200), + Arguments.of("http://localhost:9200", "username", "pass", "http://localhost:9200", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", 9200), + Arguments.of("http://localhost:9200", null, null, "http://localhost:9200", null, null, ConnectionDetails.Protocol.HTTP, "localhost", 9200), + Arguments.of("http://localhost", "username", "pass", "http://localhost", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", -1), + Arguments.of("http://localhost:9200/longer/path", "username", "pass", "http://localhost:9200/longer/path", "username", "pass", ConnectionDetails.Protocol.HTTP, "localhost", 9200), + Arguments.of(null, "username", "pass", null, "username", "pass", null, null, -1) + ); + } + + @ParameterizedTest + @MethodSource("happyPathArgs") + void ConnectionDetails_HappyPath_AsExpected(String url, String username, String password, + String expectedUrl, String expectedUsername, String expectedPassword, + ConnectionDetails.Protocol expectedProtocal, String expectedHostName, int expectedPort) { + ConnectionDetails details = new ConnectionDetails(url, username, password); + assertEquals(expectedUrl, details.url); + assertEquals(expectedUsername, details.username); + assertEquals(expectedPassword, details.password); + assertEquals(expectedProtocal, details.protocol); + assertEquals(expectedHostName, details.hostName); + assertEquals(expectedPort, details.port); + } + + static Stream unhappyPathArgs() { + return Stream.of( + Arguments.of("localhost:9200", "username", "pass", IllegalArgumentException.class), + Arguments.of("http://localhost:9200", "username", null, IllegalArgumentException.class), + Arguments.of("http://localhost:9200", null, "pass", IllegalArgumentException.class), + Arguments.of("ftp://localhost:9200", null, "pass", IllegalArgumentException.class) + ); + } + + @ParameterizedTest + @MethodSource("unhappyPathArgs") + void ConnectionDetails_UnhappyPath_AsExpected(String url, String username, String password, + Class expectedException) { + assertThrows(expectedException, () -> new ConnectionDetails(url, username, password)); + } +} diff --git a/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java new file mode 100644 index 000000000..e28daff81 --- /dev/null +++ b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java @@ -0,0 +1,95 @@ +package com.rfs.common; + +import static org.mockito.Mockito.*; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.util.BytesRef; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.nio.file.Paths; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; + +class TestLuceneDocumentsReader extends LuceneDocumentsReader { + // Helper method to correctly encode the Document IDs for test + public static byte[] encodeUtf8Id(String id) { + byte[] idBytes = id.getBytes(StandardCharsets.UTF_8); + byte[] encoded = new byte[idBytes.length + 1]; + encoded[0] = (byte) Uid.UTF8; + System.arraycopy(idBytes, 0, encoded, 1, idBytes.length); + return encoded; + } + + // Override to return a mocked IndexReader + @Override + protected IndexReader openIndexReader(Path indexDirectoryPath) throws IOException { + // Set up our test docs + Document doc1 = new Document(); + doc1.add(new StringField("_id", new BytesRef(encodeUtf8Id("id1")), Field.Store.YES)); + doc1.add(new StoredField("_source", new BytesRef("source1"))); + + Document doc2 = new Document(); + doc2.add(new StringField("_id", new BytesRef(encodeUtf8Id("id2")), Field.Store.YES)); + doc2.add(new StoredField("_source", new BytesRef("source2"))); + + Document doc3 = new Document(); + doc3.add(new StringField("_id", new BytesRef(encodeUtf8Id("id3")), Field.Store.YES)); + doc3.add(new StoredField("_source", new BytesRef("source3"))); + + Document doc4 = new Document(); // Doc w/ no fields + + Document doc5 = new Document(); // Doc w/ missing _source + doc5.add(new StringField("_id", new BytesRef(encodeUtf8Id("id5")), Field.Store.YES)); + + // Set up our mock reader + IndexReader mockReader = mock(IndexReader.class); + when(mockReader.maxDoc()).thenReturn(5); + when(mockReader.document(0)).thenReturn(doc1); + when(mockReader.document(1)).thenReturn(doc2); + when(mockReader.document(2)).thenReturn(doc3); + when(mockReader.document(3)).thenReturn(doc4); + when(mockReader.document(4)).thenReturn(doc5); + + return mockReader; + } +} + +public class LuceneDocumentsReaderTest { + @Test + void ReadDocuments_AsExpected() { + // Use the TestLuceneDocumentsReader to get the mocked documents + Flux documents = new TestLuceneDocumentsReader().readDocuments(Paths.get("/fake/path"), "testIndex", 1); + + // Verify that the results are as expected + StepVerifier.create(documents) + .expectNextMatches(doc -> { + String testId = Uid.decodeId(doc.getBinaryValue("_id").bytes); + String testSource = doc.getBinaryValue("_source").utf8ToString(); + return "id1".equals(testId) && "source1".equals(testSource); + }) + .expectNextMatches(doc -> { + String testId = Uid.decodeId(doc.getBinaryValue("_id").bytes); + String testSource = doc.getBinaryValue("_source").utf8ToString(); + return "id2".equals(testId) && "source2".equals(testSource); + }) + .expectNextMatches(doc -> { + String testId = Uid.decodeId(doc.getBinaryValue("_id").bytes); + String testSource = doc.getBinaryValue("_source").utf8ToString(); + return "id3".equals(testId) && "source3".equals(testSource); + }) + .expectComplete() + .verify(); + } +} + + + + + diff --git a/RFS/src/test/resources/log4j2-test.xml b/RFS/src/test/resources/log4j2-test.xml new file mode 100644 index 000000000..5940527db --- /dev/null +++ b/RFS/src/test/resources/log4j2-test.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + +