Some tips on correct usage? #440
Additionally, each TileDB context has an associated cache for downloaded tiles, which is controlled by the sm.tile_cache_size config parameter.
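A minimal sketch of bounding that cache, assuming the sm.tile_cache_size parameter (a byte count) in TileDB 2.x:

import tiledb

# Hypothetical 4 MB tile cache instead of the default; the cache lives on
# the context, so every array opened with this ctx shares the same bound.
cfg = tiledb.Config({"sm.tile_cache_size": str(4 * 1024 * 1024)})
ctx = tiledb.Ctx(cfg)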
As above, every time the array is opened, TileDB needs to (1) open the schema and (2) download the fragment metadata (the range-bounding information for each fragment, used to download only the specific tiles necessary to satisfy a given query). So from the standpoint of minimizing I/O traffic, it makes sense to open once and reuse the handle.
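A minimal sketch of amortizing that per-open cost by opening once and issuing many reads against the same handle (the URI and slices here are hypothetical):

import tiledb

ctx = tiledb.Ctx()  # one context: one tile cache, one set of thread pools

# Schema and fragment metadata are fetched once at open time; every read
# against the open handle reuses them.
with tiledb.DenseArray("my_array_uri", mode="r", ctx=ctx) as A:
    for rng in (slice(0, 100), slice(100, 200)):
        chunk = A[rng]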
All that said, if the workers are truly not doing anything and memory still grows, that is worrisome, and I would like to find out what is happening - possibly there is a memory leak somewhere. See also the comments in #150.
Adding as feature-request, will add ASAP.
This could very likely be the root cause of our memory leaks. We are explicitly creating a completely new context every time we open the array. In light of #150:

import tiledb
import numpy as np
import psutil
x = np.ones(10000000)
ctx = tiledb.Ctx()
path = 'test_tile_db'
d1 = tiledb.Dim(
'test_domain', domain=(0, x.shape[0] - 1), tile=10000, dtype="uint32"
)
domain = tiledb.Domain(d1)
v = tiledb.Attr(
'test_value',
dtype="float32",
)
schema = tiledb.ArraySchema(
domain=domain, attrs=(v,), cell_order="row-major", tile_order="row-major"
)
A = tiledb.DenseArray.create(path, schema)
values = x.astype(np.float32)
with tiledb.DenseArray(path, mode="w", ctx=ctx) as A:
A[:] = {'test_value': values}
for i in range(1000):
with tiledb.DenseArray(path, mode='r') as data:
data[:]
import gc
gc.collect()
    print('Gigs:', round(psutil.virtual_memory().used / (10**9), 2))

Result: it needs about 10 loops to leak 100 MB of memory, which would be consistent with the leak I observe.
No, it creates one default context, which is then reused by every open that does not pass an explicit ctx.
If you use one context (and therefore one tile cache and one set of thread pools), memory usage should stay bounded.
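A minimal sketch of that single-context pattern, with a module-level context passed to every open (the URI argument is hypothetical):

import tiledb

# One shared context for the whole process; per-open tiledb.Ctx() calls
# would each allocate their own tile cache and thread pools.
SHARED_CTX = tiledb.Ctx(tiledb.Config())

def open_genotype_array(uri, mode="r"):
    return tiledb.SparseArray(uri, mode=mode, ctx=SHARED_CTX)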
Hi @Hoeze, from this comment it sounds like you are creating a completely new context every time you open the array. Is that correct?
Thanks, I'll try that later.
That was my initial guess, because we already made that mistake once. I feel bad asking you to review our code, but if you don't mind, here is a stripped-down code sample of our genotype class:

class Genotype:
"""
    class representing the genotypes of variants
initialize with specified path where genotype, genotype quality and sequencing depth arrays should be saved
Methods:
create_array: creates GT, GQ, DP arrays
get_variantkey: calculates variantkey given a variant
write: writes input (GT, GQ, DP) into corresponding array
get: get GT, GQ or DP values given a Variant object and a sample/multiple samples
write_genotype: calls write method with specified path "GT"
write_gtquality: calls write method with specified path "GQ"
write_sequencingdepth: calls write method with specified path "DP"
get_genotype: calls get method with specified path "GT"
get_gtquality: calls get method with specified path "GQ"
get_sequencingdepth: calls get method with specified path "DP"
"""
def __init__(self, path=None, create_if_missing=False, sample_anno=None, variant=None, nthreads=1):
"""
        create the TileDB array if it doesn't exist yet:
:param path: path to tileDB arrays
"""
path = Config().genodb_path(path)
self.genodb_path = path
if not os.path.exists(path):
if create_if_missing:
os.mkdir(path)
self.create_array()
else:
raise FileNotFoundError(f"Array does not exist at path '{path}'.\nPlease create an array first.")
elif not os.path.exists(path + "/genotype"):
self.create_array()
self.tiledb_config = tiledb.Config()
self.tiledb_config["vfs.file.posix_permissions"] = "770"
# Forces a certain number of threads
self.tiledb_config.update({
"sm.num_reader_threads": nthreads,
"sm.num_writer_threads": nthreads,
"sm.compute_concurrency_level": nthreads,
"sm.io_concurrency_level": nthreads,
"vfs.file.max_parallel_ops": nthreads,
"vfs.num_threads": nthreads,
})
        # For TileDB versions up to 2.1, we can also change the TBB thread pool size
        if tiledb.libtiledb.version()[:2] <= (2, 1):
            self.tiledb_config["sm.num_tbb_threads"] = nthreads
# sample selection
if sample_anno is not None:
self._sample_anno = sample_anno
else:
self._sample_anno = SampleAnnotation(genodb_path=self.genodb_path)
# variant selection
self._variant = variant
def create_array(self, tile_genome=float(2 ** 41), tile_sample=10):
"""
creates array with GT, GQ, DP attributes in directory path
"""
dom = tiledb.Domain(
tiledb.Dim(name="variantkey", domain=(0, 2 ** 64 - 2), tile=tile_genome, dtype=np.uint64),
tiledb.Dim(name="sample_idx", domain=(0, 9000), tile=tile_sample, dtype=np.uint64))
schema = tiledb.ArraySchema(domain=dom, sparse=True,
attrs=(tiledb.Attr(name="GT", dtype=np.int8),
tiledb.Attr(name="DP", dtype=np.int64),
tiledb.Attr(name="GQ", dtype=np.float64),
tiledb.Attr(name="MQ", dtype=np.float64),
))
tiledb.SparseArray.create(self.genodb_path + "/genotype", schema)
def get_variantkey(self, var: Variant):
"""
calculates variantkey given a variant
:param var: Variant
:return: variantkey
"""
variantkey = var.df.apply(lambda x: vk.variantkey(x['chrom'], x['start'], x['ref'], x['alt']), axis=1)
return variantkey
def write(self, var: Variant, sample: Union[int, np.ndarray], GT: np.int8, DP: np.int64, GQ: np.float64,
MQ: np.float64, variantkey=None):
"""
writes input (GT, GQ, DP) into array
:param var: Variant object with different variants
:param sample: sample index
:param input: GT, GQ or DP values for each variant-sample combination
:param path: specifies whether input values are GT, GQ or DP values
:param variantkey: optional variant key
"""
if isinstance(sample, int):
sample = [sample]
if variantkey is None:
variantkey = self.get_variantkey(var)
assert np.ndim(variantkey) == 1, 'Variant has to be one dimensional object'
assert np.ndim(sample) == 1, "samples has to be one dimensional object"
variantkey = np.tile(variantkey, len(sample))
sample = np.repeat(sample, len(var.df))
GT = GT.flatten("F")
DP = DP.flatten("F")
GQ = GQ.flatten("F")
MQ = MQ.flatten("F")
with self.open_array(mode='w') as A:
A[variantkey, sample] = {"GT": GT, "DP": DP, "GQ": GQ, "MQ": MQ}
def get(
self,
var: Variant,
sample: Union[int, np.ndarray] = None,
attr: Union[str, List[str]] = ["GT", "GQ", "DP",],
variantkey=None,
return_raw_df=False,
sparse_array=False,
masked=False
) -> Union[np.ndarray, List[np.ndarray]]:
"""
get GT, GQ or DP values given a Variant object and a sample/multiple samples
:param var: Variant object with different variants
:param sample: sample index
:param attr: specifies whether desired values are GT, GQ or DP values
:return: GT, GQ or DP values for all variant-sample combinations
"""
flatten = False
if isinstance(attr, str):
flatten = True
attr = [attr]
# use sample annotation by default
if sample is None:
sample = self.sample_anno.sample_idx
# convert slices to RangeIndex
if isinstance(sample, slice):
sample = pd.RangeIndex(sample.start, sample.stop, sample.step)
# convert to 1D vector of sample id's
sample = np.asarray(sample).astype(int)
if np.ndim(sample) < 1:
sample = np.expand_dims(sample, 0)
if variantkey is None:
variantkey = self.get_variantkey(var).values
assert 1 == len(variantkey.shape), 'Variant has to be one dimensional object'
assert 1 == len(sample.shape), "samples has to be one dimensional object"
assert len(sample) == len(np.unique(sample)), \
"A sample can occur only once, at least one sample occurs multiple times."
schema = tiledb.ArraySchema.load(self.genodb_path + "/genotype")
domain = schema.domain.dim("sample_idx").domain[1]
if np.all(sample < 0) or np.all(sample > domain):
array = np.zeros((len(variantkey) * len(sample)))
output = OrderedDict([
('sample_idx', np.repeat(sample, len(variantkey))),
('variantkey', variantkey),
*[(a, array) for a in attr],
])
else:
with self.open_array() as A:
q = A.query(attrs=attr)
output = q.multi_index[list(variantkey), list(sample)]
output = pd.DataFrame(output, columns=output.keys())
if return_raw_df:
return output
ret_arrays = []
for a in attr:
array = self._convert_array(
output_variantkey=output['variantkey'],
output_sample_idx=output['sample_idx'],
value_vector=output[a],
variantkey=variantkey,
sample=sample,
sparse_array=sparse_array,
masked=masked
)
ret_arrays.append(array)
if flatten:
return ret_arrays[0]
else:
return ret_arrays
[...]
def open_array(self, mode='r'):
return tiledb.SparseArray(
self.genodb_path + '/genotype',
mode=mode,
ctx=tiledb.Ctx(self.tiledb_config)
)
@property
def sample_anno(self):
return self._sample_anno
def samples(self):
return self._sample_anno.sample_id
def sample_idx(self):
return self._sample_anno.sample_idx
def __getstate__(self):
# Copy the object's state from self.__dict__ which contains
# all our instance attributes. Always use the dict.copy()
# method to avoid modifying the original state.
state = self.__dict__.copy()
# Remove the unpicklable entries.
state["tiledb_config"] = state["tiledb_config"].dict()
return state
def __setstate__(self, state):
# Restore instance attributes (i.e., filename and lineno).
self.__dict__.update(state)
        self.tiledb_config = tiledb.Config(state["tiledb_config"])

We create a single object of this class (gt_array) and call get() on it. I also did some memory profiling (without dask) using memory_profiler's %mprun:

# print("%s GB" % round(psutil.virtual_memory().used / (2**30), 3))
19.651 GB
# %mprun -f gt_array.get vep_tl_agg.agg_transcript_level("ENSG00000153707")
Line # Mem usage Increment Occurences Line Contents
============================================================
152 5909.1 MiB 4232.3 MiB 8 def get(
153 self,
154 var: Variant,
155 sample: Union[int, np.ndarray] = None,
156 attr: Union[str, List[str]] = ["GT", "GQ", "DP"],
157 variantkey=None,
158 return_raw_df=False,
159 sparse_array=False,
160 masked=False
161 ) -> Union[np.ndarray, List[np.ndarray]]:
162 """
163 get GT, GQ or DP values given a Variant object and a sample/multiple samples
164 :param var: Variant object with different variants
165 :param sample: sample index
166 :param attr: specifies whether desired values are GT, GQ or DP values
167 :return: GT, GQ or DP values for all variant-sample combinations
168 """
169 5909.1 MiB 0.0 MiB 8 flatten = False
170 5909.1 MiB 0.0 MiB 8 if isinstance(attr, str):
171 flatten = True
172 attr = [attr]
173
174 # use sample annotation by default
175 5909.1 MiB 0.0 MiB 8 if sample is None:
176 5909.1 MiB 0.0 MiB 8 sample = self.sample_anno.sample_idx
177
178 # convert slices to RangeIndex
179 5909.1 MiB 0.0 MiB 8 if isinstance(sample, slice):
180 sample = pd.RangeIndex(sample.start, sample.stop, sample.step)
181 # convert to 1D vector of sample id's
182 5909.1 MiB 0.0 MiB 8 sample = np.asarray(sample).astype(int)
183 5909.1 MiB 0.0 MiB 8 if np.ndim(sample) < 1:
184 sample = np.expand_dims(sample, 0)
185
186 5909.1 MiB 0.0 MiB 8 if variantkey is None:
187 5909.1 MiB 2.3 MiB 8 variantkey = self.get_variantkey(var).values
188
189 5909.1 MiB 0.0 MiB 8 assert 1 == len(variantkey.shape), 'Variant has to be one dimensional object'
190 5909.1 MiB 0.0 MiB 8 assert 1 == len(sample.shape), "samples has to be one dimensional object"
191 5909.1 MiB 0.0 MiB 8 assert len(sample) == len(np.unique(sample)), \
192 "A sample can occur only once, at least one sample occurs multiple times."
193
194 5909.1 MiB 2.5 MiB 8 schema = tiledb.ArraySchema.load(self.genodb_path + "/genotype")
195
196 5909.1 MiB 0.0 MiB 8 domain = schema.domain.dim("sample_idx").domain[1]
197
198 5909.1 MiB 0.0 MiB 8 if np.all(sample < 0) or np.all(sample > domain):
199 array = np.zeros((len(variantkey) * len(sample)))
200 output = OrderedDict([
201 ('sample_idx', np.repeat(sample, len(variantkey))),
202 ('variantkey', variantkey),
203 *[(a, array) for a in attr],
204 ])
205 else:
206 5909.1 MiB 1.3 MiB 8 with self.open_array() as A:
207 5909.1 MiB 0.0 MiB 8 q = A.query(attrs=attr)
208 5909.1 MiB 1670.5 MiB 8 output = q.multi_index[list(variantkey), list(sample)]
209
210 5909.1 MiB 0.1 MiB 8 output = pd.DataFrame(output, columns=output.keys())
211
212 5909.1 MiB 0.0 MiB 8 if return_raw_df:
213 return output
214
215 5909.1 MiB 0.0 MiB 8 ret_arrays = []
216 5909.1 MiB 0.0 MiB 32 for a in attr:
217 5909.1 MiB 0.0 MiB 24 array = self._convert_array(
218 5909.1 MiB 0.0 MiB 24 output_variantkey=output['variantkey'],
219 5909.1 MiB 0.0 MiB 24 output_sample_idx=output['sample_idx'],
220 5909.1 MiB 0.0 MiB 24 value_vector=output[a],
221 5909.1 MiB 0.0 MiB 24 variantkey=variantkey,
222 5909.1 MiB 0.0 MiB 24 sample=sample,
223 5909.1 MiB 0.0 MiB 24 sparse_array=sparse_array,
224 5909.1 MiB 0.0 MiB 24 masked=masked
225 )
226 5909.1 MiB 0.0 MiB 24 ret_arrays.append(array)
227
228 5909.1 MiB 0.0 MiB 8 if flatten:
229 return ret_arrays[0]
230 else:
231 5909.1 MiB 0.0 MiB 8 return ret_arrays
# print("%s GB" % round(psutil.virtual_memory().used / (2**30), 3))
21.422 GB
# %mprun -f gt_array.get vep_tl_agg.agg_transcript_level("ENSG00000153707")
Line # Mem usage Increment Occurences Line Contents
============================================================
152 5951.4 MiB 5842.6 MiB 8 def get(
153 self,
154 var: Variant,
155 sample: Union[int, np.ndarray] = None,
156 attr: Union[str, List[str]] = ["GT", "GQ", "DP"],
157 variantkey=None,
158 return_raw_df=False,
159 sparse_array=False,
160 masked=False
161 ) -> Union[np.ndarray, List[np.ndarray]]:
162 """
163 get GT, GQ or DP values given a Variant object and a sample/multiple samples
164 :param var: Variant object with different variants
165 :param sample: sample index
166 :param attr: specifies whether desired values are GT, GQ or DP values
167 :return: GT, GQ or DP values for all variant-sample combinations
168 """
169 5951.4 MiB -89.2 MiB 8 flatten = False
170 5951.4 MiB -89.2 MiB 8 if isinstance(attr, str):
171 flatten = True
172 attr = [attr]
173
174 # use sample annotation by default
175 5951.4 MiB -89.2 MiB 8 if sample is None:
176 5951.4 MiB -89.2 MiB 8 sample = self.sample_anno.sample_idx
177
178 # convert slices to RangeIndex
179 5951.4 MiB -89.2 MiB 8 if isinstance(sample, slice):
180 sample = pd.RangeIndex(sample.start, sample.stop, sample.step)
181 # convert to 1D vector of sample id's
182 5951.4 MiB -89.2 MiB 8 sample = np.asarray(sample).astype(int)
183 5951.4 MiB -89.2 MiB 8 if np.ndim(sample) < 1:
184 sample = np.expand_dims(sample, 0)
185
186 5951.4 MiB -89.2 MiB 8 if variantkey is None:
187 5951.4 MiB -89.4 MiB 8 variantkey = self.get_variantkey(var).values
188
189 5951.4 MiB -89.2 MiB 8 assert 1 == len(variantkey.shape), 'Variant has to be one dimensional object'
190 5951.4 MiB -89.2 MiB 8 assert 1 == len(sample.shape), "samples has to be one dimensional object"
191 5951.4 MiB -89.2 MiB 8 assert len(sample) == len(np.unique(sample)), \
192 "A sample can occur only once, at least one sample occurs multiple times."
193
194 5951.4 MiB -89.2 MiB 8 schema = tiledb.ArraySchema.load(self.genodb_path + "/genotype")
195
196 5951.4 MiB -89.2 MiB 8 domain = schema.domain.dim("sample_idx").domain[1]
197
198 5951.4 MiB -89.2 MiB 8 if np.all(sample < 0) or np.all(sample > domain):
199 array = np.zeros((len(variantkey) * len(sample)))
200 output = OrderedDict([
201 ('sample_idx', np.repeat(sample, len(variantkey))),
202 ('variantkey', variantkey),
203 *[(a, array) for a in attr],
204 ])
205 else:
206 5951.4 MiB -89.2 MiB 8 with self.open_array() as A:
207 5951.4 MiB -89.2 MiB 8 q = A.query(attrs=attr)
208 5951.4 MiB -98.1 MiB 8 output = q.multi_index[list(variantkey), list(sample)]
209
210 5951.4 MiB -117.9 MiB 8 output = pd.DataFrame(output, columns=output.keys())
211
212 5951.4 MiB -117.9 MiB 8 if return_raw_df:
213 return output
214
215 5951.4 MiB -117.9 MiB 8 ret_arrays = []
216 5951.4 MiB -471.5 MiB 32 for a in attr:
217 5951.4 MiB -353.6 MiB 24 array = self._convert_array(
218 5951.4 MiB -353.6 MiB 24 output_variantkey=output['variantkey'],
219 5951.4 MiB -353.6 MiB 24 output_sample_idx=output['sample_idx'],
220 5951.4 MiB -353.6 MiB 24 value_vector=output[a],
221 5951.4 MiB -353.6 MiB 24 variantkey=variantkey,
222 5951.4 MiB -353.6 MiB 24 sample=sample,
223 5951.4 MiB -353.6 MiB 24 sparse_array=sparse_array,
224 5951.4 MiB -353.6 MiB 24 masked=masked
225 )
226 5951.4 MiB -353.6 MiB 24 ret_arrays.append(array)
227
228 5951.4 MiB -117.9 MiB 8 if flatten:
229 return ret_arrays[0]
230 else:
231 5951.4 MiB -117.9 MiB 8 return ret_arrays
# print("%s GB" % round(psutil.virtual_memory().used / (2**30), 3))
21.367 GB

If I interpret this correctly, the first profiling run increases memory by 1670.5 MiB when running q.multi_index[...] (profiler line 208), while the second run does not grow any further.
Hi @Hoeze, I wanted to check in about this, in particular whether reusing a single context changed the memory behavior.

Also: at least in the code section highlighted next to the profiler output, it looks like there is no new context created. So if the profiler is running the code multiple times with the same default context, the repeated runs should not be multiplying contexts.
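A minimal sketch of what that looks like, assuming tiledb.default_ctx() is available in the installed TileDB-Py version (the URI is hypothetical):

import tiledb

# Opens without an explicit ctx fall back to one lazily created,
# module-level default context, so repeated opens do not multiply caches.
with tiledb.SparseArray("my_array_uri", mode="r") as A:
    pass

ctx = tiledb.default_ctx()  # the same default context, fetched explicitly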
Hi, I'd have some questions about how to correctly read data from arrays. Consider the following example:
I'm running the get() function in parallel, and I always have to restart my workers after some time because they quickly use up to 12 GB of memory without doing anything. I noticed that this somehow increases the memory leakage.
What is the correct way to read data with multi_index? In many cases, I want to provide some numpy array as the indexer.
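For reference, a minimal sketch of the pattern used elsewhere in this thread: multi_index accepts lists of coordinates, so a numpy array of (possibly non-contiguous) indices can be passed via list(...) (the URI and coordinates here are hypothetical):

import numpy as np
import tiledb

keys = np.array([17, 42, 1001], dtype=np.uint64)  # variant keys to fetch
samples = np.arange(10)                           # sample indices to fetch

with tiledb.SparseArray("genotype_uri", mode="r") as A:
    result = A.query(attrs=["GT"]).multi_index[list(keys), list(samples)]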
My tiledb-config: