Skip to content

Commit

Permalink
add ResultDocumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Sep 26, 2024
1 parent dbc3cd9 commit 5fd2add
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 9 deletions.
15 changes: 15 additions & 0 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ class ProcessorResult:
"""
Result object to be returned by every processor. It contains the processor name, created data
and errors (incl. warnings).
Parameters
----------
processor_name : str
The name of the processor
event: Optional[dict]
A reference to the event that was processed
data : Optional[list]
The generated extra data
errors : Optional[list]
The errors that occurred during processing
warnings : Optional[list]
The warnings that occurred during processing
"""

data: list = field(validator=validators.instance_of(list), factory=list)
Expand Down
38 changes: 30 additions & 8 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,48 @@
@attrs.define(kw_only=True)
class PipelineResult:
"""Result object to be returned after processing the event.
It contains all results of each processor of the pipeline."""
It contains all results of each processor of the pipeline.
Parameters
----------
event : dict
The event that was processed
pipeline : List[Processor]
The pipeline that processed the event
Returns
-------
PipelineResult
The result object
"""

results: List[ProcessorResult] = attrs.field(
validator=[
attrs.validators.instance_of((list, Generator)),
attrs.validators.instance_of(list),
attrs.validators.deep_iterable(
member_validator=attrs.validators.instance_of(ProcessorResult)
),
]
],
init=False,
)
"""List of ProcessorResults"""
"""List of ProcessorResults. Is populated in __attrs_post_init__"""
event: dict = attrs.field(validator=attrs.validators.instance_of(dict))
"""The event that was processed"""
event_received: dict = attrs.field(
validator=attrs.validators.instance_of(dict), converter=copy.deepcopy
)
"""The event that was received"""
pipeline: list[Processor]

pipeline: list[Processor] = attrs.field(
validator=[
attrs.validators.deep_iterable(
member_validator=attrs.validators.instance_of(Processor),
iterable_validator=attrs.validators.instance_of(list),
),
attrs.validators.min_len(1),
]
)
"""The pipeline that processed the event"""

@cached_property
Expand All @@ -84,9 +108,7 @@ def data(self) -> List[Tuple[dict, dict]]:
return list(itertools.chain(*[result.data for result in self]))

def __attrs_post_init__(self):
self.results = list(
(processor.process(self.event) for processor in self.pipeline if self.event)
)
self.results = [processor.process(self.event) for processor in self.pipeline if self.event]

def __iter__(self):
return iter(self.results)
Expand Down
138 changes: 137 additions & 1 deletion tests/unit/framework/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
Output,
OutputWarning,
)
from logprep.abc.processor import ProcessorResult
from logprep.abc.processor import Processor, ProcessorResult
from logprep.factory import Factory
from logprep.framework.pipeline import Pipeline, PipelineResult
from logprep.processor.base.exceptions import (
Expand Down Expand Up @@ -68,6 +68,13 @@ def get_mock_create():
return mock_create


@pytest.fixture(name="mock_processor")
def get_mock_processor():
mock_processor = mock.create_autospec(spec=Processor)
mock_processor.process.return_value = mock.create_autospec(spec=ProcessorResult)
return mock_processor


@mock.patch("logprep.factory.Factory.create", new_callable=get_mock_create)
class TestPipeline(ConfigurationForTests):
def setup_method(self):
Expand Down Expand Up @@ -743,3 +750,132 @@ def test_health_returns_health_functions_with_multiple_outputs(self):
assert isinstance(health, Tuple)
assert len(health) > 0
assert all(callable(health_function) for health_function in health)


class TestPipelineResult:

@pytest.mark.parametrize(
"parameters, error, message",
[
(
{
"event": {"some": "event"},
"pipeline": [],
},
ValueError,
"Length of 'pipeline' must be >= 1",
),
(
{
"event": {"some": "event"},
"pipeline": [],
"results": [],
},
TypeError,
"got an unexpected keyword argument 'results'",
),
(
{
"event": {"some": "event"},
"pipeline": [
Factory.create(
{
"dummy": {
"type": "dropper",
"specific_rules": [],
"generic_rules": [],
}
}
)
],
"results": [],
},
TypeError,
"got an unexpected keyword argument 'results'",
),
(
{
"event": {"some": "event"},
"pipeline": [
Factory.create(
{
"dummy": {
"type": "dropper",
"specific_rules": [],
"generic_rules": [],
}
}
)
],
},
None,
None,
),
(
{
"event": {"some": "event"},
"pipeline": [
mock.MagicMock(),
],
},
TypeError,
"'pipeline' must be <class 'logprep.abc.processor.Processor'",
),
],
)
def test_sets_attributes(self, parameters, error, message):
if error:
with pytest.raises(error, match=message):
_ = PipelineResult(**parameters)
else:
pipeline_result = PipelineResult(**parameters)
assert pipeline_result.event == parameters["event"]
assert pipeline_result.pipeline == parameters["pipeline"]
assert isinstance(pipeline_result.results[0], ProcessorResult)

def test_pipeline_result_produces_results(self, mock_processor):
pipeline_result = PipelineResult(
event={"some": "event"},
pipeline=[
mock_processor,
mock_processor,
],
)
assert isinstance(pipeline_result.results[0], ProcessorResult)
assert len(pipeline_result.results) == 2

def test_pipeline_result_collects_errors(self, mock_processor):
mock_processor_result = mock.create_autospec(spec=ProcessorResult)
mock_processor_result.errors = [mock.MagicMock(), mock.MagicMock()]
mock_processor.process.return_value = mock_processor_result
assert len(mock_processor.process({"event": "test"}).errors) == 2
pipeline_result = PipelineResult(
event={"some": "event"},
pipeline=[mock_processor],
)
assert isinstance(pipeline_result.results[0], ProcessorResult)
assert len(pipeline_result.errors) == 2

def test_pipeline_result_collects_warnings(self, mock_processor):
mock_processor_result = mock.create_autospec(spec=ProcessorResult)
mock_processor_result.warnings = [mock.MagicMock(), mock.MagicMock()]
mock_processor.process.return_value = mock_processor_result
assert len(mock_processor.process({"event": "test"}).warnings) == 2
pipeline_result = PipelineResult(
event={"some": "event"},
pipeline=[mock_processor],
)
assert isinstance(pipeline_result.results[0], ProcessorResult)
assert len(pipeline_result.warnings) == 2

def test_pipeline_result_collects_data(self, mock_processor):
mock_processor_result = mock.create_autospec(spec=ProcessorResult)
mock_processor_result.data = [mock.MagicMock(), mock.MagicMock()]
mock_processor.process.return_value = mock_processor_result
assert len(mock_processor.process({"event": "test"}).data) == 2
pipeline_result = PipelineResult(
event={"some": "event"},
pipeline=[mock_processor],
)
assert isinstance(pipeline_result.results[0], ProcessorResult)
assert len(pipeline_result.data) == 2

0 comments on commit 5fd2add

Please sign in to comment.