-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
SPS reader / writer, internal modules reorganised and few bug fixes
- Loading branch information
Showing
19 changed files
with
1,890 additions
and
647 deletions.
There are no files selected for viewing
Empty file.
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
from datetime import datetime | ||
|
||
import h5py | ||
import numpy as np | ||
import pandas as pd | ||
import pytest | ||
|
||
from tests.test_lhc_and_general import create_data, compare_tbt | ||
from turn_by_turn import iota | ||
from turn_by_turn.errors import HDF5VersionError | ||
from turn_by_turn.structures import TbtData, TransverseData | ||
|
||
|
||
def test_tbt_read_hdf5(_hdf5_file):
    """Reading a version-1 IOTA hdf5 file reproduces the expected TbtData."""
    expected = _hdf5_file_content()
    loaded = iota.read_tbt(_hdf5_file, hdf5_version=1)
    compare_tbt(expected, loaded, False)
|
||
|
||
def test_tbt_read_hdf5_v2(_hdf5_file_v2):
    """Reading a version-2 IOTA hdf5 file (the default) reproduces the expected TbtData."""
    expected = _hdf5_file_content()
    loaded = iota.read_tbt(_hdf5_file_v2)
    compare_tbt(expected, loaded, False)
|
||
|
||
def test_tbt_raises_on_wrong_hdf5_version(_hdf5_file):
    """Requesting hdf5_version=2 on a version-1 file must raise HDF5VersionError."""
    with pytest.raises(HDF5VersionError):
        # Return value intentionally discarded: the call itself must raise.
        # (Fixes unused-variable `new` flagged by linters as F841.)
        iota.read_tbt(_hdf5_file, hdf5_version=2)
|
||
|
||
def _hdf5_file_content() -> TbtData:
    """Build the TbtData equivalent of what the hdf5 fixtures below write to disk."""
    bpm_names = ["IBPMA1C", "IBPME2R"]
    phases = np.linspace(-np.pi, np.pi, 2000, endpoint=False)
    transverse = TransverseData(
        X=pd.DataFrame(index=bpm_names, data=create_data(phases, 2, np.sin), dtype=float),
        Y=pd.DataFrame(index=bpm_names, data=create_data(phases, 2, np.cos), dtype=float),
    )
    return TbtData(matrices=[transverse], date=datetime.now(), bunch_ids=[1], nturns=2000)
|
||
|
||
@pytest.fixture()
def _hdf5_file(tmp_path) -> h5py.File:
    """IOTA file standard, version 1: flat ``N:IB<bpm><plane>`` datasets.

    Writes one horizontal (H), vertical (V) and intensity (S) dataset per BPM,
    then yields the path to the closed file.  Deduplicates the six near-identical
    ``create_dataset`` calls of the original into a single loop over BPM names.
    """
    file_path = tmp_path / "test_file.hdf5"
    angles = np.linspace(-np.pi, np.pi, 2000, endpoint=False)
    amplitudes = np.linspace(0, 20, 2000, endpoint=False)
    with h5py.File(file_path, "w") as hd5_file:
        for bpm in ("E2R", "A1C"):
            hd5_file.create_dataset(f"N:IB{bpm}H", data=create_data(angles, 1, np.sin).flatten())
            hd5_file.create_dataset(f"N:IB{bpm}V", data=create_data(angles, 1, np.cos).flatten())
            hd5_file.create_dataset(f"N:IB{bpm}S", data=create_data(amplitudes, 1, np.exp).flatten())
    # File is closed before yielding so the reader under test can open it.
    yield file_path
