Skip to content

Commit

Permalink
fix csv save plugin ... works!
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzoic committed Aug 1, 2023
1 parent b80e589 commit d0a2b63
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 34 deletions.
4 changes: 2 additions & 2 deletions countess/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def multi_iterator_map(function, values, args) -> Iterable:
to organize `values` and another queue to organize the
returned values."""

nproc = min((cpu_count()//2, len(values)))
nproc = min((cpu_count(), len(values)))
queue1 = multiprocessing.Queue()
queue2 = multiprocessing.Queue(maxsize=nproc)

Expand Down Expand Up @@ -150,7 +150,7 @@ def execute(self, logger: Logger, row_limit: Optional[int] = None):
args=(logger, row_limit_each_file)
)
else:
self.plugin.prepare([p.name for p in self.parent_nodes])
self.plugin.prepare([p.name for p in self.parent_nodes], row_limit)
self.result = self.process_parent_iterables(logger)

self.result = self.plugin.collect(self.result)
Expand Down
8 changes: 4 additions & 4 deletions countess/core/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def hash(self):
"""Returns a hex digest of the hash of all configuration parameters"""
return self.get_parameter_hash().hexdigest()

def prepare(self, sources: List[str]):
def prepare(self, sources: List[str], row_limit: Optional[int]):
pass

def process(self, data, source: str, logger: Logger) -> Optional[Iterable[pd.DataFrame]]:
Expand Down Expand Up @@ -225,7 +225,7 @@ class PandasSimplePlugin(PandasBasePlugin):

input_columns: Dict[str, np.dtype] = {}

def prepare(self, sources: list[str]):
def prepare(self, sources: list[str], row_limit: Optional[int]):
self.input_columns = {}

def process(self, data: pd.DataFrame, source: str, logger: Logger) -> Iterable[pd.DataFrame]:
Expand Down Expand Up @@ -255,13 +255,13 @@ class PandasProductPlugin(PandasBasePlugin):
mem1 = None
mem2 = None

def prepare(self, sources: list[str]):
def prepare(self, sources: list[str], row_limit: Optional[int]):

if len(sources) != 2:
raise ValueError(f"{self.__class__} required exactly two inputs")
self.source1, self.source2 = sources

super().prepare(sources)
super().prepare(sources, row_limit)

self.mem1 = []
self.mem2 = []
Expand Down
43 changes: 19 additions & 24 deletions countess/plugins/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,24 @@ class SaveCsvPlugin(PandasOutputPlugin):
}

filehandle: Optional[Union[BufferedWriter | BytesIO]] = None
csv_columns = []
csv_columns = None

SEPARATORS = {",": ",", ";": ";", "SPACE": " ", "TAB": "\t"}
QUOTING = {False: csv.QUOTE_MINIMAL, True: csv.QUOTE_NONNUMERIC}

def output_dataframe(self, dataframe: pd.DataFrame, logger: Logger):
def prepare(self, sources: list[str], row_limit: Optional[int] = None):
if row_limit is None:
filename = self.parameters["filename"].value
if filename.endswith(".gz"):
self.filehandle = gzip.open(filename, "wb")
else:
self.filehandle = open(filename, "wb")
else:
self.filehandle = BytesIO()

self.csv_columns = None

def process(self, dataframe: pd.DataFrame, source: str, logger: Logger):
# reset indexes so we can treat all columns equally.
# if there's just a nameless index then we don't care about it, drop it.

Expand All @@ -181,7 +193,7 @@ def output_dataframe(self, dataframe: pd.DataFrame, logger: Logger):

# if this is our first dataframe to write then decide whether to
# include the header or not.
if self.filehandle.tell() == 0:
if self.csv_columns is None:
self.csv_columns = list(dataframe.columns)
emit_header = self.parameters["header"].value
else:
Expand All @@ -203,25 +215,8 @@ def output_dataframe(self, dataframe: pd.DataFrame, logger: Logger):
sep=self.SEPARATORS[self.parameters["delimiter"].value],
quoting=self.QUOTING[self.parameters["quoting"].value],
)
return []

def process_inputs(
self,
inputs: Mapping[str, Iterable[pd.DataFrame]],
logger,
row_limit: Optional[int] = None,
) -> Optional[str]:
assert isinstance(self.parameters["filename"], StringParam)

if row_limit is None:
filename = self.parameters["filename"].value
print(f"OPEN {filename}")
if filename.endswith(".gz"):
self.filehandle = gzip.open(filename, "wb")
else:
self.filehandle = open(filename, "wb")
super().process_inputs(inputs, logger, row_limit)
return None
else:
self.filehandle = BytesIO()
super().process_inputs(inputs, logger, row_limit)
return self.filehandle.getvalue().decode("utf-8")
def finalize(self, logger: Logger):
if isinstance(self.filehandle, BytesIO):
yield self.filehandle.getvalue().decode("utf-8")
7 changes: 4 additions & 3 deletions countess/plugins/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class JoinPlugin(PandasProductPlugin):
input_columns_1 = None
input_columns_2 = None

def prepare(self, sources: list[str]):
super().prepare(sources)
def prepare(self, sources: list[str], row_limit: Optional[int] = None):
super().prepare(sources, row_limit)

assert isinstance(self.parameters["inputs"], ArrayParam)
assert len(self.parameters["inputs"]) == 2
Expand Down Expand Up @@ -93,9 +93,10 @@ def process_dataframes(self, dataframe1: pd.DataFrame, dataframe2: pd.DataFrame,

return dataframe

def finalize(self, logger: Logger) -> None:
def finalize(self, logger: Logger) -> Iterable:
assert isinstance(self.parameters["inputs"], ArrayParam)
assert len(self.parameters["inputs"]) == 2
ip1, ip2 = self.parameters["inputs"]
ip1.set_column_choices(self.input_columns_1.keys())
ip2.set_column_choices(self.input_columns_2.keys())
return []
2 changes: 1 addition & 1 deletion countess/plugins/regex.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class RegexToolPlugin(PandasTransformSingleToTuplePlugin):

compiled_re = None

def prepare(self, sources: list[str]):
def prepare(self, sources: list[str], row_limit: Optional[int] = None):

self.compiled_re = re.compile(self.parameters["regex"].value)

Expand Down

0 comments on commit d0a2b63

Please sign in to comment.