Merge branch 'rt2ofl-npe-fix' into hypertrace-0.7.1
laxmanchekka committed May 24, 2021
2 parents 5e724e9 + b2b65a6 commit 24ebc6d
Showing 4 changed files with 252 additions and 30 deletions.
@@ -48,6 +48,7 @@
 import org.apache.pinot.core.segment.creator.StatsCollectorConfig;
 import org.apache.pinot.core.segment.index.converter.SegmentFormatConverter;
 import org.apache.pinot.core.segment.index.converter.SegmentFormatConverterFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
 import org.apache.pinot.core.segment.store.SegmentDirectoryPaths;
 import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder;
 import org.apache.pinot.core.util.CrcUtils;
@@ -217,6 +218,8 @@ public void build()
       recordReadStopTime = System.currentTimeMillis();
       totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
       if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
+        // LOGGER.warn("================================");
+        // LOGGER.warn("Indexer: pinot record: {}", transformedRow.toString());
         indexCreator.indexRow(transformedRow);
         indexStopTime = System.currentTimeMillis();
         totalIndexTime += (indexStopTime - recordReadStopTime);
@@ -33,6 +33,7 @@
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.data.recordtransformer.NullValueTransformer;
 import org.apache.pinot.core.segment.processing.filter.RecordFilter;
 import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
 import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
@@ -62,6 +63,7 @@ public class SegmentMapper {

   private final String _mapperId;
   private final Schema _avroSchema;
+  private final org.apache.pinot.core.data.recordtransformer.RecordTransformer _nullTransformer;
   private final RecordTransformer _recordTransformer;
   private final RecordFilter _recordFilter;
   private final int _numPartitioners;
@@ -75,6 +77,7 @@ public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig map
     _mapperId = mapperId;
     _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
     _recordFilter = RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
+    _nullTransformer = new NullValueTransformer(mapperConfig.getPinotSchema());
     _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
     for (PartitionerConfig partitionerConfig : mapperConfig.getPartitionerConfigs()) {
       _partitioners.add(PartitionerFactory.getPartitioner(partitionerConfig));
@@ -99,6 +102,7 @@ public void map()
     String[] partitions = new String[_numPartitioners];

     while (segmentRecordReader.hasNext()) {
+      reusableRow.clear();
       reusableRow = segmentRecordReader.next(reusableRow);

       // Record filtering
@@ -107,6 +111,7 @@ public void map()
       }

       // Record transformation
+      reusableRow = _nullTransformer.transform(reusableRow);
       reusableRow = _recordTransformer.transformRecord(reusableRow);

       // Partitioning
@@ -129,7 +134,10 @@ public void map()
       }

       // Write record to avro file for its partition
+      //LOGGER.warn("================================");
       SegmentProcessorUtils.convertGenericRowToAvroRecord(reusableRow, reusableRecord);
+      // LOGGER.warn("Mapper: pinot record: {}", reusableRow.toString());
+      // LOGGER.warn("Mapper: avro record: {}", reusableRecord.toString());
       recordWriter.append(reusableRecord);

       reusableRow.clear();
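Note on the SegmentMapper hunks above: the fix clears the reusable row before each read, so stale values from a previous record cannot leak into the next one, and it runs the new NullValueTransformer before the configured RecordTransformer, so absent or null column values are replaced with schema defaults before SegmentProcessorUtils.convertGenericRowToAvroRecord runs — the conversion where the NullPointerException named in the branch surfaced. Below is a minimal standalone sketch of the default-filling idea; the Map-backed row, the DEFAULTS table, and fillDefaults are illustrative assumptions, not Pinot's implementation, which derives defaults from each column's FieldSpec.

import java.util.HashMap;
import java.util.Map;

class NullDefaultSketch {
  // Assumed per-column defaults; in Pinot these would come from the schema.
  static final Map<String, Object> DEFAULTS = Map.of("name", "null", "count", 0L);

  // Replace absent or null column values with their defaults so the
  // downstream row-to-Avro conversion never sees a null.
  static Map<String, Object> fillDefaults(Map<String, Object> row) {
    for (Map.Entry<String, Object> def : DEFAULTS.entrySet()) {
      row.putIfAbsent(def.getKey(), def.getValue()); // also replaces keys mapped to null
    }
    return row;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("name", "segment-1"); // "count" is missing and would NPE downstream
    System.out.println(fillDefaults(row)); // {name=segment-1, count=0}
  }
}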
@@ -57,6 +57,7 @@ public class SegmentReducer {
   private final Collector _collector;
   private final int _numRecordsPerPart;

+
   public SegmentReducer(String reducerId, File reducerInputDir, SegmentReducerConfig reducerConfig,
       File reducerOutputDir) {
     _reducerInputDir = reducerInputDir;
@@ -114,7 +115,11 @@ private void flushRecords(Collector collector, String fileName)
     DataFileWriter<GenericData.Record> recordWriter = new DataFileWriter<>(new GenericDatumWriter<>(_avroSchema));
     recordWriter.create(_avroSchema, new File(_reducerOutputDir, fileName));
     while (collectionIt.hasNext()) {
-      SegmentProcessorUtils.convertGenericRowToAvroRecord(collectionIt.next(), reusableRecord);
+      GenericRow reusableRow = collectionIt.next();
+      // LOGGER.warn("================================");
+      SegmentProcessorUtils.convertGenericRowToAvroRecord(reusableRow, reusableRecord);
+      // LOGGER.warn("Reducer: pinot record: {}", reusableRow.toString());
+      // LOGGER.warn("Reducer: avro record: {}", reusableRecord.toString());
       recordWriter.append(reusableRecord);
     }
     recordWriter.close();
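The flushRecords hunk above keeps Avro's reusable-record pattern: a single GenericData.Record is re-populated per row and appended, which is safe because DataFileWriter serializes each record at append time. A self-contained sketch of that write loop, assuming a hypothetical two-field Row schema and output path:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;

class AvroFlushSketch {
  public static void main(String[] args) throws Exception {
    // Assumed schema for illustration; the reducer derives its schema
    // from the Pinot table schema instead.
    Schema schema = SchemaBuilder.record("Row").fields()
        .requiredString("name").requiredLong("count").endRecord();
    GenericData.Record reusableRecord = new GenericData.Record(schema);
    try (DataFileWriter<GenericData.Record> writer =
        new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
      writer.create(schema, new File("part-0.avro"));
      for (long i = 0; i < 3; i++) {
        // Re-populate every field before append; the record is written
        // out immediately, so reusing it across iterations is safe.
        reusableRecord.put("name", "row-" + i);
        reusableRecord.put("count", i);
        writer.append(reusableRecord);
      }
    } // try-with-resources closes the writer, flushing the file
  }
}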