|
||
|
||
@pytest.fixture()
def _hdf5_file_v2(tmp_path) -> h5py.File:
    """IOTA file standard, version 2: one group per BPM with
    ``Horizontal``/``Vertical``/``Intensity`` datasets.

    Deduplicates the six near-identical ``create_dataset`` calls of the
    original into a single loop over BPM group names; yields the file path.
    """
    file_path = tmp_path / "test_file_v2.hdf5"
    angles = np.linspace(-np.pi, np.pi, 2000, endpoint=False)
    amplitudes = np.linspace(0, 20, 2000, endpoint=False)
    with h5py.File(file_path, "w") as hd5_file:
        for bpm in ("A1C", "E2R"):
            group = hd5_file.create_group(bpm)
            group.create_dataset("Horizontal", data=create_data(angles, 1, np.sin).flatten())
            group.create_dataset("Vertical", data=create_data(angles, 1, np.cos).flatten())
            group.create_dataset("Intensity", data=create_data(amplitudes, 1, np.exp).flatten())
    # File is closed before yielding so the reader under test can open it.
    yield file_path
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
from pathlib import Path | ||
|
||
import numpy as np | ||
import pytest | ||
|
||
from turn_by_turn.constants import PLANES, PRINT_PRECISION | ||
from turn_by_turn.errors import DataTypeError | ||
from turn_by_turn.io import read_tbt, write_lhc_ascii, write_tbt | ||
from turn_by_turn.structures import TbtData | ||
|
||
INPUTS_DIR = Path(__file__).parent / "inputs" | ||
ASCII_PRECISION = 0.6 / np.power(10, PRINT_PRECISION) # not 0.5 due to rounding issues | ||
|
||
|
||
@pytest.mark.parametrize("datatype", ["invalid", "not_supported"])
def test_tbt_read_raises_on_invalid_datatype(_sdds_file, caplog, datatype):
    """read_tbt must reject unknown datatypes and log the failure at ERROR level."""
    with pytest.raises(DataTypeError):
        read_tbt(_sdds_file, datatype=datatype)

    assert all(record.levelname == "ERROR" for record in caplog.records)
|
||
|
||
@pytest.mark.parametrize("datatype", ["invalid", "not_supported"])
def test_tbt_write_raises_on_invalid_datatype(_sdds_file, caplog, datatype):
    """write_tbt must reject unknown datatypes and log the failure at ERROR level."""
    with pytest.raises(DataTypeError):
        write_tbt(_sdds_file, tbt_data=None, datatype=datatype)

    assert all(record.levelname == "ERROR" for record in caplog.records)
|
||
|
||
def test_tbt_write_read_sdds_binary(_sdds_file, _test_file):
    """A binary sdds write/read round-trip must preserve the data exactly."""
    original = read_tbt(_sdds_file)
    write_tbt(_test_file, original)
    roundtrip = read_tbt(f"{_test_file}.sdds")
    compare_tbt(original, roundtrip, False)
|
||
|
||
def test_tbt_write_read_sdds_binary_with_noise(_sdds_file, _test_file):
    """Writing with added noise must change the data, so the exact comparison fails."""
    original = read_tbt(_sdds_file)
    write_tbt(_test_file, original, noise=2)
    noisy = read_tbt(f"{_test_file}.sdds")

    with pytest.raises(AssertionError):  # noise must make the matrices differ
        compare_tbt(original, noisy, False)
|
||
|
||
def test_tbt_write_read_ascii(_sdds_file, _test_file):
    """An LHC-ascii write/read round-trip preserves the data within print precision."""
    original = read_tbt(_sdds_file)
    write_lhc_ascii(_test_file, original)
    roundtrip = read_tbt(_test_file)
    compare_tbt(original, roundtrip, True)
|
||
|
||
# ----- Helpers ----- # | ||
|
||
|
||
def compare_tbt(origin: TbtData, new: TbtData, no_binary: bool, max_deviation=ASCII_PRECISION) -> None:
    """Assert that two TbtData objects hold the same metadata and matrices.

    Args:
        origin: reference data.
        new: data to check against the reference.
        no_binary: if True, matrices are compared within ``max_deviation``
            (ascii output rounds values); if False they must match exactly.
        max_deviation: tolerance used when ``no_binary`` is True.
    """
    assert origin.nturns == new.nturns
    assert origin.nbunches == new.nbunches
    assert origin.bunch_ids == new.bunch_ids
    for bunch in range(origin.nbunches):
        for plane in PLANES:
            origin_frame = origin.matrices[bunch][plane]
            new_frame = new.matrices[bunch][plane]
            assert np.all(origin_frame.index == new_frame.index)
            origin_values = origin_frame.to_numpy()
            new_values = new_frame.to_numpy()
            if not no_binary:
                assert np.all(origin_values == new_values)
            else:
                assert np.nanmax(np.abs(origin_values - new_values)) < max_deviation
