Commit b89afec: Fix rebase errors.

rainwoodman committed Jun 29, 2020
1 parent b8ec1f5 commit b89afec

Showing 5 changed files with 103 additions and 9 deletions.
9 changes: 6 additions & 3 deletions bigfile/__init__.py
@@ -5,6 +5,8 @@
 from .pyxbigfile import ColumnLowLevelAPI
 from .pyxbigfile import FileLowLevelAPI
 from .pyxbigfile import set_buffer_size
+from . import pyxbigfile
+from functools import wraps

 import os
 import numpy
@@ -243,6 +245,10 @@ def create(self, f, blockname, dtype=None, size=None, Nfile=1):
         super(ColumnMPI, self).close()
         return self.open(f, blockname)

+    @_enhance_getslice
+    def __getitem__(self, index):
+        return Column.__getitem__(self, index)
+
     def open(self, f, blockname):
         if not check_unique(blockname, self.comm):
             raise BigFileError("blockname is inconsistent between ranks")
@@ -429,9 +435,6 @@ def __getitem__(self, sl):
         elif isstrlist(sl):
             assert all([(col in self.dtype.names) for col in sl])
             return type(self)(self.file, sl)
-        elif numpy.isscalar(sl):
-            sl = slice(sl, sl + 1)
-            return self[sl][0]
         else:
             return self._getslice(sl)
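
The scalar case removed here does not disappear: together with the new `functools.wraps` import and the `@_enhance_getslice` decorator applied to `ColumnMPI.__getitem__` above, the change suggests scalar indices are now normalized inside that decorator. Its definition is not part of this diff, so the following is only a sketch of what such a wrapper could look like, not the library's actual code:

    from functools import wraps
    import numpy

    def _enhance_getslice(getitem):
        # Hypothetical reconstruction: turn a scalar index into a
        # length-1 slice, call the wrapped __getitem__, and unwrap
        # the single row, mirroring the branch deleted above.
        @wraps(getitem)
        def wrapped(self, index):
            if numpy.isscalar(index):
                return getitem(self, slice(index, index + 1))[0]
            return getitem(self, index)
        return wrapped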

92 changes: 91 additions & 1 deletion bigfile/pyxbigfile.pyx
@@ -21,7 +21,7 @@ except NameError:
         return isinstance(s, str)


-cdef extern from "bigfile.c":
+cdef extern from "bigfile.h":
     ctypedef void * CBigFileStream "BigFileStream"

     struct CBigFileMethods "BigFileMethods":
@@ -784,3 +784,93 @@ cdef class ColumnLowLevelAPI:
         except:
             return "<CBigBlock is invalid>"

+
+cdef class Dataset:
+    cdef CBigRecordType rtype
+    cdef readonly FileLowLevelAPI file
+    cdef readonly size_t size
+    cdef readonly tuple shape
+    cdef readonly numpy.dtype dtype
+
+    def __init__(self, file, dtype, size):
+        self.file = file
+        self.rtype.nfield = 0
+        big_record_type_clear(&self.rtype)
+        fields = []
+
+        for i, name in enumerate(dtype.names):
+            basedtype = dtype[name].base.str.encode()
+            nmemb = int(numpy.prod(dtype[name].shape))
+
+            big_record_type_set(&self.rtype, i,
+                    name.encode(),
+                    basedtype,
+                    nmemb,
+            )
+        big_record_type_complete(&self.rtype)
+
+        self.size = size
+        self.ndim = 1
+        self.shape = (size, )
+
+        dtype = []
+        # No need to use offset, because numpy is also
+        # compactly packed
+        for i in range(self.rtype.nfield):
+            if self.rtype.fields[i].nmemb == 1:
+                shape = 1
+            else:
+                shape = (self.rtype.fields[i].nmemb, )
+            dtype.append((
+                self.rtype.fields[i].name.decode(),
+                self.rtype.fields[i].dtype,
+                shape)
+            )
+        self.dtype = numpy.dtype(dtype, align=False)
+        assert self.dtype.itemsize == self.rtype.itemsize
+
+    def read(self, numpy.intp_t start, numpy.intp_t length, numpy.ndarray out=None):
+        if out is None:
+            out = numpy.empty(length, self.dtype)
+        with nogil:
+            rt = big_file_read_records(&self.file.bf, &self.rtype, start, length, out.data)
+        if rt != 0:
+            raise Error()
+        return out
+
+    def _create_records(self, numpy.intp_t size, numpy.intp_t Nfile=1, char * mode=b"w+"):
+        """ mode can be a+ or w+."""
+        cdef numpy.ndarray fsize
+
+        if Nfile < 0:
+            raise ValueError("Cannot create negative number of files.")
+        if Nfile == 0 and size != 0:
+            raise ValueError("Cannot create zero files for non-zero number of items.")
+
+        fsize = numpy.empty(dtype='intp', shape=Nfile)
+        fsize[:] = (numpy.arange(Nfile) + 1) * size // Nfile \
+                 - (numpy.arange(Nfile)) * size // Nfile
+
+        with nogil:
+            rt = big_file_create_records(&self.file.bf, &self.rtype, mode, Nfile, <size_t*>fsize.data)
+        if rt != 0:
+            raise Error()
+        self.size = self.size + size
+
+    def append(self, numpy.ndarray buf, numpy.intp_t Nfile=1):
+        assert buf.dtype == self.dtype
+        assert buf.ndim == 1
+        tail = self.size
+        self._create_records(len(buf), Nfile=Nfile, mode=b"a+")
+        self.write(tail, buf)
+
+    def write(self, numpy.intp_t start, numpy.ndarray buf):
+        assert buf.dtype == self.dtype
+        assert buf.ndim == 1
+        with nogil:
+            rt = big_file_write_records(&self.file.bf, &self.rtype, start, buf.shape[0], buf.data)
+        if rt != 0:
+            raise Error()
+
+    def __dealloc__(self):
+        big_record_type_clear(&self.rtype)
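
The new Dataset wraps the `big_file_*_records` C API: `__init__` translates a numpy structured dtype into a `CBigRecordType` field by field, `_create_records` spreads `size` rows across `Nfile` files, and `append` grows the storage in "a+" mode before writing at the old tail. A rough usage sketch, assuming `f` is an already-open `FileLowLevelAPI` handle (the high-level wrappers around this class are not part of this diff, so the driver code here is illustrative):

    import numpy

    # Hypothetical driver; `f` is assumed, not defined in this commit.
    dtype = numpy.dtype([('Position', ('f8', 3)), ('Mass', 'f8')])
    d = Dataset(f, dtype, size=0)          # builds rtype from the dtype
    buf = numpy.zeros(128, dtype=d.dtype)  # compactly packed, align=False
    d.append(buf, Nfile=1)                 # create files, then write at old tail
    rows = d.read(0, 16)                   # structured array of the first 16 rows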
2 changes: 1 addition & 1 deletion bigfile/tests/test_bigfile.py
@@ -395,7 +395,7 @@ def test_mpi_attr(comm):

 def test_version():
     import bigfile
-    assert hasattr(File, '__version__')
+    assert hasattr(bigfile, '__version__')

 @MPITest(commsize=[1, 4])
 def test_mpi_large(comm):
8 changes: 4 additions & 4 deletions src/bigfile-mpi.c
@@ -206,12 +206,12 @@ _big_block_mpi_create(BigBlock * bb,

     int i;
     for(i = (size_t) bb->Nfile * rank / NTask; i < (size_t) bb->Nfile * (rank + 1) / NTask; i ++) {
-        FILE * fp = _big_file_open_a_file(bb->methods, bb->basename, i, "w", 1);
+        BigFileStream fp = _big_file_open_a_file(bb->methods, bb->basename, i, "w", 1);
         if(fp == NULL) {
             rt = -1;
             break;
         }
-        fclose(fp);
+        bb->methods->fclose(fp);
     }

     BCAST_AND_RAISEIF(rt, comm);
@@ -251,12 +251,12 @@ int big_block_mpi_grow(BigBlock * bb,

     int i;
     for(i = (size_t) Nfile_grow * rank / NTask; i < (size_t) Nfile_grow * (rank + 1) / NTask; i ++) {
-        FILE * fp = _big_file_open_a_file(bb->methods, bb->basename, i + oldNfile, "w", 1);
+        BigFileStream fp = _big_file_open_a_file(bb->methods, bb->basename, i + oldNfile, "w", 1);
         if(fp == NULL) {
             rt = -1;
             break;
         }
-        fclose(fp);
+        bb->methods->fclose(fp);
     }

     BCAST_AND_RAISEIF(rt, comm);
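
Both loops hand file `i` to the rank for which `Nfile * rank / NTask <= i < Nfile * (rank + 1) / NTask`, the same integer split that sizes the per-file record counts in `_create_records` above. The idiom covers every item exactly once with part sizes differing by at most one; a quick standalone check (illustrative code, not from the library):

    def partition(N, NTask):
        # Rank r owns the half-open range [N*r // NTask, N*(r+1) // NTask).
        return [(N * r // NTask, N * (r + 1) // NTask) for r in range(NTask)]

    parts = partition(10, 4)                    # [(0, 2), (2, 5), (5, 7), (7, 10)]
    assert sum(e - s for s, e in parts) == 10   # every item assigned exactly once
    assert max(e - s for s, e in parts) - min(e - s for s, e in parts) <= 1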
1 change: 1 addition & 0 deletions src/bigfile.c
@@ -638,6 +638,7 @@ _big_block_grow_internal(BigBlock * bb, int Nfile_grow, const size_t fsize_grow[
     bb->fchecksum = fchecksum;
     bb->Nfile = Nfile;
     bb->size = bb->foffset[Nfile];
+    bb->dirty = 1;

     return 0;
 }