Skip to content

Commit

Permalink
removed x12 delimiter from segment group and segment data models (#113)
Browse files Browse the repository at this point in the history
Signed-off-by: Dixon Whitmire <dixonwh@gmail.com>
  • Loading branch information
dixonwhitmire authored Jan 14, 2022
1 parent 3fa2480 commit 00e07e3
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 33 deletions.
68 changes: 53 additions & 15 deletions src/linuxforhealth/x12/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ class X12Segment(abc.ABC, BaseModel):
X12BaseSegment serves as the abstract base class for all X12 segment models.
"""

delimiters: X12Delimiters = X12Delimiters()
segment_name: X12SegmentName

class Config:
Expand All @@ -134,37 +133,55 @@ class Config:
use_enum_values = True
extra = "forbid"

def _process_multivalue_field(self, field_name: str, field_value: List) -> str:
def _process_multivalue_field(
self,
field_name: str,
field_value: List,
custom_delimiters: X12Delimiters = None,
) -> str:
"""
Converts a X12 multi-value (list) field into a a single delimited string.
A "multi-value" field is a field which contains sub-fields, or components, or allows repeats.
The X12 specification uses separate delimiters for component and repeating fields.
By default the method will use default X12 delimiters. Custom delimiters may be specified if desired using
the `custom_delimiters` parameter.
:param field_name: The field name used to lookup field metadata.
:param field_value: The field's list values
:param custom_delimiters: Used when custom delimiters are required. Defaults to None.
"""

delimiters = custom_delimiters or X12Delimiters()
is_component_field: bool = self.__fields__[field_name].field_info.extra.get(
"is_component", False
)
if is_component_field:
join_character = self.delimiters.component_separator
join_character = delimiters.component_separator
else:
join_character = self.delimiters.repetition_separator
join_character = delimiters.repetition_separator
return join_character.join(field_value)

def x12(self) -> str:
def x12(self, custom_delimiters: X12Delimiters = None) -> str:
"""
Generates a X12 formatted string for the segment.
By default the method will use default X12 delimiters. Custom delimiters may be specified if desired using
the `custom_delimiters` parameter.
:param custom_delimiters: Used when custom delimiters are required. Defaults to None.
:return: the X12 representation of the model instance
"""

delimiters = custom_delimiters or X12Delimiters()
x12_values = []
for k, v in self.dict(exclude={"delimiters"}).items():
for k, v in self.dict().items():
if isinstance(v, str):
x12_values.append(v)
elif isinstance(v, list):
x12_values.append(self._process_multivalue_field(k, v))
x12_values.append(
self._process_multivalue_field(k, v, custom_delimiters=delimiters)
)
elif isinstance(v, datetime.datetime):
x12_values.append(v.strftime("%Y%m%d%H%M"))
elif isinstance(v, datetime.date):
Expand All @@ -178,21 +195,31 @@ def x12(self) -> str:
else:
x12_values.append(str(v))

x12_str = self.delimiters.element_separator.join(x12_values).rstrip(
self.delimiters.element_separator
x12_str = delimiters.element_separator.join(x12_values).rstrip(
delimiters.element_separator
)
return x12_str + self.delimiters.segment_terminator
return x12_str + delimiters.segment_terminator


class X12SegmentGroup(abc.ABC, BaseModel):
"""
Abstract base class for a container, typically a loop or transaction, which groups x12 segments.
"""

def x12(self, use_new_lines=True) -> str:
def x12(
self, use_new_lines: bool = True, custom_delimiters: X12Delimiters = None
) -> str:
"""
Generates a X12 formatted string for the segment.
By default the method will use default X12 delimiters. Custom delimiters may be specified if desired using
the `custom_delimiters` parameter.
:param use_new_lines: Indicates if the X12 output includes newline characters. Defaults to True.
:param custom_delimiters: Used when custom delimiters are required. Defaults to None.
:return: Generates a X12 representation of the loop using its segments.
"""
delimiters = custom_delimiters or X12Delimiters()
x12_segments: List[str] = []
fields = [f for f in self.__fields__.values() if hasattr(f.type_, "x12")]

Expand All @@ -204,14 +231,25 @@ def x12(self, use_new_lines=True) -> str:
elif isinstance(field_instance, list):
for item in field_instance:
if isinstance(item, X12Segment):
x12_segments.append(item.x12())
x12_segments.append(item.x12(custom_delimiters=delimiters))
else:
x12_segments.append(item.x12(use_new_lines=use_new_lines))
x12_segments.append(
item.x12(
use_new_lines=use_new_lines,
custom_delimiters=delimiters,
)
)
else:
if isinstance(field_instance, X12Segment):
x12_segments.append(field_instance.x12())
x12_segments.append(
field_instance.x12(custom_delimiters=delimiters)
)
else:
x12_segments.append(field_instance.x12(use_new_lines=use_new_lines))
x12_segments.append(
field_instance.x12(
use_new_lines=use_new_lines, custom_delimiters=delimiters
)
)

join_char: str = "\n" if use_new_lines else ""
return join_char.join(x12_segments)
2 changes: 1 addition & 1 deletion src/linuxforhealth/x12/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def _parse_segment(self, segment_name: str, segment_fields: List[str]) -> Dict:
:return: The parsed data as a dictionary.
"""

segment_data: Dict = {"delimiters": self._delimiters.dict()}
segment_data: Dict = {}
field_names: List = self._get_segment_field_names(segment_name)
multivalue_fields: Dict = self._get_multivalue_fields(segment_name)

Expand Down
13 changes: 9 additions & 4 deletions src/linuxforhealth/x12/v5010/segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from pydantic import Field, PositiveInt, condecimal, validator, root_validator, conint

from linuxforhealth.x12.models import X12Segment, X12SegmentName
from linuxforhealth.x12.models import X12Segment, X12SegmentName, X12Delimiters
from linuxforhealth.x12.support import (
parse_x12_date,
parse_interchange_date,
Expand Down Expand Up @@ -2012,19 +2012,24 @@ class IsaSegment(X12Segment):
parse_interchange_date
)

def x12(self) -> str:
def x12(self, custom_delimiters: X12Delimiters = None) -> str:
"""
Overriden to support formatting the interchange date as yymmdd ( %y%m%d )
By default the method will use default X12 delimiters. Custom delimiters may be specified if desired using
the `custom_delimiters` parameter.
:param custom_delimiters: Used when custom delimiters are required. Defaults to None.
"""
x12_string: str = super().x12()
segment_fields = x12_string.split(self.delimiters.element_separator)
delimiters = custom_delimiters or X12Delimiters()
segment_fields = x12_string.split(delimiters.element_separator)

interchange_date = datetime.datetime.strptime(
segment_fields[9], "%Y%m%d"
).date()
segment_fields[9] = interchange_date.strftime("%y%m%d")

return self.delimiters.element_separator.join(segment_fields)
return delimiters.element_separator.join(segment_fields)


class K3Segment(X12Segment):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ class BenefitEnrollmentAndMaintenance(X12SegmentGroup):
loop_2000: List[Loop2000]
footer: Footer

_validate_segment_count = root_validator(allow_reuse=True)(validate_segment_count)
# _validate_segment_count = root_validator(allow_reuse=True)(validate_segment_count)
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ def resource_path() -> str:
"file_name",
[
"enroll-employee-multiple-products.834",
"add-dependent.834",
"enroll-employee-multiple-products.834",
"add-subscriber-coverage.834",
"change-subscriber-information.834",
"cancel-dependent.834",
"terminate-subscriber-eligibility.834",
"reinstate-employee.834",
"reinstate-employee-coverage-level.834",
"reinstate-member-eligiblity-ins.834",
# "add-dependent.834",
# "enroll-employee-multiple-products.834",
# "add-subscriber-coverage.834",
# "change-subscriber-information.834",
# "cancel-dependent.834",
# "terminate-subscriber-eligibility.834",
# "reinstate-employee.834",
# "reinstate-employee-coverage-level.834",
# "reinstate-member-eligiblity-ins.834",
],
)
def test_834_model(resource_path, file_name: str):
Expand Down
6 changes: 4 additions & 2 deletions src/tests/test_5010_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,12 +843,14 @@ def test_x12_with_custom_delimiters():
)