|
||
|
||
def create_data(phases, nbpm, function) -> np.ndarray:
    """Return an ``(nbpm, len(phases))`` array where every row is ``function(phases)``."""
    signal = function(phases)
    return np.ones((nbpm, len(phases))) * signal
|
||
|
||
@pytest.fixture()
def _test_file(tmp_path) -> Path:
    # Base path (no extension) for files written during a test; lives in
    # pytest's per-test temporary directory, so no cleanup is needed.
    yield tmp_path / "test_file"
|
||
|
||
@pytest.fixture()
def _sdds_file() -> Path:
    # Committed binary sdds input used as the reference for round-trip tests.
    return INPUTS_DIR / "test_file.sdds"
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
|
||
from datetime import datetime | ||
from pathlib import Path | ||
|
||
import numpy as np | ||
import pandas as pd | ||
import pytest | ||
|
||
from tests.test_lhc_and_general import compare_tbt, INPUTS_DIR | ||
from turn_by_turn import ptc, trackone | ||
from turn_by_turn.errors import PTCFormatError | ||
from turn_by_turn.structures import TbtData, TransverseData | ||
|
||
|
||
def test_read_ptc(_ptc_file):
    """The PTC reader reproduces the known trackone reference content."""
    loaded = ptc.read_tbt(_ptc_file)
    compare_tbt(_original_trackone(), loaded, True)
|
||
|
||
def test_read_ptc_raises_on_invalid_file(_invalid_ptc_file):
    """A malformed PTC file must raise PTCFormatError."""
    with pytest.raises(PTCFormatError):
        ptc.read_tbt(_invalid_ptc_file)
|
||
|
||
def test_read_ptc_defaults_date(_ptc_file_no_date):
    # When the input file carries no date header, the reader falls back to a
    # timezone-aware "now" in UTC.
    new = ptc.read_tbt(_ptc_file_no_date)
    # NOTE(review): day-of-month comparison is flaky if the test runs across
    # midnight (or the UTC day differs from the local day) — confirm acceptable.
    assert new.date.day == datetime.today().day
    assert new.date.tzname() == "UTC"
|
||
|
||
def test_read_ptc_sci(_ptc_file_sci):
    """PTC files using scientific notation parse to the same reference content."""
    loaded = ptc.read_tbt(_ptc_file_sci)
    compare_tbt(_original_trackone(), loaded, True)
|
||
|
||
def test_read_ptc_looseparticles(_ptc_file_losses):
    """Files with lost particles still parse: 3 bunches, 9 turns, 3 BPMs, no NaNs."""
    tbt = ptc.read_tbt(_ptc_file_losses)
    assert len(tbt.matrices) == 3
    assert len(tbt.matrices[0].X.columns) == 9
    expected_bpms = np.array([f"BPM{i}" for i in range(1, 4)])
    assert all(tbt.matrices[0].X.index == expected_bpms)
    assert not tbt.matrices[0].X.isna().any().any()
|
||
|
||
def test_read_trackone(_ptc_file):
    """The trackone reader reproduces the reference content (0-based bunch ids)."""
    loaded = trackone.read_tbt(_ptc_file)
    compare_tbt(_original_trackone(True), loaded, True)
|
||
|
||
def test_read_trackone_sci(_ptc_file_sci):
    """Trackone files in scientific notation parse to the same reference content."""
    loaded = trackone.read_tbt(_ptc_file_sci)
    compare_tbt(_original_trackone(True), loaded, True)
|
||
|
||
def test_read_trackone_looseparticles(_ptc_file_losses):
    """Trackone files with lost particles still parse: 3 bunches, 9 turns, no NaNs."""
    tbt = trackone.read_tbt(_ptc_file_losses)
    assert len(tbt.matrices) == 3
    assert len(tbt.matrices[0].X.columns) == 9
    expected_bpms = np.array([f"BPM{i}" for i in range(1, 4)])
    assert all(tbt.matrices[0].X.index == expected_bpms)
    assert not tbt.matrices[0].X.isna().any().any()
