From 10b02dcc5adff268789a3294532f01b404ea21dd Mon Sep 17 00:00:00 2001 From: Ron DeFreitas Date: Fri, 13 Oct 2017 14:46:44 -0400 Subject: [PATCH] adds support for reading schema from the avro file. --- .../AvroExtractorTest.cs | 59 ++++++++++++++++++- .../Avro/AvroExtractor.cs | 36 ++++++++--- 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/AvroExtractorTest.cs b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/AvroExtractorTest.cs index 8161d88..5d6ef69 100644 --- a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/AvroExtractorTest.cs +++ b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/AvroExtractorTest.cs @@ -3,6 +3,7 @@ using Microsoft.Analytics.UnitTest; using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Text; @@ -73,6 +74,15 @@ public IRow SingleColumnRowGenerator() return new USqlRow(schema, null); } + public IRow DualColumnRowGenerator() + { + var foo = new USqlColumn("Value"); + var bar = new USqlColumn("Value2"); + var columns = new List { foo, bar }; + var schema = new USqlSchema(columns); + return new USqlRow(schema, null); + } + [TestMethod] public void AvroExtractor_DatatypeInt_Extracted() { @@ -89,6 +99,25 @@ public void AvroExtractor_DatatypeInt_Extracted() Assert.IsTrue(result[1].Get("Value") == 0); } + + [TestMethod] + public void AvroExtractor_DatatypeInt_Extracted_Using_Internal_Schema_Flag() + { + var schema = @"{""type"":""record"",""name"":""SingleColumnPoco"",""fields"":[{""name"":""Value2"",""type"":""int""},{""name"":""Value"",""type"": ""int"",""default"":""0"" }]}"; + var data = new List> + { + new SingleColumnPoco() { Value = 1 }, + new SingleColumnPoco() { Value = 0 }, + }; + + var result = ExecuteExtract(data, schema, true); + + Assert.IsTrue(result[0].Get("Value") == 1); + Assert.IsTrue(result[0].Get("Value2") == 0); + Assert.IsTrue(result[1].Get("Value") == 0); + Assert.IsTrue(result[1].Get("Value2") == 0); + } + [TestMethod] public void AvroExtractor_DatatypeNullableInt_Extracted() { @@ -316,7 +345,7 @@ public void AvroExtractor_EmptyFile_ReturnNoRow() Assert.IsTrue(result.Count == 0); } - private IList ExecuteExtract(List> data, string schema) + private IList ExecuteExtract(List> data, string schema, bool autoSchemaExtract = false) { var output = SingleColumnRowGenerator().AsUpdatable(); @@ -325,7 +354,21 @@ private IList ExecuteExtract(List> data, string sch serializeAvro(dataStream, data, schema); var reader = new USqlStreamReader(dataStream); - var extractor = new AvroExtractor(schema); + var extractor = new AvroExtractor(schema, autoSchemaExtract); + return extractor.Extract(reader, output).ToList(); + } + } + + private IList ExecuteExtract(List> data, string schema, bool autoSchemaExtract = false) + { + var output = DualColumnRowGenerator().AsUpdatable(); + + using (var dataStream = new MemoryStream()) + { + serializeAvro(dataStream, data, schema); + + var reader = new USqlStreamReader(dataStream); + var extractor = new AvroExtractor(schema, autoSchemaExtract); return extractor.Extract(reader, output).ToList(); } } @@ -333,16 +376,26 @@ private IList ExecuteExtract(List> data, string sch private void serializeAvro(MemoryStream dataStream, List> data, string schema) { var avroSchema = Schema.Parse(schema); + var recordSchema = avroSchema as RecordSchema; + + Debug.Assert(recordSchema != null, "recordSchema != null"); + var writer = new GenericWriter(avroSchema); var fileWriter = DataFileWriter.OpenWriter(writer, dataStream); var encoder = new BinaryEncoder(dataStream); foreach (SingleColumnPoco record in data) { - var genericRecord = new GenericRecord(avroSchema as RecordSchema); + var genericRecord = new GenericRecord(recordSchema); genericRecord.Add("Value", record.Value); + // some tests use value2 field + if (recordSchema.Fields.Exists(x => x.Name == "Value2")) + { + genericRecord.Add("Value2", 0); + } + fileWriter.Append(genericRecord); } diff --git a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/AvroExtractor.cs b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/AvroExtractor.cs index 0ee5074..a6fa922 100644 --- a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/AvroExtractor.cs +++ b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/AvroExtractor.cs @@ -25,25 +25,47 @@ namespace Microsoft.Analytics.Samples.Formats.ApacheAvro public class AvroExtractor : IExtractor { private string avroSchema; + private bool mapToInternalSchema; - public AvroExtractor(string avroSchema) + public AvroExtractor(string avroSchema, bool mapToInternalSchema = false) { this.avroSchema = avroSchema; + this.mapToInternalSchema = mapToInternalSchema; } public override IEnumerable Extract(IUnstructuredReader input, IUpdatableRow output) { - var avschema = Avro.Schema.Parse(avroSchema); - var reader = new GenericDatumReader(avschema, avschema); + Avro.Schema avschema = null; + + if (!string.IsNullOrWhiteSpace(avroSchema)) + { + avschema = Avro.Schema.Parse(avroSchema); + } + + IFileReader fileReader = null; using (var ms = new MemoryStream()) { CreateSeekableStream(input, ms); ms.Position = 0; + + var foundSchema = false; - var fileReader = DataFileReader.OpenReader(ms, avschema); + if (mapToInternalSchema) + { + fileReader = DataFileReader.OpenReader(ms); + var schema = fileReader.GetSchema(); + + foundSchema = schema != null; + } - while (fileReader.HasNext()) + if (!foundSchema) + { + ms.Position = 0; + fileReader = DataFileReader.OpenReader(ms, avschema); + } + + while (fileReader?.HasNext() == true) { var avroRecord = fileReader.Next(); @@ -57,9 +79,9 @@ public override IEnumerable Extract(IUnstructuredReader input, IUpdatableR { output.Set(column.Name, null); } - - yield return output.AsReadOnly(); } + + yield return output.AsReadOnly(); } } }