refactoring parquet SC generation
ypriverol committed Sep 25, 2024
1 parent d1f74d6 commit 3909487
Showing 1 changed file with 32 additions and 14 deletions.
46 changes: 32 additions & 14 deletions examples/loom2parquetchunks.py
@@ -5,6 +5,8 @@
# import libraries
import pandas as pd
import loompy
import pyarrow.parquet as pq
import pyarrow as pa

# define the path to the loom file
loom_file = "GSE156793_S3_gene_count.loom"
@@ -32,8 +34,7 @@
development_day = ds.ca["Development_day"]

# make a dataframe with the sample metadata, define the columns types
sample_df = pd.DataFrame(
{
sample_df = pd.DataFrame({
"sample_id": sample_id,
"cell_cluster": cell_cluster,
"assay": assay,
@@ -68,9 +69,11 @@

# transpose dataset and convert to parquet.
# process the data in chunks.
chunk_size = 50000
number_chunks = 50 # Number of chunks to process, if None, all chunks are processed
chunk_size = 10000
writer = None  # parquet writer, created lazily on the first chunk
count = 0  # number of chunks processed so far
number_chunks = 10 # number of chunks to process

for ix, selection, view in ds.scan(axis=1, batch_size=chunk_size):
# retrieve the chunk
matrix_chunk = view[:, :]
@@ -102,17 +105,32 @@
# rename the index column
df_chunk = df_chunk.rename(columns={"index": "sample_id"})

# save the chunk to parquet
df_chunk.to_parquet(
f"gene_count_chunk_{ix}.parquet",
index=False,
engine="pyarrow",
compression="gzip",
)
if writer is None:
# define the schema: sample metadata columns followed by one float32 column per gene
schema = pa.schema(
[
pa.field("sample_id", pa.string()),
pa.field("cell_cluster_id", pa.int8()),
pa.field("development_day", pa.int64()),
pa.field("assay_id", pa.int8()),
]
+ [pa.field(gene_id, pa.float32()) for gene_id in gene_ids]
)

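# sanity check: the chunk should have exactly one column per schema field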
print(len(list(df_chunk.columns)))
print(len(schema))

# create a single parquet writer; every subsequent chunk is appended to the same output file
writer = pq.ParquetWriter("GSE156793.parquet", schema, compression="snappy")

writer.write_table(pa.Table.from_pandas(df_chunk, preserve_index=False))

print(f"Chunk {ix} saved")
count = count + 1

# break the loop if the number of chunks is reached
if number_chunks is not None and count >= number_chunks:
count += 1
if count >= number_chunks:
break

if writer is not None:
writer.close()
print("Concatenated parquet file written to GSE156793.parquet")

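For a quick look at the output, here is a minimal read-back sketch (assuming the script above has produced GSE156793.parquet in the working directory and a reasonably recent pyarrow is installed; the column names follow the schema defined in the diff):

import pyarrow.parquet as pq

# open the concatenated parquet file produced by the script
pf = pq.ParquetFile("GSE156793.parquet")
print(pf.schema_arrow)        # metadata columns followed by one float32 column per gene
print(pf.metadata.num_rows)   # total number of cells written across all chunks

# stream the file in batches instead of loading it all into memory
for batch in pf.iter_batches(batch_size=10_000, columns=["sample_id", "cell_cluster_id"]):
    df = batch.to_pandas()
    print(df.head())
    break  # peek at the first batch only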