Skip to content

Commit

Permalink
Merge pull request #780 from AlanKuurstra/timezone_aware
Browse files Browse the repository at this point in the history
timezone aware
  • Loading branch information
yarikoptic authored Oct 2, 2024
2 parents 92c49f0 + 84a0cfa commit 0970b86
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 31 deletions.
10 changes: 4 additions & 6 deletions heudiconv/bids.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
remove_suffix,
save_json,
set_readonly,
strptime_micr,
strptime_bids,
update_json,
)

Expand Down Expand Up @@ -952,18 +952,16 @@ def select_fmap_from_compatible_groups(
k for k, v in acq_times_fmaps.items() if v == first_acq_time
][0]
elif criterion == "Closest":
json_acq_time = strptime_micr(
json_acq_time = strptime_bids(
acq_times[
# remove session folder and '.json', add '.nii.gz':
remove_suffix(remove_prefix(json_file, sess_folder + op.sep), ".json")
+ ".nii.gz"
],
"%Y-%m-%dT%H:%M:%S[.%f]",
]
)
# differences in acquisition time (abs value):
diff_fmaps_acq_times = {
k: abs(strptime_micr(v, "%Y-%m-%dT%H:%M:%S[.%f]") - json_acq_time)
for k, v in acq_times_fmaps.items()
k: abs(strptime_bids(v) - json_acq_time) for k, v in acq_times_fmaps.items()
}
min_diff_acq_times = sorted(diff_fmaps_acq_times.values())[0]
selected_fmap_key = [
Expand Down
26 changes: 12 additions & 14 deletions heudiconv/dicoms.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
get_typed_attr,
load_json,
set_readonly,
strptime_micr,
strptime_dcm_da_tm,
strptime_dcm_dt,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -539,19 +540,16 @@ def get_datetime_from_dcm(dcm_data: dcm.FileDataset) -> Optional[datetime.dateti
3. SeriesDate & SeriesTime (0008,0021); (0008,0031)
"""
acq_date = dcm_data.get("AcquisitionDate", "").strip()
acq_time = dcm_data.get("AcquisitionTime", "").strip()
if acq_date and acq_time:
return strptime_micr(acq_date + acq_time, "%Y%m%d%H%M%S[.%f]")

acq_dt = dcm_data.get("AcquisitionDateTime", "").strip()
if acq_dt:
return strptime_micr(acq_dt, "%Y%m%d%H%M%S[.%f]")

series_date = dcm_data.get("SeriesDate", "").strip()
series_time = dcm_data.get("SeriesTime", "").strip()
if series_date and series_time:
return strptime_micr(series_date + series_time, "%Y%m%d%H%M%S[.%f]")

def check_tag(x: str) -> bool:
return x in dcm_data and dcm_data[x].value.strip()

if check_tag("AcquisitionDate") and check_tag("AcquisitionTime"):
return strptime_dcm_da_tm(dcm_data, "AcquisitionDate", "AcquisitionTime")
if check_tag("AcquisitionDateTime"):
return strptime_dcm_dt(dcm_data, "AcquisitionDateTime")
if check_tag("SeriesDate") and check_tag("SeriesTime"):
return strptime_dcm_da_tm(dcm_data, "SeriesDate", "SeriesTime")
return None


Expand Down
102 changes: 95 additions & 7 deletions heudiconv/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import IO, Any
from unittest.mock import patch

import pydicom as dcm
import pytest

from heudiconv.utils import (
Expand All @@ -22,6 +23,9 @@
remove_prefix,
remove_suffix,
save_json,
strptime_bids,
strptime_dcm_da_tm,
strptime_dcm_dt,
strptime_micr,
update_json,
)
Expand Down Expand Up @@ -178,14 +182,98 @@ def test_get_datetime() -> None:
],
)
def test_strptime_micr(dt: str, fmt: str) -> None:
with pytest.warns(DeprecationWarning):
target = datetime.strptime(dt, fmt)
assert strptime_micr(dt, fmt) == target
assert strptime_micr(dt, fmt + "[.%f]") == target
assert strptime_micr(dt + ".0", fmt + "[.%f]") == target
assert strptime_micr(dt + ".000000", fmt + "[.%f]") == target
assert strptime_micr(dt + ".1", fmt + "[.%f]") == datetime.strptime(
dt + ".1", fmt + ".%f"
)


@pytest.mark.parametrize(
"dt, fmt",
[
("2023-04-02T11:47:09", "%Y-%m-%dT%H:%M:%S"),
("2023-04-02T11:47:09.0", "%Y-%m-%dT%H:%M:%S.%f"),
("2023-04-02T11:47:09.000000", "%Y-%m-%dT%H:%M:%S.%f"),
("2023-04-02T11:47:09.1", "%Y-%m-%dT%H:%M:%S.%f"),
("2023-04-02T11:47:09-0900", "%Y-%m-%dT%H:%M:%S%z"),
("2023-04-02T11:47:09.1-0900", "%Y-%m-%dT%H:%M:%S.%f%z"),
],
)
def test_strptime_bids(dt: str, fmt: str) -> None:
target = datetime.strptime(dt, fmt)
assert strptime_micr(dt, fmt) == target
assert strptime_micr(dt, fmt + "[.%f]") == target
assert strptime_micr(dt + ".0", fmt + "[.%f]") == target
assert strptime_micr(dt + ".000000", fmt + "[.%f]") == target
assert strptime_micr(dt + ".1", fmt + "[.%f]") == datetime.strptime(
dt + ".1", fmt + ".%f"
)
assert strptime_bids(dt) == target


@pytest.mark.parametrize(
"tm, tm_fmt",
[
("114709.1", "%H%M%S.%f"),
("114709", "%H%M%S"),
("1147", "%H%M"),
("11", "%H"),
],
)
@pytest.mark.parametrize(
"offset, offset_fmt",
[
("-0900", "%z"),
("", ""),
],
)
def test_strptime_dcm_da_tm(tm: str, tm_fmt: str, offset: str, offset_fmt: str) -> None:
da = "20230402"
da_fmt = "%Y%m%d"
target = datetime.strptime(da + tm + offset, da_fmt + tm_fmt + offset_fmt)
ds = dcm.dataset.Dataset()
ds["AcquisitionDate"] = dcm.DataElement("AcquisitionDate", "DA", da)
ds["AcquisitionTime"] = dcm.DataElement("AcquisitionTime", "TM", tm)
if offset:
ds[(0x0008, 0x0201)] = dcm.DataElement((0x0008, 0x0201), "SH", offset)
assert strptime_dcm_da_tm(ds, "AcquisitionDate", "AcquisitionTime") == target


@pytest.mark.parametrize(
"dt, dt_fmt",
[
("20230402114709.1-0400", "%Y%m%d%H%M%S.%f%z"),
("20230402114709-0400", "%Y%m%d%H%M%S%z"),
("202304021147-0400", "%Y%m%d%H%M%z"),
("2023040211-0400", "%Y%m%d%H%z"),
("20230402-0400", "%Y%m%d%z"),
("202304-0400", "%Y%m%z"),
("2023-0400", "%Y%z"),
("20230402114709.1", "%Y%m%d%H%M%S.%f"),
("20230402114709", "%Y%m%d%H%M%S"),
("202304021147", "%Y%m%d%H%M"),
("2023040211", "%Y%m%d%H"),
("20230402", "%Y%m%d"),
("202304", "%Y%m"),
("2023", "%Y"),
],
)
@pytest.mark.parametrize(
"offset, offset_fmt",
[
("-0900", "%z"),
("", ""),
],
)
def test_strptime_dcm_dt(dt: str, dt_fmt: str, offset: str, offset_fmt: str) -> None:
target = None
if dt_fmt[-2:] == "%z" and offset:
target = datetime.strptime(dt, dt_fmt)
else:
target = datetime.strptime(dt + offset, dt_fmt + offset_fmt)
ds = dcm.dataset.Dataset()
ds["AcquisitionDateTime"] = dcm.DataElement("AcquisitionDateTime", "DT", dt)
if offset:
ds[(0x0008, 0x0201)] = dcm.DataElement((0x0008, 0x0201), "SH", offset)
assert strptime_dcm_dt(ds, "AcquisitionDateTime") == target


def test_remove_suffix() -> None:
Expand Down
156 changes: 152 additions & 4 deletions heudiconv/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections.abc import Callable
from collections.abc import Mapping as MappingABC
import copy
from datetime import datetime
import datetime
from glob import glob
import hashlib
import json
Expand Down Expand Up @@ -35,6 +35,10 @@
cast,
overload,
)
import warnings

import pydicom as dcm
from pydicom.tag import TagType

lgr = logging.getLogger(__name__)

Expand Down Expand Up @@ -662,13 +666,13 @@ def get_datetime(date: str, time: str, *, microseconds: bool = True) -> str:
# add dummy microseconds if not available for strptime to parse
time += ".000000"
td = time + ":" + date
datetime_str = datetime.strptime(td, "%H%M%S.%f:%Y%m%d").isoformat()
datetime_str = datetime.datetime.strptime(td, "%H%M%S.%f:%Y%m%d").isoformat()
if not microseconds:
datetime_str = datetime_str.split(".", 1)[0]
return datetime_str


def strptime_micr(date_string: str, fmt: str) -> datetime:
def strptime_micr(date_string: str, fmt: str) -> datetime.datetime:
r"""
Decorate strptime while supporting optional [.%f] in the format at the end
Expand All @@ -681,12 +685,156 @@ def strptime_micr(date_string: str, fmt: str) -> datetime:
'.\d+' regex and not if it does not.
"""

warnings.warn(
"strptime_micr() is deprecated, please use strptime() instead.",
DeprecationWarning,
stacklevel=2,
)
optional_micr = "[.%f]"
if fmt.endswith(optional_micr):
fmt = fmt[: -len(optional_micr)]
if re.search(r"\.\d+$", date_string):
fmt += ".%f"
return datetime.strptime(date_string, fmt)
return datetime.datetime.strptime(date_string, fmt)


def datetime_utc_offset(
datetime_obj: datetime.datetime, utc_offset: str
) -> datetime.datetime:
"""set the datetime's tzinfo by parsing an utc offset string"""
# https://dicom.innolitics.com/ciods/electromyogram/sop-common/00080201
extract_offset = re.match(r"([+\-]?)(\d{2})(\d{2})", utc_offset)
if extract_offset is None:
raise ValueError(f"utc offset {utc_offset} is not valid")
sign, hours, minutes = extract_offset.groups()
sign = -1 if sign == "-" else 1
hours, minutes = int(hours), int(minutes)
tzinfo = datetime.timezone(sign * datetime.timedelta(hours=hours, minutes=minutes))
return datetime_obj.replace(tzinfo=tzinfo)


def strptime(datetime_string: str, fmts: list[str]) -> datetime.datetime:
"""
Try datetime.strptime on a list of formats returning the first successful attempt.
Parameters
----------
datetime_string: str
Datetime string to parse
fmts: list[str]
List of format strings
"""
datetime_str = datetime_string
for fmt in fmts:
try:
return datetime.datetime.strptime(datetime_str, fmt)
except ValueError:
pass
raise ValueError(f"Unable to parse datetime string: {datetime_str}")


def strptime_bids(datetime_string: str) -> datetime.datetime:
"""
Create a datetime object from a bids datetime string.
Parameters
----------
date_string: str
Datetime string to parse
"""
# https://bids-specification.readthedocs.io/en/stable/common-principles.html#units
fmts = [
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
]
datetime_obj = strptime(datetime_string, fmts)
return datetime_obj


def strptime_dcm_da_tm(
dcm_data: dcm.Dataset, da_tag: TagType, tm_tag: TagType
) -> datetime.datetime:
"""
Create a datetime object from a dicom DA tag and TM tag.
Parameters
----------
dcm_data : dcm.Dataset
DICOM with header, e.g., as read by pydicom.dcmread.
da_tag: str
Dicom tag with DA value representation
tm_tag: str
Dicom tag with TM value representation
"""
# https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_6.2.html
date_str = dcm_data[da_tag].value
fmts = [
"%Y%m%d",
]
date = strptime(date_str, fmts)

time_str = dcm_data[tm_tag].value
fmts = ["%H", "%H%M", "%H%M%S", "%H%M%S.%f"]
time = strptime(time_str, fmts)

datetime_obj = datetime.datetime.combine(date.date(), time.time())

if utc_offset_dcm := dcm_data.get((0x0008, 0x0201)):
utc_offset = utc_offset_dcm.value
datetime_obj = (
datetime_utc_offset(datetime_obj, utc_offset)
if utc_offset
else datetime_obj
)
return datetime_obj


def strptime_dcm_dt(dcm_data: dcm.Dataset, dt_tag: TagType) -> datetime.datetime:
"""
Create a datetime object from a dicom DT tag.
Parameters
----------
dcm_data : dcm.FileDataset
DICOM with header, e.g., as read by pydicom.dcmread.
Objects with __getitem__ and have those keys with values properly formatted may also work
da_tag: str
Dicom tag with DT value representation
"""
# https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_6.2.html
datetime_str = dcm_data[dt_tag].value
fmts = [
"%Y%z",
"%Y%m%z",
"%Y%m%d%z",
"%Y%m%d%H%z",
"%Y%m%d%H%M%z",
"%Y%m%d%H%M%S%z",
"%Y%m%d%H%M%S.%f%z",
"%Y",
"%Y%m",
"%Y%m%d",
"%Y%m%d%H",
"%Y%m%d%H%M",
"%Y%m%d%H%M%S",
"%Y%m%d%H%M%S.%f",
]
datetime_obj = strptime(datetime_str, fmts)

if utc_offset_dcm := dcm_data.get((0x0008, 0x0201)):
if utc_offset := utc_offset_dcm.value:
datetime_obj2 = datetime_utc_offset(datetime_obj, utc_offset)
if datetime_obj.tzinfo and datetime_obj2 != datetime_obj:
lgr.warning(
"Unexpectedly previously parsed datetime %s contains zoneinfo which is different from the one obtained from DICOMs UTFOffset field: %s",
datetime_obj,
datetime_obj2,
)
else:
datetime_obj = datetime_obj2
return datetime_obj


def remove_suffix(s: str, suf: str) -> str:
Expand Down

0 comments on commit 0970b86

Please sign in to comment.