PARQUET-3031: Support to transfer input stream when building ParquetFileReader #3030

Open
wants to merge 6 commits into base: master
Conversation

turboFei
Member

@turboFei turboFei commented Oct 10, 2024

Rationale for this change

Support transferring the Parquet file input stream when building the ParquetFileReader, so that an existing input stream can be reused and the number of open-file RPCs reduced.

What changes are included in this PR?

As the title describes: new ParquetFileReader constructors that accept an existing input stream.

Are these changes tested?

Covered by existing unit tests; the change only adds new constructors.

Are there any user-facing changes?

No breaking changes.

Closes #3031
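
For illustration, a minimal sketch of how a caller might hand an already-opened stream to the reader. The six-argument constructor mirrors the one exercised in the Spark-side diff quoted later in this conversation and is an assumption about the final API; the class name, file path, and the assumption that the reader takes ownership of the stream are illustrative only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class ReuseOpenedStreamSketch {

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]); // path to a Parquet file

    // Footer read. In the intended Spark integration the footer is handed in by the
    // caller (see SPARK-42388); the deprecated readFooter is used here only to keep
    // the sketch self-contained.
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, path, NO_FILTER);

    // Open the data stream once and keep it for the whole read.
    HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf);
    SeekableInputStream stream = inputFile.newStream();

    // Hand the already-opened stream to the reader so it does not reopen the file.
    // Argument order follows the openedFileInfo usage in the Spark-side diff; whether
    // the fourth argument is a Path or an InputFile in the merged code is not shown there.
    try (ParquetFileReader reader = new ParquetFileReader(
        conf,
        footer.getFileMetaData(),
        stream,
        inputFile,
        footer.getBlocks(),
        footer.getFileMetaData().getSchema().getColumns())) {
      System.out.println("rows: " + reader.getRecordCount());
    } // assumes the reader takes ownership of the stream and closes it
  }
}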

@turboFei turboFei changed the title Support to transfer input stream when build ParquetFileReader PARQUET-3031: Support to transfer input stream when build ParquetFileReader Oct 10, 2024
@turboFei turboFei changed the title PARQUET-3031: Support to transfer input stream when build ParquetFileReader PARQUET-3031: Support to transfer input stream when building ParquetFileReader Oct 10, 2024
@parthchandra
Contributor

On many file systems, seeking backwards to read the data after reading the footer results in slower reads, because the filesystem switches from a sequential read to a random read (which typically turns off prefetching and other optimizations enabled for sequential reads).
It might be worth considering whether reusing the stream is worth it.
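
For context on that trade-off: on Hadoop 3.3+ the openFile() builder lets the caller declare the expected access pattern when the stream is opened, instead of relying on the filesystem's sequential-read heuristics. A minimal sketch, assuming a Hadoop release that recognizes the fs.option.openfile.read.policy option (3.3.5+); this is general Hadoop API usage, not part of this PR.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OpenWithReadPolicySketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]); // path to a Parquet file
    FileSystem fs = path.getFileSystem(conf);

    // Declare the expected access pattern at open time so that a later backwards
    // seek (footer first, then column chunks) is not penalized by sequential-read
    // optimizations such as aggressive prefetching.
    try (FSDataInputStream in = fs.openFile(path)
        .opt("fs.option.openfile.read.policy", "random") // option key assumes Hadoop 3.3.5+
        .build()
        .get()) {
      in.seek(0);
      System.out.println("first byte: " + in.read());
    }
  }
}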

@turboFei
Member Author

Thanks @parthchandra for the comments.

For our company's internally managed Spark, we reuse the input stream for the Parquet file.

Before this change:

A Spark task would open the file multiple times to read the footer and the data.

When the HDFS NameNode is under high pressure, these extra opens cost time.

After this change, the Parquet file is opened only once per task.
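
To make the before/after concrete, here is a rough sketch of the "before" shape, in which the footer read and the data read each open the file and therefore each cost a NameNode open RPC (before SPARK-42388 the footer could even be read twice per task). The class name and path handling are illustrative; the calls themselves are the existing public parquet-hadoop API.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class MultipleOpensSketch {

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]); // path to a Parquet file

    // Open #1: footer read for planning and row-group filtering.
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, path, NO_FILTER);

    // Open #2: the reader opens the file again (and re-reads the footer) for the data.
    ParquetReadOptions options = HadoopReadOptions.builder(conf, path).build();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, conf), options)) {
      System.out.println("row groups: " + reader.getRowGroups().size()
          + " (planned: " + footer.getBlocks().size() + ")");
    }
  }
}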

@turboFei
Member Author

This is testing from 3 years ago on Spark 2.3.

[benchmark screenshot] It reduces the HDFS RPC requests to the NameNode by roughly 2/3.

And after the community Spark patch [SPARK-42388][SQL] Avoid parquet footer reads twice in vectorized reader (https://github.com/apache/spark/pull/39950), this solution might reduce the HDFS RPC requests by 1/2.

@wgtmac
Member

wgtmac commented Oct 11, 2024

This looks reasonable to me, and users can choose whichever approach fits them best.

cc @gszadovszky @steveloughran

Member

@wangyum wangyum left a comment


+1. This can significantly reduce the number of NameNode RPCs to improve performance.

Benchmark:

select count(*) from tbl where id > -999;

No. of RPCs with vanilla Parquet and Spark: 179997
No. of RPCs with this PR plus a Spark-side patch: 60007

Spark side code:

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index fee80e97bd2..8e0c68cc71a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -28,6 +28,10 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -60,6 +64,11 @@
 import org.apache.spark.sql.types.StructType$;
 import org.apache.spark.util.AccumulatorV2;
 
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
 /**
  * Base class for custom RecordReaders for Parquet that directly materialize to `T`.
  * This class handles computing row groups, filtering on them, setting up the column readers,
@@ -69,6 +78,8 @@
  * this way, albeit at a higher cost to implement. This base class is reusable.
  */
 public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Void, T> {
+  private static final Logger LOG = LoggerFactory.getLogger(SpecificParquetRecordReaderBase.class);
+
   protected Path file;
   protected MessageType fileSchema;
   protected MessageType requestedSchema;
@@ -87,6 +98,8 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
 
   protected ParquetRowGroupReader reader;
   protected OpenedParquetFileInfo openedFileInfo;
+  // indicates whether this input split can be skipped
+  protected boolean canSkip = false;
 
   public SpecificParquetRecordReaderBase(OpenedParquetFileInfo openedFileInfo) {
     this.openedFileInfo = openedFileInfo;
@@ -106,16 +119,42 @@ public void initialize(
     FileSplit split = (FileSplit) inputSplit;
     this.file = split.getPath();
     ParquetFileReader fileReader;
-    if (fileFooter.isDefined()) {
-      fileReader = new ParquetFileReader(configuration, file, fileFooter.get());
+    ParquetMetadata footer;
+    List<BlockMetaData> blocks;
+
+    if (openedFileInfo != null && openedFileInfo.getFileFooter() != null) {
+      footer = openedFileInfo.getFileFooter();
     } else {
-      ParquetReadOptions options = HadoopReadOptions
-          .builder(configuration, file)
-          .withRange(split.getStart(), split.getStart() + split.getLength())
-          .build();
+      LOG.info("Parquet read footer...");
+      footer = readFooter(configuration, file, range(split.getStart(),
+          split.getStart() + split.getLength()));
+    }
+
+    MessageType fileSchema = footer.getFileMetaData().getSchema();
+    FilterCompat.Filter filter = getFilter(configuration);
+    blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+
+    // this file can be skipped, so the following initialization steps are not required
+    if (blocks.isEmpty()) {
+      canSkip = true;
+      return;
+    }
+
+    Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
+    ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
+    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+        taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
+    this.requestedSchema = readContext.getRequestedSchema();
+
+    if (openedFileInfo != null && openedFileInfo.getOpenedStream() != null) {
       fileReader = new ParquetFileReader(
-          HadoopInputFile.fromPath(file, configuration), options);
+          configuration, footer.getFileMetaData(), openedFileInfo.getOpenedStream(),
+          openedFileInfo.getInputFile(), blocks, requestedSchema.getColumns());
+    } else {
+      fileReader = new ParquetFileReader(
+          configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
     }
+
     this.reader = new ParquetRowGroupReaderImpl(fileReader);
     this.fileSchema = fileReader.getFileMetaData().getSchema();
     try {
@@ -124,11 +163,7 @@ public void initialize(
       // Swallow any exception, if we cannot parse the version we will revert to a sequential read
       // if the column is a delta byte array encoding (due to PARQUET-246).
     }
-    Map<String, String> fileMetadata = fileReader.getFileMetaData().getKeyValueMetaData();
-    ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
-    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
-        taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
-    this.requestedSchema = readContext.getRequestedSchema();
+
     fileReader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 14995510489..0ae08d5490d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -197,6 +197,9 @@ public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) {
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException, UnsupportedOperationException {
     super.initialize(inputSplit, taskAttemptContext);
+    if (canSkip) {
+      return;
+    }
     initializeInternal();
   }
 
@@ -207,6 +210,9 @@ public void initialize(
       Option<ParquetMetadata> fileFooter)
       throws IOException, InterruptedException, UnsupportedOperationException {
     super.initialize(inputSplit, taskAttemptContext, fileFooter);
+    if (canSkip) {
+      return;
+    }
     initializeInternal();
   }
 
@@ -218,6 +224,9 @@ public void initialize(
   public void initialize(String path, List<String> columns) throws IOException,
       UnsupportedOperationException {
     super.initialize(path, columns);
+    if (canSkip) {
+      return;
+    }
     initializeInternal();
   }
 
@@ -229,6 +238,9 @@ public void initialize(
       ParquetRowGroupReader rowGroupReader,
       int totalRowCount) throws IOException {
     super.initialize(fileSchema, requestedSchema, rowGroupReader, totalRowCount);
+    if (canSkip) {
+      return;
+    }
     initializeInternal();
   }
 
@@ -243,6 +255,9 @@ public void close() throws IOException {
 
   @Override
   public boolean nextKeyValue() throws IOException {
+    if (canSkip) {
+      return false;
+    }
     resultBatch();
 
     if (returnColumnarBatch) return nextBatch();
@@ -318,6 +333,9 @@ private void initBatch() {
   }
 
   public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
+    if (canSkip) {
+      return;
+    }
     initBatch(MEMORY_MODE, partitionColumns, partitionValues);
   }
 
@@ -342,6 +360,9 @@ public void enableReturningBatches() {
    * Advances to the next batch of rows. Returns false if there are no more.
    */
   public boolean nextBatch() throws IOException {
+    if (canSkip) {
+      return false;
+    }
     for (ParquetColumnVector vector : columnVectors) {
       vector.reset();
     }
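
For reference, the diff above relies on an OpenedParquetFileInfo class that it does not include. Judging from the getters it calls (getFileFooter(), getOpenedStream(), getInputFile()), it is roughly the value object sketched below. This is a hypothetical reconstruction; the real class in the internal Spark patch may differ, for example getInputFile() might return an org.apache.parquet.io.InputFile rather than a Path.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.SeekableInputStream;

// Hypothetical shape inferred from how the diff uses it; the real class may carry
// more state (file length, file status, etc.).
public class OpenedParquetFileInfo {
  private final Path inputFile;                   // the Parquet file being read
  private final SeekableInputStream openedStream; // stream opened by the caller, reused by the reader
  private final ParquetMetadata fileFooter;       // footer already read from that stream, may be null

  public OpenedParquetFileInfo(
      Path inputFile, SeekableInputStream openedStream, ParquetMetadata fileFooter) {
    this.inputFile = inputFile;
    this.openedStream = openedStream;
    this.fileFooter = fileFooter;
  }

  public Path getInputFile() { return inputFile; }

  public SeekableInputStream getOpenedStream() { return openedStream; }

  public ParquetMetadata getFileFooter() { return fileFooter; }
}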

Successfully merging this pull request may close these issues.

Support to transfer the parquet file SeekableInputStream when build ParquetFileReader
5 participants