diff --git a/CHANGES.rst b/CHANGES.rst index c69db004d..d7c6ce377 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -23,6 +23,7 @@ We changed the default branch from "master" to "main". - Add a test for the dask bindings (#719) - Refactor input data iteration to need less memory (#707) - Added benchmark tests (#710) + - Make dask a possible input format (#736) - Bugfixes: - Fixed a bug in the selection, that caused all regression tasks with un-ordered index to be wrong (#715) - Fixed readthedocs (#695, #696) @@ -30,6 +31,7 @@ We changed the default branch from "master" to "main". - Fix in the forecasting notebook (#729) - Let tsfresh choose the value column if possible (#722) - Move from coveralls github action to codecov (#734) + - Improve speed of data processing (#735) - Fix for newer, more strict pandas versions (#737) - Fix documentation for feature calculators (#743) diff --git a/docs/index.rst b/docs/index.rst index 6d40b91df..66eab5067 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -24,24 +24,24 @@ The following chapters will explain the tsfresh package in detail: .. toctree:: :maxdepth: 1 - Introduction - Quick Start - Module Reference - Data Formats - scikit-learn Transformers - List of Calculated Features - Feature Calculation - Feature Calculator Settings - Feature Filtering - How to write custom Feature Calculators - Parallelization - tsfresh on a cluster - Rolling/Time Series Forecasting - FAQ - Authors - License - Changelog - How to contribute + text/introduction + text/quick_start + text/data_formats + text/sklearn_transformers + text/list_of_features + text/feature_extraction_settings + text/feature_filtering + text/how_to_add_custom_feature + text/large_data + text/tsfresh_on_a_cluster + text/forecasting + text/faq + api/modules + authors + license + changes + text/how_to_contribute + text/feature_calculation Indices and tables diff --git a/docs/text/data_formats.rst b/docs/text/data_formats.rst index c80030834..8481b3559 100644 --- a/docs/text/data_formats.rst +++ b/docs/text/data_formats.rst @@ -10,14 +10,15 @@ tsfresh offers three different options to specify the time series data to be use Irrespective of the input format, tsfresh will always return the calculated features in the same output format described below. -All three input format options consist of :class:`pandas.DataFrame` objects. There are four important column types that +Typically, the input format options consist of :class:`pandas.DataFrame` objects. +(see :ref:`large-data-label` for other input types) +There are four important column types that make up those DataFrames. Each will be described with an example from the robot failures dataset (see :ref:`quick-start-label`). :`column_id`: This column indicates which entities the time series belong to. Features will be extracted individually for each entity (id). The resulting feature matrix will contain one row per id. Each robot is a different entity, so each of it has a different id. - After rolling, each window will be a distinct entity, so it has a distinct id. :`column_sort`: This column contains values which allow to sort the time series (e.g. time stamps). In general, it is not required to have equidistant time steps or the same time scale for the different ids and/or kinds. 
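To make the column roles concrete, here is a small, purely hypothetical stacked DataFrame (the values are invented for illustration and are not taken from the robot failures dataset):

.. code::

    import pandas as pd

    # two entities (ids), two kinds of time series ("x" and "y"), two time steps each
    df = pd.DataFrame({
        "id":    [1, 1, 1, 1, 2, 2, 2, 2],
        "time":  [0, 1, 0, 1, 0, 1, 0, 1],
        "kind":  ["x", "x", "y", "y", "x", "x", "y", "y"],
        "value": [0.1, 0.2, 7.0, 7.5, 0.3, 0.4, 8.0, 8.5],
    })

Here ``id`` plays the role of ``column_id``, ``time`` of ``column_sort``, ``kind`` of ``column_kind`` and ``value`` of ``column_value``.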
@@ -43,12 +44,12 @@ In the following we describe the different input formats, that are build on thos * A dictionary of flat DataFrames The difference between a flat and a stacked DataFrame is indicated by specifying or not specifying the parameters -`column_value` and `column_kind` in the :func:`tsfresh.extract_features` function. +``column_value`` and ``column_kind`` in the :func:`tsfresh.extract_features` function. If you do not know which one to choose, you probably want to try out the flat or stacked DataFrame. -Input Option 1. Flat DataFrame ------------------------------- +Input Option 1. Flat DataFrame or Wide DataFrame +------------------------------------------------ If both ``column_value`` and ``column_kind`` are set to ``None``, the time series data is assumed to be in a flat DataFrame. This means that each different time series must be saved as its own column. @@ -82,8 +83,8 @@ and you would pass to the extraction functions, to extract features separately for all ids and separately for the x and y values. You can also omit the ``column_kind=None, column_value=None`` as this is the default. -Input Option 2. Stacked DataFrame ---------------------------------- +Input Option 2. Stacked DataFrame or Long DataFrame +--------------------------------------------------- If both ``column_value`` and ``column_kind`` are set, the time series data is assumed to be a stacked DataFrame. This means that there are no different columns for the different types of time series. @@ -128,6 +129,7 @@ Then you would set column_id="id", column_sort="time", column_kind="kind", column_value="value" to end up with the same extracted features as above. +You can also omit the value column and let ``tsfresh`` deduce it automatically. Input Option 3. Dictionary of flat DataFrames @@ -205,4 +207,4 @@ where the x features are calculated using all x values (independently for A and and so on. This form of DataFrame is also the expected input format to the feature selection algorithms (e.g. the -:func:`tsfresh.select_features` function). \ No newline at end of file +:func:`tsfresh.select_features` function). diff --git a/docs/text/feature_calculation.rst b/docs/text/feature_calculation.rst index 2aaebaef2..57190a43a 100644 --- a/docs/text/feature_calculation.rst +++ b/docs/text/feature_calculation.rst @@ -1,10 +1,7 @@ .. _feature-naming-label: -Feature Calculation -=================== - -Feature naming -'''''''''''''' +Feature Calculator Naming +========================= tsfresh enforces a strict naming of the created features, which you have to follow whenever you create new feature calculators. diff --git a/docs/text/feature_extraction_settings.rst b/docs/text/feature_extraction_settings.rst index a9277476b..a58337ed8 100644 --- a/docs/text/feature_extraction_settings.rst +++ b/docs/text/feature_extraction_settings.rst @@ -12,7 +12,7 @@ For the lazy: Just let me calculate some features ------------------------------------------------- So, to just calculate a comprehensive set of features, call the :func:`tsfresh.extract_features` method without -passing a `default_fc_parameters` or `kind_to_fc_parameters` object, which means you are using the default options +passing a ``default_fc_parameters`` or ``kind_to_fc_parameters`` object, which means you are using the default options (which will use all feature calculators in this package for what we think are sane default parameters). For the advanced: How do I set the parameters for all kind of time series?
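The settings object handed to ``default_fc_parameters`` (described in the following paragraphs) can be as simple as a plain dictionary. A minimal sketch, using two calculators that exist in :mod:`tsfresh.feature_extraction.feature_calculators` (the parameter values are arbitrary examples):

.. code::

    from tsfresh import extract_features

    settings = {
        "length": None,  # a calculator without parameters
        "large_standard_deviation": [{"r": 0.05}, {"r": 0.1}],  # one feature per parameter combination
    }

    X = extract_features(df, column_id="id", column_sort="time",
                         default_fc_parameters=settings)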
@@ -29,7 +29,7 @@ custom settings object: >>> from tsfresh.feature_extraction import extract_features >>> extract_features(df, default_fc_parameters=settings) -The `default_fc_parameters` is expected to be a dictionary, which maps feature calculator names +The ``default_fc_parameters`` is expected to be a dictionary, which maps feature calculator names (the function names you can find in the :mod:`tsfresh.feature_extraction.feature_calculators` file) to a list of dictionaries, which are the parameters with which the function will be called (as key value pairs). Each function parameter combination, that is in this dict will be called during the extraction and will produce a feature. @@ -79,7 +79,7 @@ For the ambitious: How do I set the parameters for different type of time series It is also possible, to control the features to be extracted for the different kinds of time series individually. You can do so by passing another dictionary to the extract function as a -`kind_to_fc_parameters` = {"kind" : `fc_parameters`} + kind_to_fc_parameters = {"kind" : fc_parameters} parameter. This dict must be a mapping from kind names (as string) to `fc_parameters` objects, which you would normally pass as an argument to the `default_fc_parameters` parameter. diff --git a/docs/text/introduction.rst b/docs/text/introduction.rst index 481bd154a..07f20d404 100644 --- a/docs/text/introduction.rst +++ b/docs/text/introduction.rst @@ -46,8 +46,6 @@ What not to do with tsfresh? Currently, tsfresh is not suitable * for usage with streaming data - * for batch processing over a distributed architecture when different time series are fragmented over different computational units - (but see how to use ``tsfresh`` on a cluster in :ref:`tsfresh-on-a-cluster-label`) * to train models on the features (we do not want to reinvent the wheel, check out the python package `scikit-learn `_ for example) @@ -61,6 +59,7 @@ There is a matlab package called `hctsa `_ extract features from time series. It is also possible to use hctsa from within python by means of the `pyopy `_ package. +There also exist the `featuretools `_, `FATS `_ and `cesium `_ packages. References ---------- diff --git a/docs/text/large_data.rst b/docs/text/large_data.rst new file mode 100644 index 000000000..d2e52b849 --- /dev/null +++ b/docs/text/large_data.rst @@ -0,0 +1,84 @@ +.. _large-data-label: + +Large Input Data +================ + +If you are dealing with large time series data, you are facing multiple problems. +The two most important ones are +* long execution times for feature extraction +* large memory consumption, even beyond what a single machine can handle + +To solve only the first problem, you can parallelize the computation as described in :ref:`tsfresh-on-a-cluster-label`. +Please note that parallelization on your local computer is already turned on by default. + +However, for even larger data you need to handle both problems at once. +You have multiple possibilities here: + +Dask - the simple way +--------------------- + +*tsfresh* accepts a `dask dataframe `_ instead of a +pandas dataframe as input for the :func:`tsfresh.extract_features` function. +Dask dataframes allow you to scale your computation beyond your local memory (via partitioning the data internally) +and even to large clusters of machines. +Its dataframe API is very similar to pandas dataframes and might even be a drop-in replacement. + +All arguments discussed in :ref:`data-formats-label` are also valid for the dask case.
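If your data currently lives in a pandas DataFrame, a minimal sketch of handing it over as a dask dataframe instead (the partition count is arbitrary and should be tuned to your data):

.. code::

    import dask.dataframe as dd
    from tsfresh import extract_features

    # df_pandas is an ordinary pandas DataFrame in one of the supported input formats
    df = dd.from_pandas(df_pandas, npartitions=3)

    # the result is a (lazy) dask dataframe as well
    X = extract_features(df, column_id="id", column_sort="time")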
+The input data will be transformed into the correct format for *tsfresh* using dask methods +and the feature extraction will be added as additional computations to the computation graph. +You can then add additional computations to the result or trigger the computation as usual with ``.compute()``. + +.. NOTE:: + + The last step of the feature extraction is to bring all features into a tabular format. + Especially for very large data samples, this computation can be a large + performance bottleneck. + We therefore recommend turning the pivoting off if you do not really need it + and working with the unpivoted data as much as possible. + +For example, to read in data from parquet and do the feature extraction: + +.. code:: + + import dask.dataframe as dd + from tsfresh import extract_features + + df = dd.read_parquet(...) + + X = extract_features(df, + column_id="id", column_sort="time", + pivot=False) + + result = X.compute() + +Dask - more control +------------------- + +The feature extraction method needs to perform some data transformations before it +can call the actual feature calculators. +If you want to optimize your data flow, you might want to have more control over how +exactly the feature calculation is added to your dask computation graph. + +Therefore, it is also possible to add the feature extraction directly: + + +.. code:: + + from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk + features = dask_feature_extraction_on_chunk(df_grouped, + column_id="id", + column_kind="kind", + column_sort="time", + column_value="value") + +In this case however, ``df_grouped`` must already be in the correct format. +Check out the documentation of :func:`tsfresh.convenience.bindings.dask_feature_extraction_on_chunk` +for more information. +No pivoting will be performed in this case. + +PySpark +------- + +Similar to dask, it is also possible to add the feature extraction to a Spark +computation graph. +You can find more information in the documentation of :func:`tsfresh.convenience.bindings.spark_feature_extraction_on_chunk`. \ No newline at end of file diff --git a/docs/text/parallelization.rst b/docs/text/parallelization.rst deleted file mode 100644 index 0eca998db..000000000 --- a/docs/text/parallelization.rst +++ /dev/null @@ -1,42 +0,0 @@ -.. _parallelization-label: - -Parallelization -=============== - -The feature extraction as well as the feature selection offer the possibility of parallelization. -Out of the box both tasks are parallelized by tsfresh. However, the overhead introduced with the -parallelization should not be underestimated. Here we discuss the different settings to control -the parallelization. To achieve best results for your use-case you should experiment with the parameters. - -Please let us know about your results tuning the below mentioned parameters! It will help improve this document as -well as the default settings. - -Parallelization of Feature Selection ------------------------------------- - -We use a :class:`multiprocessing.Pool` to parallelize the calculation of the p-values for each feature. On -instantiation we set the Pool's number of worker processes to -`n_jobs`. This field defaults to -the number of processors on the current system. We recommend setting it to the maximum number of available (and -otherwise idle) processors. - -The chunksize of the Pool's map function is another important parameter to consider. It can be set via the -`chunksize` field.
By default it is up to -:class:`multiprocessing.Pool` is parallelisation parameter. One data chunk is -defined as a singular time series for one id and one kind. The chunksize is the -number of chunks that are submitted as one task to one worker process. If you -set the chunksize to 10, then it means that one worker task corresponds to -calculate all features for 10 id/kind time series combinations. If it is set it -to None, depending on distributor, heuristics are used to find the optimal -chunksize. The chunksize can have an crucial influence on the optimal cluster -performance and should be optimised in benchmarks for the problem at hand. - -Parallelization of Feature Extraction -------------------------------------- - -For the feature extraction tsfresh exposes the parameters -`n_jobs` and `chunksize`. Both behave analogue to the parameters -for the feature selection. - -To do performance studies and profiling, it sometimes quite useful to turn off parallelization at all. This can be -setting the parameter `n_jobs` to 0. diff --git a/docs/text/tsfresh_on_a_cluster.rst b/docs/text/tsfresh_on_a_cluster.rst index 530d8bffb..7c0c7b22d 100644 --- a/docs/text/tsfresh_on_a_cluster.rst +++ b/docs/text/tsfresh_on_a_cluster.rst @@ -3,8 +3,54 @@ .. role:: python(code) :language: python -How to deploy tsfresh at scale -============================== +Parallelization +=============== + +The feature extraction, the feature selection, as well as the rolling offer the possibility of parallelization. +By default, all of those tasks are parallelized by tsfresh. +Here we discuss the different settings to control the parallelization. +To achieve best results for your use-case you should experiment with the parameters. + +.. NOTE:: + This document describes parallelization to speed up processing time. + If you are dealing with large amounts of data (which might not fit into memory anymore), + you can also have a look at :ref:`large-data-label`. + +Please let us know about your results tuning the below mentioned parameters! It will help improve this document as +well as the default settings. + +Parallelization of Feature Selection +------------------------------------ + +We use a :class:`multiprocessing.Pool` to parallelize the calculation of the p-values for each feature. On +instantiation we set the Pool's number of worker processes to +`n_jobs`. This field defaults to +the number of processors on the current system. We recommend setting it to the maximum number of available (and +otherwise idle) processors. + +The chunksize of the Pool's map function is another important parameter to consider. It can be set via the +`chunksize` field. By default it is not set explicitly and a +heuristic is used to choose it (see below). One data chunk is +defined as a singular time series for one id and one kind. The chunksize is the +number of chunks that are submitted as one task to one worker process. If you +set the chunksize to 10, then it means that one worker task corresponds to +calculating all features for 10 id/kind time series combinations. If it is set +to None, then, depending on the distributor, heuristics are used to find the optimal +chunksize. The chunksize can have a crucial influence on the optimal cluster +performance and should be optimised in benchmarks for the problem at hand. + +Parallelization of Feature Extraction +------------------------------------- + +For the feature extraction tsfresh exposes the parameters +`n_jobs` and `chunksize`. Both behave analogously to the parameters +for the feature selection.
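For instance, a sketch of setting both parameters in one call (the concrete numbers are arbitrary and should come from your own benchmarks):

.. code::

    from tsfresh import extract_features

    X = extract_features(df, column_id="id", column_sort="time",
                         n_jobs=4,       # number of worker processes
                         chunksize=10)   # 10 id/kind combinations per worker task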
+ +To do performance studies and profiling, it sometimes quite useful to turn off parallelization at all. This can be +setting the parameter `n_jobs` to 0. + +Parallelization beyond a single machine +--------------------------------------- The high volume of time series data can demand an analysis at scale. So, time series need to be processed on a group of computational units instead of a singular machine. @@ -13,9 +59,6 @@ Accordingly, it may be necessary to distribute the extraction of time series fea Indeed, it is possible to extract features with *tsfresh* in a distributed fashion. This page will explain how to setup a distributed *tsfresh*. -The distributor class -''''''''''''''''''''' - To distribute the calculation of features, we use a certain object, the Distributor class (contained in the :mod:`tsfresh.utilities.distribution` module). @@ -95,6 +138,12 @@ Using dask to distribute the calculations We provide distributor for the `dask framework `_, where *"Dask is a flexible parallel computing library for analytic computing."* +.. NOTE:: + This part of the documentation only handles parallelizing the computation using + a dask cluster. The input and output are still pandas objects. + If you want to use dask's capabilities to scale to data beyond your local + memory, have a look into :ref:`large-data-label`. + Dask is a great framework to distribute analytic calculations to a cluster. It scales up and down, meaning that you can even use it on a singular machine. The only thing that you will need to run *tsfresh* on a Dask cluster is the ip address and port number of the diff --git a/tests/integrations/examples/test_driftbif_simulation.py b/tests/integrations/examples/test_driftbif_simulation.py index bce59cd7d..ed06740da 100644 --- a/tests/integrations/examples/test_driftbif_simulation.py +++ b/tests/integrations/examples/test_driftbif_simulation.py @@ -5,9 +5,11 @@ import numpy as np import unittest import pandas as pd +import dask.dataframe as dd from tsfresh.examples.driftbif_simulation import velocity, load_driftbif, sample_tau -from tsfresh import extract_relevant_features +from tsfresh import extract_relevant_features, extract_features +from tsfresh.feature_extraction import MinimalFCParameters class DriftBifSimlationTestCase(unittest.TestCase): diff --git a/tests/integrations/test_feature_extraction.py b/tests/integrations/test_feature_extraction.py new file mode 100644 index 000000000..512bbba28 --- /dev/null +++ b/tests/integrations/test_feature_extraction.py @@ -0,0 +1,159 @@ +# This file as well as the whole tsfresh package are licenced under the MIT licence (see the LICENCE.txt) +# Maximilian Christ (maximilianchrist.com), Blue Yonder Gmbh, 2016 + +from unittest import TestCase + +import dask.dataframe as dd +import pandas as pd + +from tsfresh.examples.driftbif_simulation import load_driftbif +from tsfresh import extract_relevant_features, extract_features +from tsfresh.feature_extraction import MinimalFCParameters + + +class FeatureExtractionTestCase(TestCase): + def setUp(self): + df, y = load_driftbif(100, 10, classification=True, seed=42) + + df['my_id'] = df['id'].astype('str') + del df["id"] + + self.df = df + + def test_pandas(self): + df = self.df + + # Test shape and a single entry (to see if it works at all) + X = extract_features(df, column_id="my_id", column_sort="time", column_kind="dimension", column_value="value", + default_fc_parameters=MinimalFCParameters()) + self.assertIn("1__mean", X.columns) + self.assertAlmostEqual(X.loc["5", "1__mean"], 5.516e-05, 
4) + self.assertIn("11", X.index) + self.assertEqual(X.shape, (100, 16)) + + X = extract_features(df, column_id="my_id", column_sort="time", column_kind="dimension", + default_fc_parameters=MinimalFCParameters()) + self.assertIn("1__mean", X.columns) + self.assertAlmostEqual(X.loc["5", "1__mean"], 5.516e-05, 4) + self.assertIn("11", X.index) + self.assertEqual(X.shape, (100, 16)) + + X = extract_features(df.drop(columns=["dimension"]), column_id="my_id", column_sort="time", + default_fc_parameters=MinimalFCParameters()) + self.assertIn("value__mean", X.columns) + self.assertAlmostEqual(X.loc["5", "value__mean"], 5.516e-05, 4) + self.assertIn("11", X.index) + self.assertEqual(X.shape, (100, 8)) + + X = extract_features(df.drop(columns=["dimension", "time"]), column_id="my_id", + default_fc_parameters=MinimalFCParameters()) + self.assertIn("value__mean", X.columns) + self.assertAlmostEqual(X.loc["5", "value__mean"], 5.516e-05, 4) + self.assertIn("11", X.index) + self.assertEqual(X.shape, (100, 8)) + + def test_pandas_no_pivot(self): + df = self.df + + X = extract_features(df, column_id="my_id", column_sort="time", + column_kind="dimension", column_value="value", + pivot=False, + default_fc_parameters=MinimalFCParameters()) + X = pd.DataFrame(X, columns=["my_id", "variable", "value"]) + self.assertIn("1__mean", X["variable"].values) + self.assertAlmostEqual(X[(X["my_id"] == "5") & (X["variable"] == "1__mean")]["value"].iloc[0], 5.516e-05, 4) + self.assertEqual(X.shape, (100*16, 3)) + + X = extract_features(df, column_id="my_id", column_sort="time", + column_kind="dimension", + pivot=False, + default_fc_parameters=MinimalFCParameters()) + X = pd.DataFrame(X, columns=["my_id", "variable", "value"]) + self.assertIn("1__mean", X["variable"].values) + self.assertAlmostEqual(X[(X["my_id"] == "5") & (X["variable"] == "1__mean")]["value"].iloc[0], 5.516e-05, 4) + self.assertEqual(X.shape, (100*16, 3)) + + X = extract_features(df.drop(columns=["dimension"]), column_id="my_id", + column_sort="time", + pivot=False, + default_fc_parameters=MinimalFCParameters()) + X = pd.DataFrame(X, columns=["my_id", "variable", "value"]) + self.assertIn("value__mean", X["variable"].values) + self.assertAlmostEqual(X[(X["my_id"] == "5") & (X["variable"] == "value__mean")]["value"].iloc[0], 5.516e-05, 4) + self.assertEqual(X.shape, (100*8, 3)) + + X = extract_features(df.drop(columns=["dimension", "time"]), column_id="my_id", + pivot=False, + default_fc_parameters=MinimalFCParameters()) + X = pd.DataFrame(X, columns=["my_id", "variable", "value"]) + self.assertIn("value__mean", X["variable"].values) + self.assertAlmostEqual(X[(X["my_id"] == "5") & (X["variable"] == "value__mean")]["value"].iloc[0], 5.516e-05, 4) + self.assertEqual(X.shape, (100*8, 3)) + + def test_dask(self): + df = dd.from_pandas(self.df, npartitions=1) + + X = extract_features(df, column_id="my_id", column_sort="time", + column_kind="dimension", column_value="value", + default_fc_parameters=MinimalFCParameters()).compute() + self.assertIn("1__mean", X.columns) + self.assertAlmostEqual(X.loc["5", "1__mean"], 5.516e-05, 4) + self.assertIn("11", X.index) + self.assertEqual(X.shape, (100, 16)) + + X = extract_features(df, column_id="my_id", column_sort="time", + column_kind="dimension", + default_fc_parameters=MinimalFCParameters()).compute() + self.assertIn("1__mean", X.columns) + self.assertAlmostEqual(X.loc["5", "1__mean"], 5.516e-05, 4) + self.assertIn("11", X.index) + self.assertEqual(X.shape, (100, 16)) + + X = 
extract_features(df.drop(columns=["dimension"]), column_id="my_id", + column_sort="time", + default_fc_parameters=MinimalFCParameters()).compute() + self.assertIn("value__mean", X.columns) + self.assertAlmostEqual(X.loc["5", "value__mean"], 5.516e-05, 4) + self.assertIn("11", X.index) + self.assertEqual(X.shape, (100, 8)) + + X = extract_features(df.drop(columns=["dimension", "time"]), column_id="my_id", + default_fc_parameters=MinimalFCParameters()).compute() + self.assertIn("value__mean", X.columns) + self.assertAlmostEqual(X.loc["5", "value__mean"], 5.516e-05, 4) + self.assertIn("11", X.index) + self.assertEqual(X.shape, (100, 8)) + + def test_dask_no_pivot(self): + df = dd.from_pandas(self.df, npartitions=1) + + X = extract_features(df, column_id="my_id", column_sort="time", + column_kind="dimension", column_value="value", + pivot=False, + default_fc_parameters=MinimalFCParameters()).compute() + self.assertIn("1__mean", X["variable"].values) + self.assertAlmostEqual(X[(X["my_id"] == "5") & (X["variable"] == "1__mean")]["value"].iloc[0], 5.516e-05, 4) + self.assertEqual(X.shape, (100*16, 3)) + + X = extract_features(df, column_id="my_id", column_sort="time", + column_kind="dimension", + pivot=False, + default_fc_parameters=MinimalFCParameters()).compute() + self.assertIn("1__mean", X["variable"].values) + self.assertAlmostEqual(X[(X["my_id"] == "5") & (X["variable"] == "1__mean")]["value"].iloc[0], 5.516e-05, 4) + self.assertEqual(X.shape, (100*16, 3)) + + X = extract_features(df.drop(columns=["dimension"]), column_id="my_id", + column_sort="time", + pivot=False, + default_fc_parameters=MinimalFCParameters()).compute() + self.assertIn("value__mean", X["variable"].values) + self.assertAlmostEqual(X[(X["my_id"] == "5") & (X["variable"] == "value__mean")]["value"].iloc[0], 5.516e-05, 4) + self.assertEqual(X.shape, (100*8, 3)) + + X = extract_features(df.drop(columns=["dimension", "time"]), column_id="my_id", + pivot=False, + default_fc_parameters=MinimalFCParameters()).compute() + self.assertIn("value__mean", X["variable"].values) + self.assertAlmostEqual(X[(X["my_id"] == "5") & (X["variable"] == "value__mean")]["value"].iloc[0], 5.516e-05, 4) + self.assertEqual(X.shape, (100*8, 3)) diff --git a/tests/units/feature_extraction/test_data.py b/tests/units/feature_extraction/test_data.py index e5d9ae6f2..da89c8629 100644 --- a/tests/units/feature_extraction/test_data.py +++ b/tests/units/feature_extraction/test_data.py @@ -2,9 +2,18 @@ import numpy as np import pandas as pd +from dask import dataframe as dd +from unittest import TestCase +from unittest.mock import Mock from tests.fixtures import DataTestCase -from tsfresh.feature_extraction.data import to_tsdata, LongTsFrameAdapter, WideTsFrameAdapter, TsDictAdapter +from tsfresh.feature_extraction.data import ( + to_tsdata, + LongTsFrameAdapter, + WideTsFrameAdapter, + TsDictAdapter, + PartitionedTsData +) from tsfresh.utilities.distribution import MultiprocessingDistributor TEST_DATA_EXPECTED_TUPLES = \ @@ -71,14 +80,6 @@ def test_dict_tsframe(self): def assert_tsdata(self, data, expected): self.assertEqual(len(data), len(expected)) self.assertEqual(sum(1 for _ in data), len(data)) - self.assertEqual(sum(1 for _ in data.partition(1)), len(expected)) - self.assertEqual(sum(1 for _ in data.partition(2)), math.ceil(len(expected) / 2)) - self.assertEqual(sum(1 for _ in data.partition(3)), math.ceil(len(expected) / 3)) - self.assertEqual((sum(sum(1 for _ in g) for g in data.partition(1))), len(data)) - first_partition = next(data.partition(1)) - 
self.assertEqual(len(first_partition), 1) - first_partition = next(data.partition(4)) - self.assertEqual(len(first_partition), 4) self.assert_data_chunk_object_equal(data, expected) def assert_data_chunk_object_equal(self, result, expected): @@ -143,20 +144,92 @@ def test_wide_dataframe_order_preserved_with_sort_column(self): ("b", 'v2', pd.Series([11], index=[2], name="v2"))] self.assert_data_chunk_object_equal(result, expected) + def test_dask_dataframe_with_kind(self): + test_df = dd.from_pandas(pd.DataFrame({ + "id": [1, 2], + "kind": ["a", "a"], + "value": [1, 2] + }), npartitions=1) + + result = to_tsdata(test_df, column_id="id", column_kind="kind") + self.assertEqual(result.column_id, "id") + self.assertEqual(result.column_kind, "kind") + self.assertEqual(result.column_value, "value") + + def test_f(chunk): + return pd.DataFrame({"id": chunk[0], "variable": chunk[1], "value": chunk[2]}) + + return_f = result.apply(test_f, meta=(("id", "int"), ("variable", "int"), ("value", "int"))).compute() + pd.testing.assert_frame_equal(return_f, pd.DataFrame({ + "id": [1, 2], + "variable": ["a", "a"], + "value": [1.0, 2.0] + })) + + def test_dask_dataframe_without_kind(self): + test_df = dd.from_pandas(pd.DataFrame({ + "id": [1, 2], + "value_a": [1, 2], + "value_b": [3, 4] + }), npartitions=1) + + result = to_tsdata(test_df, column_id="id") + self.assertEqual(result.column_id, "id") + + def test_f(chunk): + return pd.DataFrame({"id": chunk[0], "variable": chunk[1], "value": chunk[2]}) + + return_f = result.apply(test_f, meta=(("id", "int"), ("variable", "int"), ("value", "int"))).compute() + pd.testing.assert_frame_equal(return_f.reset_index(drop=True), pd.DataFrame({ + "id": [1, 2, 1, 2], + "variable": ["value_a", "value_a", "value_b", "value_b"], + "value": [1.0, 2.0, 3.0, 4.0] + })) + + test_df = dd.from_pandas(pd.DataFrame({ + "id": [1, 1], + "sort": [2, 1], + "value_a": [1, 2], + "value_b": [3, 4] + }), npartitions=1) + + result = to_tsdata(test_df, column_id="id", column_sort="sort") + self.assertEqual(result.column_id, "id") + + def test_f(chunk): + return pd.DataFrame({"id": chunk[0], "variable": chunk[1], "value": chunk[2]}) + + return_f = result.apply(test_f, meta=(("id", "int"), ("variable", "int"), ("value", "int"))).compute() + + pd.testing.assert_frame_equal(return_f.reset_index(drop=True), pd.DataFrame({ + "id": [1, 1, 1, 1], + "variable": ["value_a", "value_a", "value_b", "value_b"], + "value": [2.0, 1.0, 4.0, 3.0] + })) + def test_with_wrong_input(self): test_df = pd.DataFrame([{"id": 0, "kind": "a", "value": 3, "sort": np.NaN}]) self.assertRaises(ValueError, to_tsdata, test_df, "id", "kind", "value", "sort") test_df = pd.DataFrame([{"id": 0, "kind": "a", "value": 3, "sort": 1}]) + self.assertRaises(ValueError, to_tsdata, test_df, + "strange_id", "kind", "value", "sort") + test_df = dd.from_pandas(test_df, npartitions=1) self.assertRaises(ValueError, to_tsdata, test_df, "strange_id", "kind", "value", "sort") test_df = pd.DataFrame([{"id": 0, "kind": "a", "value": 3, "value_2": 1, "sort": 1}]) + self.assertRaises(ValueError, to_tsdata, test_df, + "strange_id", "kind", None, "sort") + test_df = dd.from_pandas(test_df, npartitions=1) self.assertRaises(ValueError, to_tsdata, test_df, "strange_id", "kind", None, "sort") test_df = pd.DataFrame([{"id": 0, "kind": "a", "value": 3, "sort": 1}]) + self.assertRaises(ValueError, to_tsdata, test_df, + "id", "strange_kind", "value", "sort") + test_df = dd.from_pandas(test_df, npartitions=1) self.assertRaises(ValueError, to_tsdata, test_df, 
"id", "strange_kind", "value", "sort") @@ -169,13 +242,18 @@ def test_with_wrong_input(self): "id", "kind", "value", "sort") test_df = pd.DataFrame([{"id": 2}, {"id": 1}]) + test_dd = dd.from_pandas(test_df, npartitions=1) test_dict = {"a": test_df, "b": test_df} # column_id needs to be given self.assertRaises(ValueError, to_tsdata, test_df, None, "a", "b", None) + self.assertRaises(ValueError, to_tsdata, test_dd, + None, "a", "b", None) self.assertRaises(ValueError, to_tsdata, test_df, None, "a", "b", "a") + self.assertRaises(ValueError, to_tsdata, test_dd, + None, "a", "b", "a") self.assertRaises(ValueError, to_tsdata, test_dict, None, "a", "b", None) self.assertRaises(ValueError, to_tsdata, test_dict, @@ -201,21 +279,86 @@ def test_with_wrong_input(self): None, None, "value", None) test_df = pd.DataFrame([{"id": 0, "a_": 3, "b": 5, "sort": 1}]) + self.assertRaises(ValueError, to_tsdata, test_df, + "id", None, None, "sort") + test_df = dd.from_pandas(test_df, npartitions=1) self.assertRaises(ValueError, to_tsdata, test_df, "id", None, None, "sort") test_df = pd.DataFrame([{"id": 0, "a__c": 3, "b": 5, "sort": 1}]) + self.assertRaises(ValueError, to_tsdata, test_df, + "id", None, None, "sort") + test_df = dd.from_pandas(test_df, npartitions=1) self.assertRaises(ValueError, to_tsdata, test_df, "id", None, None, "sort") test_df = pd.DataFrame([{"id": 0}]) + self.assertRaises(ValueError, to_tsdata, test_df, + "id", None, None, None) + test_df = dd.from_pandas(test_df, npartitions=1) self.assertRaises(ValueError, to_tsdata, test_df, "id", None, None, None) test_df = pd.DataFrame([{"id": 0, "sort": 0}]) + self.assertRaises(ValueError, to_tsdata, test_df, + "id", None, None, "sort") + test_df = dd.from_pandas(test_df, npartitions=1) self.assertRaises(ValueError, to_tsdata, test_df, "id", None, None, "sort") test_df = [1, 2, 3] self.assertRaises(ValueError, to_tsdata, test_df, "a", "b", "c", "d") + + +class PivotListTestCase(TestCase): + def test_empty_list(self): + mock_ts_data = Mock() + mock_ts_data.df_id_type = str + + return_df = PartitionedTsData.pivot(mock_ts_data, []) + + self.assertEqual(len(return_df), 0) + self.assertEqual(len(return_df.index), 0) + self.assertEqual(len(return_df.columns), 0) + + def test_different_input(self): + mock_ts_data = Mock() + mock_ts_data.df_id_type = str + + input_list = [ + ("a", "b", 1), + ("a", "c", 2), + ("A", "b", 3), + ("A", "c", 4), + ("X", "Y", 5), + ] + return_df = PartitionedTsData.pivot(mock_ts_data, input_list) + + self.assertEqual(len(return_df), 3) + self.assertEqual(set(return_df.index), {"a", "A", "X"}) + self.assertEqual(set(return_df.columns), {"b", "c", "Y"}) + + self.assertEqual(return_df.loc["a", "b"], 1) + self.assertEqual(return_df.loc["a", "c"], 2) + self.assertEqual(return_df.loc["A", "b"], 3) + self.assertEqual(return_df.loc["A", "c"], 4) + self.assertEqual(return_df.loc["X", "Y"], 5) + + def test_long_input(self): + mock_ts_data = Mock() + mock_ts_data.df_id_type = str + + input_list = [] + for i in range(100): + for j in range(100): + input_list.append((i, j, i*j)) + + return_df = PartitionedTsData.pivot(mock_ts_data, input_list) + + self.assertEqual(len(return_df), 100) + self.assertEqual(len(return_df.columns), 100) + # every cell should be filled + self.assertEqual(np.sum(np.sum(np.isnan(return_df))), 0) + # test if all entries are there + self.assertEqual(return_df.sum().sum(), 24502500) diff --git a/tests/units/feature_extraction/test_extraction.py b/tests/units/feature_extraction/test_extraction.py index 
ab0178618..ec6b391eb 100644 --- a/tests/units/feature_extraction/test_extraction.py +++ b/tests/units/feature_extraction/test_extraction.py @@ -12,7 +12,7 @@ from tests.fixtures import DataTestCase from tsfresh.feature_extraction.extraction import extract_features from tsfresh.feature_extraction.settings import ComprehensiveFCParameters -from tsfresh.utilities.distribution import DistributorBaseClass +from tsfresh.utilities.distribution import IterableDistributorBaseClass, MapDistributor class ExtractionTestCase(DataTestCase): @@ -241,29 +241,28 @@ def setUp(self): # only calculate some features to reduce load on travis ci self.name_to_param = {"maximum": None} - def test_assert_is_distributor(self): + def test_distributor_map_reduce_is_called(self): df = self.create_test_data_sample() - self.assertRaises(ValueError, extract_features, - timeseries_container=df, column_id="id", column_sort="sort", - column_kind="kind", column_value="val", - default_fc_parameters=self.name_to_param, distributor=object()) + mock = Mock(spec=IterableDistributorBaseClass) + mock.close.return_value = None + mock.map_reduce.return_value = [] - self.assertRaises(ValueError, extract_features, - timeseries_container=df, column_id="id", column_sort="sort", - column_kind="kind", column_value="val", - default_fc_parameters=self.name_to_param, distributor=13) + X = extract_features(timeseries_container=df, column_id="id", column_sort="sort", + column_kind="kind", column_value="val", + default_fc_parameters=self.name_to_param, distributor=mock) - def test_distributor_map_reduce_and_close_are_called(self): + self.assertTrue(mock.map_reduce.called) + + def test_distributor_close_is_called(self): df = self.create_test_data_sample() - mock = Mock(spec=DistributorBaseClass) + mock = MapDistributor() + mock.close = Mock() mock.close.return_value = None - mock.map_reduce.return_value = [] X = extract_features(timeseries_container=df, column_id="id", column_sort="sort", column_kind="kind", column_value="val", default_fc_parameters=self.name_to_param, distributor=mock) self.assertTrue(mock.close.called) - self.assertTrue(mock.map_reduce.called) diff --git a/tests/units/utilities/test_dataframe_functions.py b/tests/units/utilities/test_dataframe_functions.py index d1f9ce240..5130875d8 100644 --- a/tests/units/utilities/test_dataframe_functions.py +++ b/tests/units/utilities/test_dataframe_functions.py @@ -900,47 +900,3 @@ def test_dict_input(self): self.assertEqual(list(extended_dataframe["id"]), [(0, 1), (0, 1), (1, 1), (1, 1), (0, 2), (0, 2), (1, 2), (1, 2), (2, 2)]) assert_series_equal(dataframe["value"], extended_dataframe["value"]) - - -class PivotListTestCase(TestCase): - def test_empty_list(self): - return_df = dataframe_functions.pivot_list([]) - - self.assertEqual(len(return_df), 0) - self.assertEqual(len(return_df.index), 0) - self.assertEqual(len(return_df.columns), 0) - - def test_different_input(self): - input_list = [ - ("a", "b", 1), - ("a", "c", 2), - ("A", "b", 3), - ("A", "c", 4), - ("X", "Y", 5), - ] - return_df = dataframe_functions.pivot_list(input_list) - - self.assertEqual(len(return_df), 3) - self.assertEqual(set(return_df.index), {"a", "A", "X"}) - self.assertEqual(set(return_df.columns), {"b", "c", "Y"}) - - self.assertEqual(return_df.loc["a", "b"], 1) - self.assertEqual(return_df.loc["a", "c"], 2) - self.assertEqual(return_df.loc["A", "b"], 3) - self.assertEqual(return_df.loc["A", "c"], 4) - self.assertEqual(return_df.loc["X", "Y"], 5) - - def test_long_input(self): - input_list = [] - for i in 
range(100): - for j in range(100): - input_list.append((i, j, i*j)) - - return_df = dataframe_functions.pivot_list(input_list) - - self.assertEqual(len(return_df), 100) - self.assertEqual(len(return_df.columns), 100) - # every cell should be filled - self.assertEqual(np.sum(np.sum(np.isnan(return_df))), 0) - # test if all entries are there - self.assertEqual(return_df.sum().sum(), 24502500) diff --git a/tsfresh/feature_extraction/data.py b/tsfresh/feature_extraction/data.py index 66db256ee..15da29384 100644 --- a/tsfresh/feature_extraction/data.py +++ b/tsfresh/feature_extraction/data.py @@ -1,8 +1,28 @@ import itertools -from collections import namedtuple -from typing import Generator, Iterable, Sized +from collections import namedtuple, defaultdict +from typing import Iterable, Sized import pandas as pd +try: + from dask import dataframe as dd +except ImportError: # pragma: no cover + dd = None + + +def _binding_helper(f, kwargs, column_sort, column_id, column_kind, column_value): + def wrapped_feature_extraction(x): + if column_sort is not None: + x = x.sort_values(column_sort) + + chunk = Timeseries(x[column_id].iloc[0], x[column_kind].iloc[0], x[column_value]) + result = f(chunk, **kwargs) + + result = pd.DataFrame(result, columns=[column_id, "variable", "value"]) + result["value"] = result["value"].astype("double") + + return result[[column_id, "variable", "value"]] + + return wrapped_feature_extraction class Timeseries(namedtuple('Timeseries', ['id', 'kind', 'data'])): @@ -14,95 +34,64 @@ class Timeseries(namedtuple('Timeseries', ['id', 'kind', 'data'])): """ -class TsData(Iterable[Timeseries], Sized): +class TsData: """ TsData provides access to time series data for internal usage. - Implementations must implement `__iter__` and `__len__`. - Other methods should be overwritten if a more efficient solution exists for the underlying data store. + Distributors will use this data class to apply functions on the data. + All derived classes must either implement the `apply` method, + which is used to apply the given function directly on the data + or the __iter__ method, which can be used to get an iterator of + Timeseries instances (which distributors can use to apply the function on). + Other methods can be overwritten if a more efficient solution exists for the underlying data store. """ - - def __iter__(self): - raise NotImplementedError - - def __len__(self): - raise NotImplementedError - - def partition(self, chunk_size): - """ - Split the data into iterable chunks of given `chunk_size`. - - :param chunk_size: the size of the chunks - :type chunk_size: int - - :return: chunks with at most `chunk_size` items - :rtype: Generator[Iterable[Timeseries]] - """ - iterable = iter(self) - while True: - next_chunk = list(itertools.islice(iterable, chunk_size)) - if not next_chunk: - return - - yield next_chunk + pass -class SliceableTsData(TsData): +class PartitionedTsData(Iterable[Timeseries], Sized, TsData): """ - SliceableTsData uses a slice strategy to implement data partitioning. - Implementations of `TsData` may extend this class if they can iterate their elements in deterministic order and - if an efficient strategy exists to slice the data. - - Because `__iter__` defaults to `slice(0)`, implementations must only implement `slice`. + Special class of TsData, which can be partitioned. + Derived classes should implement __iter__ and __len__. 
""" + def __init__(self, df, column_id): + self.df_id_type = df[column_id].dtype - def slice(self, offset, length=None): + def pivot(self, results): """ - Get a subset of the timeseries elements + Helper function to turn an iterable of tuples with three entries into a dataframe. - :param offset: the offset in the sequence of elements - :type offset: int + The input ``list_of_tuples`` needs to be an iterable with tuples containing three + entries: (a, b, c). + Out of this, a pandas dataframe will be created with all a's as index, + all b's as columns and all c's as values. - :param length: if not `None`, the maximum number of elements the slice will yield - :type length: int + It basically does a pd.pivot(first entry, second entry, third entry), + but optimized for non-pandas input (= python list of tuples). - :return: a slice of the data - :rtype: Iterable[Timeseries] + This function is called in the end of the extract_features call. """ - raise NotImplementedError + return_df_dict = defaultdict(dict) + for chunk_id, variable, value in results: + # we turn it into a nested mapping `column -> index -> value` + return_df_dict[variable][chunk_id] = value - def __iter__(self): - return self.slice(0) + # the mapping column -> {index -> value} + # is now a dict of dicts. The pandas dataframe + # constructor will peel this off: + # first, the keys of the outer dict (the column) + # will turn into a column header and the rest into a column + # the rest is {index -> value} which will be turned into a + # column with index. + # All index will be aligned. + return_df = pd.DataFrame(return_df_dict, dtype=float) - def partition(self, chunk_size): - div, mod = divmod(len(self), chunk_size) - chunk_info = [(i * chunk_size, chunk_size) for i in range(div)] - if mod > 0: - chunk_info += [(div * chunk_size, mod)] + # copy the type of the index + return_df.index = return_df.index.astype(self.df_id_type) - for offset, length in chunk_info: - yield _Slice(self, offset, length) + # Sort by index to be backward compatible + return_df = return_df.sort_index() - -class _Slice(Iterable[Timeseries]): - def __init__(self, data, offset, length): - """ - Wraps the `slice` generator function as an iterable object which can be pickled and passed to the distributor - backend. 
- - :type data: SliceableTsData - :type offset: int - :type length: int - """ - self.data = data - self.offset = offset - self.length = length - - def __iter__(self): - return self.data.slice(self.offset, self.length) - - def __len__(self): - return self.length + return return_df def _check_colname(*columns): @@ -156,7 +145,7 @@ def _get_value_columns(df, *other_columns): return value_columns -class WideTsFrameAdapter(SliceableTsData): +class WideTsFrameAdapter(PartitionedTsData): def __init__(self, df, column_id, column_sort=None, value_columns=None): """ Adapter for Pandas DataFrames in wide format, where multiple columns contain different time series for @@ -190,31 +179,25 @@ def __init__(self, df, column_id, column_sort=None, value_columns=None): if column_sort is not None: _check_nan(df, column_sort) - self.df_grouped = df.sort_values([column_sort]).groupby([column_id]) - else: - self.df_grouped = df.groupby([column_id]) + + self.column_sort = column_sort + self.df_grouped = df.groupby([column_id]) + + super().__init__(df, column_id) def __len__(self): return self.df_grouped.ngroups * len(self.value_columns) - def slice(self, offset, length=None): - if 0 < offset >= len(self): - raise ValueError("offset out of range: {}".format(offset)) - - group_offset, column_offset = divmod(offset, len(self.value_columns)) - kinds = self.value_columns[column_offset:] - i = 0 + def __iter__(self): + for group_name, group in self.df_grouped: + if self.column_sort is not None: + group = group.sort_values(self.column_sort) - for group_name, group in itertools.islice(self.df_grouped, group_offset, None): - for kind in kinds: - i += 1 - if length is not None and i > length: - return + for kind in self.value_columns: yield Timeseries(group_name, kind, group[kind]) - kinds = self.value_columns -class LongTsFrameAdapter(TsData): +class LongTsFrameAdapter(PartitionedTsData): def __init__(self, df, column_id, column_kind, column_value=None, column_sort=None): """ Adapter for Pandas DataFrames in long format, where different time series for the same id are @@ -252,23 +235,26 @@ def __init__(self, df, column_id, column_kind, column_value=None, column_sort=No self.column_value = column_value _check_nan(df, column_id, column_kind, self.column_value) - _check_colname(column_kind) if column_sort is not None: _check_nan(df, column_sort) - self.df_grouped = df.sort_values([column_sort]).groupby([column_id, column_kind]) - else: - self.df_grouped = df.groupby([column_id, column_kind]) + + self.column_sort = column_sort + self.df_grouped = df.groupby([column_id, column_kind]) + + super().__init__(df, column_id) def __len__(self): return len(self.df_grouped) def __iter__(self): for group_key, group in self.df_grouped: + if self.column_sort is not None: + group = group.sort_values(self.column_sort) yield Timeseries(group_key[0], str(group_key[1]), group[self.column_value]) -class TsDictAdapter(TsData): +class TsDictAdapter(PartitionedTsData): def __init__(self, ts_dict, column_id, column_value, column_sort=None): """ Adapter for a dict, which maps different time series kinds to Pandas DataFrames. 
@@ -285,7 +271,6 @@ def __init__(self, ts_dict, column_id, column_value, column_sort=None): :param column_sort: the name of the column to sort on :type column_sort: str|None """ - _check_colname(*list(ts_dict.keys())) for df in ts_dict.values(): _check_nan(df, column_id, column_value) @@ -301,6 +286,8 @@ def __init__(self, ts_dict, column_id, column_value, column_sort=None): else: self.grouped_dict = {key: df.groupby(column_id) for key, df in ts_dict.items()} + super().__init__(df, column_id) + def __iter__(self): for kind, grouped_df in self.grouped_dict.items(): for ts_id, group in grouped_df: @@ -310,6 +297,91 @@ def __len__(self): return sum(grouped_df.ngroups for grouped_df in self.grouped_dict.values()) + +class DaskTsAdapter(TsData): + def __init__(self, df, column_id, column_kind=None, column_value=None, column_sort=None): + if column_id is None: + raise ValueError("column_id must be set") + + if column_id not in df.columns: + raise ValueError(f"Column not found: {column_id}") + + # Get all columns, which are not id, kind or sort + possible_value_columns = _get_value_columns(df, column_id, column_sort, column_kind) + + # The user already has a kind column. That means we just need to group by id (and additionally by kind) + if column_kind is not None: + if column_kind not in df.columns: + raise ValueError(f"Column not found: {column_kind}") + + self.df = df.groupby([column_id, column_kind]) + + # We assume the last remaining column is the value - but there needs to be one! + if column_value is None: + if len(possible_value_columns) != 1: + raise ValueError("Could not guess the value column! Please hand it to the function as an argument.") + column_value = possible_value_columns[0] + else: + # Ok, the user has no kind, so it is in Wide format. + # That means we have to melt before we can group. + # TODO: here is some room for optimization! + # we could choose the same way as for the Wide and LongTsAdapter + + # We first choose a name for our future kind column + column_kind = "kind" + + # if the user has specified a value column, use it + # if not, just go with all remaining columns + if column_value is not None: + value_vars = [column_value] + else: + value_vars = possible_value_columns + column_value = "value" + + _check_colname(*value_vars) + + id_vars = [column_id, column_sort] if column_sort else [column_id] + + # Now melt and group + df_melted = df.melt(id_vars=id_vars, value_vars=value_vars, + var_name=column_kind, value_name=column_value) + + self.df = df_melted.groupby([column_id, column_kind]) + + self.column_id = column_id + self.column_kind = column_kind + self.column_value = column_value + self.column_sort = column_sort + + def apply(self, f, meta, **kwargs): + """ + Apply the wrapped feature extraction function "f" + to the data. + Before that, turn the data into the correct form of Timeseries instances + usable by the feature extraction. + After the call, turn it back into pandas dataframes + for further processing. + """ + bound_function = _binding_helper(f, kwargs, self.column_sort, self.column_id, + self.column_kind, self.column_value) + return self.df.apply(bound_function, meta=meta) + + def pivot(self, results): + """ + The extract features function for dask returns a + dataframe of [id, variable, value]. + Turn this into a pivoted dataframe, where only the variables are the columns + and the ids are the rows. + + Attention: this is highly non-optimized!
+ """ + results = results.reset_index(drop=True).persist() + results = results.categorize(columns=["variable"]) + feature_table = results.pivot_table(index=self.column_id, columns="variable", + values="value", aggfunc="sum") + + return feature_table + + def to_tsdata(df, column_id=None, column_kind=None, column_value=None, column_sort=None): """ Wrap supported data formats as a TsData object, i.e. an iterable of individual time series. @@ -349,7 +421,6 @@ def to_tsdata(df, column_id=None, column_kind=None, column_value=None, column_so :return: a data adapter :rtype: TsData """ - if isinstance(df, TsData): return df @@ -365,6 +436,9 @@ def to_tsdata(df, column_id=None, column_kind=None, column_value=None, column_so elif isinstance(df, dict): return TsDictAdapter(df, column_id, column_value, column_sort) + elif dd and isinstance(df, dd.DataFrame): + return DaskTsAdapter(df, column_id, column_kind, column_value, column_sort) + else: raise ValueError("df must be a DataFrame or a dict of DataFrames. " "See https://tsfresh.readthedocs.io/en/latest/text/data_formats.html") diff --git a/tsfresh/feature_extraction/extraction.py b/tsfresh/feature_extraction/extraction.py index 0aafd01d8..485aecbc8 100644 --- a/tsfresh/feature_extraction/extraction.py +++ b/tsfresh/feature_extraction/extraction.py @@ -7,6 +7,7 @@ import logging import warnings +from collections.abc import Iterable import pandas as pd @@ -16,9 +17,8 @@ from tsfresh.feature_extraction.settings import ComprehensiveFCParameters from tsfresh.utilities import profiling from tsfresh.utilities.distribution import MapDistributor, MultiprocessingDistributor, \ - DistributorBaseClass + DistributorBaseClass, ApplyDistributor from tsfresh.utilities.string_manipulation import convert_to_output_format -from tsfresh.utilities.dataframe_functions import pivot_list _logger = logging.getLogger(__name__) @@ -33,7 +33,7 @@ def extract_features(timeseries_container, default_fc_parameters=None, profile=defaults.PROFILING, profiling_filename=defaults.PROFILING_FILENAME, profiling_sorting=defaults.PROFILING_SORTING, - distributor=None): + distributor=None, pivot=True): """ Extract features from @@ -157,7 +157,8 @@ def extract_features(timeseries_container, default_fc_parameters=None, show_warnings=show_warnings, default_fc_parameters=default_fc_parameters, kind_to_fc_parameters=kind_to_fc_parameters, - distributor=distributor) + distributor=distributor, + pivot=pivot) # Impute the result if requested if impute_function is not None: @@ -173,7 +174,8 @@ def extract_features(timeseries_container, default_fc_parameters=None, def _do_extraction(df, column_id, column_value, column_kind, column_sort, default_fc_parameters, kind_to_fc_parameters, - n_jobs, chunk_size, disable_progressbar, show_warnings, distributor): + n_jobs, chunk_size, disable_progressbar, show_warnings, distributor, + pivot): """ Wrapper around the _do_extraction_on_chunk, which calls it on all chunks in the data frame. A chunk is a subset of the data, with a given kind and id - so a single time series. 
@@ -229,14 +231,18 @@ def _do_extraction(df, column_id, column_value, column_kind, column_sort, data = to_tsdata(df, column_id, column_kind, column_value, column_sort) if distributor is None: - if n_jobs == 0: - distributor = MapDistributor(disable_progressbar=disable_progressbar, - progressbar_title="Feature Extraction") + if isinstance(data, Iterable): + if n_jobs == 0: + distributor = MapDistributor(disable_progressbar=disable_progressbar, + progressbar_title="Feature Extraction") + else: + distributor = MultiprocessingDistributor(n_workers=n_jobs, + disable_progressbar=disable_progressbar, + progressbar_title="Feature Extraction", + show_warnings=show_warnings) else: - distributor = MultiprocessingDistributor(n_workers=n_jobs, - disable_progressbar=disable_progressbar, - progressbar_title="Feature Extraction", - show_warnings=show_warnings) + distributor = ApplyDistributor(meta=[(data.column_id, 'int64'), ('variable', 'object'), + ('value', 'float64')]) if not isinstance(distributor, DistributorBaseClass): raise ValueError("the passed distributor is not an DistributorBaseClass object") @@ -247,16 +253,11 @@ def _do_extraction(df, column_id, column_value, column_kind, column_sort, result = distributor.map_reduce(_do_extraction_on_chunk, data=data, chunk_size=chunk_size, function_kwargs=kwargs) - distributor.close() - - return_df = pivot_list(result, dtype=float) - - # copy the type of the index - return_df.index = return_df.index.astype(df[column_id].dtype) - # Sort by index to be backward compatible - return_df = return_df.sort_index() + if not pivot: + return result + return_df = data.pivot(result) return return_df diff --git a/tsfresh/utilities/dataframe_functions.py b/tsfresh/utilities/dataframe_functions.py index 0cc7e3a4f..2e40262f6 100644 --- a/tsfresh/utilities/dataframe_functions.py +++ b/tsfresh/utilities/dataframe_functions.py @@ -6,7 +6,6 @@ (see ``normalize_input_to_internal_representation``) or on how to handle ``NaN`` and ``inf`` in the DataFrames. """ import warnings -from collections import defaultdict import numpy as np import pandas as pd @@ -644,31 +643,3 @@ def _add_id_column(df_chunk): df = df.set_index(df.index.get_level_values(-1)) return df - - -def pivot_list(list_of_tuples, **kwargs): - """ - Helper function to turn an iterable of tuples with three entries into a dataframe. - - The input ``list_of_tuples`` needs to be an iterable with tuples containing three - entries: (a, b, c). - Out of this, a pandas dataframe will be created with all a's as index, - all b's as columns and all c's as values. - - It basically does a pd.pivot(first entry, second entry, third entry), - but optimized for non-pandas input (= python list of tuples). - """ - return_df_dict = defaultdict(dict) - for chunk_id, variable, value in list_of_tuples: - # we turn it into a nested mapping `column -> index -> value` - return_df_dict[variable][chunk_id] = value - - # the mapping column -> {index -> value} - # is now a dict of dicts. The pandas dataframe - # constructor will peel this off: - # first, the keys of the outer dict (the column) - # will turn into a column header and the rest into a column - # the rest is {index -> value} which will be turned into a - # column with index. - # All index will be aligned. 
- return pd.DataFrame(return_df_dict, **kwargs) diff --git a/tsfresh/utilities/distribution.py b/tsfresh/utilities/distribution.py index acd35358c..17f720f88 100644 --- a/tsfresh/utilities/distribution.py +++ b/tsfresh/utilities/distribution.py @@ -14,6 +14,7 @@ from collections.abc import Generator, Iterable from functools import partial from multiprocessing import Pool +from itertools import takewhile, islice, repeat from tqdm import tqdm @@ -67,9 +68,41 @@ class DistributorBaseClass: The main purpose of the instances of the DistributorBaseClass subclasses is to evaluate a function (called map_function) on a list of data items (called data). + Dependent on the implementation of the distribute function, this is done in parallel or using a cluster of nodes. + """ + def map_reduce(self, map_function, data, function_kwargs=None, chunk_size=None, data_length=None): + """ + This method contains the core functionality of the DistributorBaseClass class. + + It maps the map_function to each element of the data and reduces the results to return a flattened list. + + It needs to be implemented for each of the subclasses. + + :param map_function: a function to apply to each data item. + :type map_function: callable + :param data: the data to use in the calculation + :type data: iterable + :param function_kwargs: parameters for the map function + :type function_kwargs: dict of string to parameter + :param chunk_size: If given, chunk the data according to this size. If not given, use an empirical value. + :type chunk_size: int + :param data_length: If the data is a generator, you have to set the length here. If it is none, the + length is deduced from the len of the data. + :type data_length: int + + :return: the calculated results + :rtype: list + """ + raise NotImplementedError + + +class IterableDistributorBaseClass(DistributorBaseClass): + """ + Distributor Base Class that can handle all iterable items and calculate + a map_function on each item separately. + This is done on chunks of the data, meaning, that the DistributorBaseClass classes will chunk the data into chunks, - distribute the data and apply the feature calculator functions from - :mod:`tsfresh.feature_extraction.feature_calculators` on the time series. + distribute the data and apply the map_function functions on the items separately. Dependent on the implementation of the distribute function, this is done in parallel or using a cluster of nodes. """ @@ -80,6 +113,13 @@ def partition(data, chunk_size): This generator partitions an iterable into slices of length `chunk_size`. If the chunk size is not a divider of the data length, the last slice will be shorter. + Taken from + https://stackoverflow.com/questions/1915170/split-a-generator-iterable-every-n-items-in-python-splitevery + + The important part here is, that the iterable is only + traversed once and the chunks are produced one at a time. + This is good for both memory as well as speed. + :param data: The data to partition. :type data: Iterable :param chunk_size: The chunk size. The last chunk might be smaller. @@ -88,19 +128,14 @@ def partition(data, chunk_size): :return: A generator producing the chunks of data. 
:rtype: Generator[Iterable] """ + # Make sure we have an iterable + iterator = iter(data) - if isinstance(data, TsData): - return data.partition(chunk_size) - else: - def partition_iterable(): - iterable = iter(data) - while True: - next_chunk = list(itertools.islice(iterable, chunk_size)) - if not next_chunk: - return - yield next_chunk - - return partition_iterable() + # takewhile(true, ...) generates an iterator until the items are empty + # (= we have reached the end) + # The islice(iterator, n) gets the next n elements from the iterator. + # The list(...) makes sure we do not pass + return takewhile(bool, (list(islice(iterator, chunk_size)) for _ in repeat(None))) def __init__(self): """ @@ -154,6 +189,9 @@ def map_reduce(self, map_function, data, function_kwargs=None, chunk_size=None, :return: the calculated results :rtype: list """ + if not isinstance(data, Iterable): + raise ValueError("You passed data, which can not be handled by this distributor!") + if data_length is None: data_length = len(data) @@ -174,6 +212,8 @@ def map_reduce(self, map_function, data, function_kwargs=None, chunk_size=None, result = list(itertools.chain.from_iterable(result)) + self.close() + return result def distribute(self, func, partitioned_chunks, kwargs): @@ -201,7 +241,7 @@ def close(self): pass -class MapDistributor(DistributorBaseClass): +class MapDistributor(IterableDistributorBaseClass): """ Distributor using the python build-in map, which calculates each job sequentially one after the other. """ @@ -245,7 +285,7 @@ def calculate_best_chunk_size(self, data_length): return 1 -class LocalDaskDistributor(DistributorBaseClass): +class LocalDaskDistributor(IterableDistributorBaseClass): """ Distributor using a local dask cluster and inproc communication. """ @@ -299,7 +339,7 @@ def close(self): self.client.close() -class ClusterDaskDistributor(DistributorBaseClass): +class ClusterDaskDistributor(IterableDistributorBaseClass): """ Distributor using a dask cluster, meaning that the calculation is spread over a cluster """ @@ -358,7 +398,7 @@ def close(self): self.client.close() -class MultiprocessingDistributor(DistributorBaseClass): +class MultiprocessingDistributor(IterableDistributorBaseClass): """ Distributor using a multiprocessing Pool to calculate the jobs in parallel on the local machine. """ @@ -406,3 +446,11 @@ def close(self): self.pool.close() self.pool.terminate() self.pool.join() + + +class ApplyDistributor(DistributorBaseClass): + def __init__(self, meta): + self.meta = meta + + def map_reduce(self, map_function, data, function_kwargs=None, chunk_size=None, data_length=None): + return data.apply(map_function, meta=self.meta, **function_kwargs)
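As a closing illustration of the unpivoted output format exercised by the tests above, a minimal sketch for the pandas code path (assuming ``df`` is a DataFrame in one of the supported input formats with an id column named ``my_id``):

.. code::

    import pandas as pd
    from tsfresh import extract_features
    from tsfresh.feature_extraction import MinimalFCParameters

    # With pivot=False the pandas code path returns an iterable of
    # (id, feature name, value) tuples instead of a wide feature matrix.
    rows = extract_features(df, column_id="my_id", column_sort="time",
                            pivot=False,
                            default_fc_parameters=MinimalFCParameters())

    long_format = pd.DataFrame(rows, columns=["my_id", "variable", "value"])

    # The wide feature matrix can then be recovered with an ordinary pandas pivot,
    # which is essentially what PartitionedTsData.pivot does internally.
    wide = long_format.pivot(index="my_id", columns="variable", values="value")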