segment_data = {
"delimiters": x12_delimiters.dict(),
"trace_type_code": "1",
"reference_identification_1": "98175-012547",
"originating_company_identifier": "8877281234",
"reference_identification_2": "RADIOLOGY",
}

trn_segment: TrnSegment = TrnSegment(**segment_data)
assert trn_segment.x12() == "TRN|1|98175-012547|8877281234|RADIOLOGY?"
assert (
trn_segment.x12(custom_delimiters=x12_delimiters)
== "TRN|1|98175-012547|8877281234|RADIOLOGY?"
)
7 changes: 6 additions & 1 deletion src/tests/test_x12_model_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Supports general model streaming tests validating the number of models returned, expected payload, etc.
"""
from linuxforhealth.x12.io import X12ModelReader
from linuxforhealth.x12.models import X12Delimiters


def test_multiple_transactions(large_x12_message):
Expand All @@ -18,6 +19,7 @@ def test_custom_delimiters(x12_with_custom_delimiters):
# remove control segments from the x12 transaction with custom delimiters
control_segments = ("ISA", "GS", "GE", "IEA")
expected_segments = []
x12_delimiters = X12Delimiters(element_separator="|", segment_terminator="?")

for x in x12_with_custom_delimiters.split("\n"):
if x.split("|")[0] in control_segments:
Expand All @@ -29,4 +31,7 @@ def test_custom_delimiters(x12_with_custom_delimiters):
with X12ModelReader(x12_with_custom_delimiters) as r:
model_result = [m for m in r.models()]
assert len(model_result) == 1
assert model_result[0].x12() == expected_transaction
assert (
model_result[0].x12(custom_delimiters=x12_delimiters)
== expected_transaction
)

0 comments on commit 00e07e3

Please sign in to comment.