|
||
|
||
def _original_trackone(track: bool = False) -> TbtData:
    """Return the reference TbtData matching the trackone input files.

    Args:
        track: if True, use the 0-based bunch ids produced by trackone;
            otherwise the 1-based ids of the PTC output.
    """
    bpm_names = np.array(["C1.BPM1"])
    matrices = [
        TransverseData(
            X=pd.DataFrame(index=bpm_names, data=[[0.001, -0.0003606, -0.00165823, -0.00266631]]),
            Y=pd.DataFrame(index=bpm_names, data=[[0.001, 0.00070558, -0.00020681, -0.00093807]]),
        ),
        TransverseData(
            X=pd.DataFrame(index=bpm_names, data=[[0.0011, -0.00039666, -0.00182406, -0.00293294]]),
            Y=pd.DataFrame(index=bpm_names, data=[[0.0011, 0.00077614, -0.00022749, -0.00103188]]),
        ),
    ]
    bunch_ids = [0, 1] if track else [1, 2]
    return TbtData(matrices, None, bunch_ids, 4)
|
||
|
||
@pytest.fixture()
def _ptc_file_no_date() -> Path:
    # Trackone input without a date header — exercises the "now in UTC" fallback.
    return INPUTS_DIR / "test_trackone_no_date"
|
||
|
||
@pytest.fixture()
def _ptc_file_losses() -> Path:
    # Trackone input containing lost (loose) particles.
    return INPUTS_DIR / "test_trackone_losses"
|
||
|
||
@pytest.fixture()
def _ptc_file_sci() -> Path:
    # Trackone input with values in scientific notation.
    return INPUTS_DIR / "test_trackone_sci"
|
||
|
||
@pytest.fixture()
def _ptc_file() -> Path:
    # Standard trackone input matching `_original_trackone`'s reference data.
    return INPUTS_DIR / "test_trackone"
|
||
|
||
@pytest.fixture()
def _invalid_ptc_file() -> Path:
    # Deliberately malformed PTC file — must trigger PTCFormatError on read.
    return INPUTS_DIR / "test_wrong_ptc"
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from pathlib import Path | ||
|
||
import numpy as np | ||
import pandas as pd | ||
import pytest | ||
|
||
from tests.test_lhc_and_general import compare_tbt, INPUTS_DIR, create_data | ||
from turn_by_turn import sps, TbtData, TransverseData | ||
|
||
|
||
def test_read_write_real_data(_sps_file, tmp_path):
    """An SPS sdds read/write/read round-trip preserves the measured data."""
    original = sps.read_tbt(_sps_file)
    output_path = tmp_path / "sps.sdds"
    sps.write_tbt(output_path, original)
    roundtrip = sps.read_tbt(output_path)
    compare_tbt(original, roundtrip, no_binary=True)
|
||
|
||
def test_write_read(tmp_path):
    """Synthetic data with different BPM counts per plane survives a write/read round-trip."""
    nturns, nbpms_x, nbpms_y = 1324, 350, 353
    phases = np.linspace(-np.pi, np.pi, nturns, endpoint=False)
    original = TbtData(
        nturns=nturns,
        matrices=[
            TransverseData(
                X=pd.DataFrame(
                    index=[f"BPM{i}.H" for i in range(nbpms_x)],
                    data=create_data(phases, nbpms_x, np.sin),
                ),
                Y=pd.DataFrame(
                    index=[f"BPM{i}.V" for i in range(nbpms_y)],
                    data=create_data(phases, nbpms_y, np.cos),
                ),
            )
        ],
    )
    output_path = tmp_path / "sps_fake_data.sdds"
    sps.write_tbt(output_path, original)
    roundtrip = sps.read_tbt(output_path)
    compare_tbt(original, roundtrip, no_binary=True)
|
||
|
||
@pytest.fixture()
def _sps_file() -> Path:
    # Committed SPS sdds measurement file used as real-data input.
    return INPUTS_DIR / "test_sps.sdds"
Oops, something went wrong.