diff --git a/cpp/src/arrow/array/data.h b/cpp/src/arrow/array/data.h index 1e6ee9a1d32ff..fbc1e6d23e7c4 100644 --- a/cpp/src/arrow/array/data.h +++ b/cpp/src/arrow/array/data.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "arrow/array/statistics.h" #include "arrow/buffer.h" @@ -232,6 +233,20 @@ struct ARROW_EXPORT ArrayData { return null_count.load() != length; } + Status AlignBuffers() { + // align buffers according to their data type's layout + for (auto&& [buffer, layout] : internal::Zip(buffers, type->layout().buffers)) { + if (layout.kind == DataTypeLayout::FIXED_WIDTH && buffer->address() % layout.byte_width) { + RETURN_NOT_OK(Buffer::Copy(buffer, buffer->memory_manager()).Value(&buffer)); + } + } + // align children data recursively + for (unsigned int i=0; iAlignBuffers()); + } + return Status::OK(); + } + // Access a buffer's data as a typed C pointer template inline const T* GetValues(int i, int64_t absolute_offset) const { diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h index 48b6758212bd5..e6a690b3590d2 100644 --- a/cpp/src/arrow/ipc/options.h +++ b/cpp/src/arrow/ipc/options.h @@ -161,6 +161,11 @@ struct ARROW_EXPORT IpcReadOptions { /// RecordBatchStreamReader and StreamDecoder classes. bool ensure_native_endian = true; + /// \brief Whether to align incoming data if mis-aligned + /// + /// Received mis-aligned data is copied to aligned memory locations. + bool ensure_memory_alignment = true; + /// \brief Options to control caching behavior when pre-buffering is requested /// /// The lazy property will always be reset to true to deliver the expected behavior diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 98214c1debb86..8b1861a39a748 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -601,6 +601,9 @@ Result> LoadRecordBatchSubset( return Status::IOError("Array length did not match record batch length"); } columns[i] = std::move(column); + if (context.options.ensure_memory_alignment) { + RETURN_NOT_OK(columns[i]->AlignBuffers()); + } if (inclusion_mask) { filtered_columns.push_back(columns[i]); filtered_fields.push_back(schema->field(i)); diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 8e6922a912a32..c9c01023b5942 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1810,6 +1810,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: vector[int] included_fields c_bool use_threads c_bool ensure_native_endian + c_bool ensure_memory_alignment @staticmethod CIpcReadOptions Defaults() diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index e15b0ea40ed2e..60f0ef61dfcaa 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -120,6 +120,8 @@ cdef class IpcReadOptions(_Weakrefable): ---------- ensure_native_endian : bool, default True Whether to convert incoming data to platform-native endianness. + ensure_memory_alignment : bool, default True + Whether to align incoming data if mis-aligned. use_threads : bool Whether to use the global CPU thread pool to parallelize any computational tasks like decompression @@ -133,9 +135,11 @@ cdef class IpcReadOptions(_Weakrefable): # cdef block is in lib.pxd def __init__(self, *, bint ensure_native_endian=True, + bint ensure_memory_alignment=True, bint use_threads=True, list included_fields=None): self.c_options = CIpcReadOptions.Defaults() self.ensure_native_endian = ensure_native_endian + self.ensure_memory_alignment = ensure_memory_alignment self.use_threads = use_threads if included_fields is not None: self.included_fields = included_fields @@ -148,6 +152,14 @@ cdef class IpcReadOptions(_Weakrefable): def ensure_native_endian(self, bint value): self.c_options.ensure_native_endian = value + @property + def ensure_memory_alignment(self): + return self.c_options.ensure_memory_alignment + + @ensure_memory_alignment.setter + def ensure_memory_alignment(self, bint value): + self.c_options.ensure_memory_alignment = value + @property def use_threads(self): return self.c_options.use_threads diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 4be5792a92f6d..11f4e8ec2f3ec 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -548,11 +548,16 @@ def test_read_options(): options = pa.ipc.IpcReadOptions() assert options.use_threads is True assert options.ensure_native_endian is True + assert options.ensure_memory_alignment is True + assert options.ens is True assert options.included_fields == [] options.ensure_native_endian = False assert options.ensure_native_endian is False + options.ensure_memory_alignment = False + assert options.ensure_memory_alignment is False + options.use_threads = False assert options.use_threads is False @@ -564,10 +569,11 @@ def test_read_options(): options = pa.ipc.IpcReadOptions( use_threads=False, ensure_native_endian=False, - included_fields=[1] + ensure_memory_alignment=False, included_fields=[1] ) assert options.use_threads is False assert options.ensure_native_endian is False + assert options.ensure_memory_alignment is False assert options.included_fields == [1]