diff --git a/python_src/poetry.lock b/python_src/poetry.lock index 347b2482..f7163ab5 100644 --- a/python_src/poetry.lock +++ b/python_src/poetry.lock @@ -394,6 +394,7 @@ files = [ {file = "greenlet-2.0.2-cp27-cp27m-win32.whl", hash = "sha256:6c3acb79b0bfd4fe733dff8bc62695283b57949ebcca05ae5c129eb606ff2d74"}, {file = "greenlet-2.0.2-cp27-cp27m-win_amd64.whl", hash = "sha256:283737e0da3f08bd637b5ad058507e578dd462db259f7f6e4c5c365ba4ee9343"}, {file = "greenlet-2.0.2-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:d27ec7509b9c18b6d73f2f5ede2622441de812e7b1a80bbd446cb0633bd3d5ae"}, + {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d967650d3f56af314b72df7089d96cda1083a7fc2da05b375d2bc48c82ab3f3c"}, {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:30bcf80dda7f15ac77ba5af2b961bdd9dbc77fd4ac6105cee85b0d0a5fcf74df"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:26fbfce90728d82bc9e6c38ea4d038cba20b7faf8a0ca53a9c07b67318d46088"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9190f09060ea4debddd24665d6804b995a9c122ef5917ab26e1566dcc712ceeb"}, @@ -402,6 +403,7 @@ files = [ {file = "greenlet-2.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:76ae285c8104046b3a7f06b42f29c7b73f77683df18c49ab5af7983994c2dd91"}, {file = "greenlet-2.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:2d4686f195e32d36b4d7cf2d166857dbd0ee9f3d20ae349b6bf8afc8485b3645"}, {file = "greenlet-2.0.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c4302695ad8027363e96311df24ee28978162cdcdd2006476c43970b384a244c"}, + {file = "greenlet-2.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d4606a527e30548153be1a9f155f4e283d109ffba663a15856089fb55f933e47"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c48f54ef8e05f04d6eff74b8233f6063cb1ed960243eacc474ee73a2ea8573ca"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a1846f1b999e78e13837c93c778dcfc3365902cfb8d1bdb7dd73ead37059f0d0"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a06ad5312349fec0ab944664b01d26f8d1f05009566339ac6f63f56589bc1a2"}, @@ -431,6 +433,7 @@ files = [ {file = "greenlet-2.0.2-cp37-cp37m-win32.whl", hash = "sha256:3f6ea9bd35eb450837a3d80e77b517ea5bc56b4647f5502cd28de13675ee12f7"}, {file = "greenlet-2.0.2-cp37-cp37m-win_amd64.whl", hash = "sha256:7492e2b7bd7c9b9916388d9df23fa49d9b88ac0640db0a5b4ecc2b653bf451e3"}, {file = "greenlet-2.0.2-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:b864ba53912b6c3ab6bcb2beb19f19edd01a6bfcbdfe1f37ddd1778abfe75a30"}, + {file = "greenlet-2.0.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:1087300cf9700bbf455b1b97e24db18f2f77b55302a68272c56209d5587c12d1"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux2010_x86_64.whl", hash = "sha256:ba2956617f1c42598a308a84c6cf021a90ff3862eddafd20c3333d50f0edb45b"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fc3a569657468b6f3fb60587e48356fe512c1754ca05a564f11366ac9e306526"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8eab883b3b2a38cc1e050819ef06a7e6344d4a990d24d45bc6f2cf959045a45b"}, @@ -439,6 +442,7 @@ files = [ {file = "greenlet-2.0.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = 
"sha256:b0ef99cdbe2b682b9ccbb964743a6aca37905fda5e0452e5ee239b1654d37f2a"}, {file = "greenlet-2.0.2-cp38-cp38-win32.whl", hash = "sha256:b80f600eddddce72320dbbc8e3784d16bd3fb7b517e82476d8da921f27d4b249"}, {file = "greenlet-2.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:4d2e11331fc0c02b6e84b0d28ece3a36e0548ee1a1ce9ddde03752d9b79bba40"}, + {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8512a0c38cfd4e66a858ddd1b17705587900dd760c6003998e9472b77b56d417"}, {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:88d9ab96491d38a5ab7c56dd7a3cc37d83336ecc564e4e8816dbed12e5aaefc8"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:561091a7be172ab497a3527602d467e2b3fbe75f9e783d8b8ce403fa414f71a6"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:971ce5e14dc5e73715755d0ca2975ac88cfdaefcaab078a284fea6cfabf866df"}, @@ -1160,37 +1164,41 @@ tests = ["pytest"] [[package]] name = "pyarrow" -version = "11.0.0" +version = "13.0.0" description = "Python library for Apache Arrow" category = "main" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "pyarrow-11.0.0-cp310-cp310-macosx_10_14_x86_64.whl", hash = "sha256:40bb42afa1053c35c749befbe72f6429b7b5f45710e85059cdd534553ebcf4f2"}, - {file = "pyarrow-11.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:7c28b5f248e08dea3b3e0c828b91945f431f4202f1a9fe84d1012a761324e1ba"}, - {file = "pyarrow-11.0.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a37bc81f6c9435da3c9c1e767324ac3064ffbe110c4e460660c43e144be4ed85"}, - {file = "pyarrow-11.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ad7c53def8dbbc810282ad308cc46a523ec81e653e60a91c609c2233ae407689"}, - {file = "pyarrow-11.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:25aa11c443b934078bfd60ed63e4e2d42461682b5ac10f67275ea21e60e6042c"}, - {file = "pyarrow-11.0.0-cp311-cp311-macosx_10_14_x86_64.whl", hash = "sha256:e217d001e6389b20a6759392a5ec49d670757af80101ee6b5f2c8ff0172e02ca"}, - {file = "pyarrow-11.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ad42bb24fc44c48f74f0d8c72a9af16ba9a01a2ccda5739a517aa860fa7e3d56"}, - {file = "pyarrow-11.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2d942c690ff24a08b07cb3df818f542a90e4d359381fbff71b8f2aea5bf58841"}, - {file = "pyarrow-11.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f010ce497ca1b0f17a8243df3048055c0d18dcadbcc70895d5baf8921f753de5"}, - {file = "pyarrow-11.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:2f51dc7ca940fdf17893227edb46b6784d37522ce08d21afc56466898cb213b2"}, - {file = "pyarrow-11.0.0-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:1cbcfcbb0e74b4d94f0b7dde447b835a01bc1d16510edb8bb7d6224b9bf5bafc"}, - {file = "pyarrow-11.0.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:aaee8f79d2a120bf3e032d6d64ad20b3af6f56241b0ffc38d201aebfee879d00"}, - {file = "pyarrow-11.0.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:410624da0708c37e6a27eba321a72f29d277091c8f8d23f72c92bada4092eb5e"}, - {file = "pyarrow-11.0.0-cp37-cp37m-win_amd64.whl", hash = "sha256:2d53ba72917fdb71e3584ffc23ee4fcc487218f8ff29dd6df3a34c5c48fe8c06"}, - {file = "pyarrow-11.0.0-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:f12932e5a6feb5c58192209af1d2607d488cb1d404fbc038ac12ada60327fa34"}, - {file = 
"pyarrow-11.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:41a1451dd895c0b2964b83d91019e46f15b5564c7ecd5dcb812dadd3f05acc97"}, - {file = "pyarrow-11.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:becc2344be80e5dce4e1b80b7c650d2fc2061b9eb339045035a1baa34d5b8f1c"}, - {file = "pyarrow-11.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8f40be0d7381112a398b93c45a7e69f60261e7b0269cc324e9f739ce272f4f70"}, - {file = "pyarrow-11.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:362a7c881b32dc6b0eccf83411a97acba2774c10edcec715ccaab5ebf3bb0835"}, - {file = "pyarrow-11.0.0-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:ccbf29a0dadfcdd97632b4f7cca20a966bb552853ba254e874c66934931b9841"}, - {file = "pyarrow-11.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3e99be85973592051e46412accea31828da324531a060bd4585046a74ba45854"}, - {file = "pyarrow-11.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69309be84dcc36422574d19c7d3a30a7ea43804f12552356d1ab2a82a713c418"}, - {file = "pyarrow-11.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:da93340fbf6f4e2a62815064383605b7ffa3e9eeb320ec839995b1660d69f89b"}, - {file = "pyarrow-11.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:caad867121f182d0d3e1a0d36f197df604655d0b466f1bc9bafa903aa95083e4"}, - {file = "pyarrow-11.0.0.tar.gz", hash = "sha256:5461c57dbdb211a632a48facb9b39bbeb8a7905ec95d768078525283caef5f6d"}, + {file = "pyarrow-13.0.0-cp310-cp310-macosx_10_14_x86_64.whl", hash = "sha256:1afcc2c33f31f6fb25c92d50a86b7a9f076d38acbcb6f9e74349636109550148"}, + {file = "pyarrow-13.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:70fa38cdc66b2fc1349a082987f2b499d51d072faaa6b600f71931150de2e0e3"}, + {file = "pyarrow-13.0.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cd57b13a6466822498238877892a9b287b0a58c2e81e4bdb0b596dbb151cbb73"}, + {file = "pyarrow-13.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f8ce69f7bf01de2e2764e14df45b8404fc6f1a5ed9871e8e08a12169f87b7a26"}, + {file = "pyarrow-13.0.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:588f0d2da6cf1b1680974d63be09a6530fd1bd825dc87f76e162404779a157dc"}, + {file = "pyarrow-13.0.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:6241afd72b628787b4abea39e238e3ff9f34165273fad306c7acf780dd850956"}, + {file = "pyarrow-13.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:fda7857e35993673fcda603c07d43889fca60a5b254052a462653f8656c64f44"}, + {file = "pyarrow-13.0.0-cp311-cp311-macosx_10_14_x86_64.whl", hash = "sha256:aac0ae0146a9bfa5e12d87dda89d9ef7c57a96210b899459fc2f785303dcbb67"}, + {file = "pyarrow-13.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d7759994217c86c161c6a8060509cfdf782b952163569606bb373828afdd82e8"}, + {file = "pyarrow-13.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:868a073fd0ff6468ae7d869b5fc1f54de5c4255b37f44fb890385eb68b68f95d"}, + {file = "pyarrow-13.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:51be67e29f3cfcde263a113c28e96aa04362ed8229cb7c6e5f5c719003659d33"}, + {file = "pyarrow-13.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:d1b4e7176443d12610874bb84d0060bf080f000ea9ed7c84b2801df851320295"}, + {file = "pyarrow-13.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:69b6f9a089d116a82c3ed819eea8fe67dae6105f0d81eaf0fdd5e60d0c6e0944"}, + {file = "pyarrow-13.0.0-cp311-cp311-win_amd64.whl", hash = 
"sha256:ab1268db81aeb241200e321e220e7cd769762f386f92f61b898352dd27e402ce"}, + {file = "pyarrow-13.0.0-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:ee7490f0f3f16a6c38f8c680949551053c8194e68de5046e6c288e396dccee80"}, + {file = "pyarrow-13.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e3ad79455c197a36eefbd90ad4aa832bece7f830a64396c15c61a0985e337287"}, + {file = "pyarrow-13.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:68fcd2dc1b7d9310b29a15949cdd0cb9bc34b6de767aff979ebf546020bf0ba0"}, + {file = "pyarrow-13.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc6fd330fd574c51d10638e63c0d00ab456498fc804c9d01f2a61b9264f2c5b2"}, + {file = "pyarrow-13.0.0-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:e66442e084979a97bb66939e18f7b8709e4ac5f887e636aba29486ffbf373763"}, + {file = "pyarrow-13.0.0-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:0f6eff839a9e40e9c5610d3ff8c5bdd2f10303408312caf4c8003285d0b49565"}, + {file = "pyarrow-13.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:8b30a27f1cddf5c6efcb67e598d7823a1e253d743d92ac32ec1eb4b6a1417867"}, + {file = "pyarrow-13.0.0-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:09552dad5cf3de2dc0aba1c7c4b470754c69bd821f5faafc3d774bedc3b04bb7"}, + {file = "pyarrow-13.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3896ae6c205d73ad192d2fc1489cd0edfab9f12867c85b4c277af4d37383c18c"}, + {file = "pyarrow-13.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6647444b21cb5e68b593b970b2a9a07748dd74ea457c7dadaa15fd469c48ada1"}, + {file = "pyarrow-13.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:47663efc9c395e31d09c6aacfa860f4473815ad6804311c5433f7085415d62a7"}, + {file = "pyarrow-13.0.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:b9ba6b6d34bd2563345488cf444510588ea42ad5613df3b3509f48eb80250afd"}, + {file = "pyarrow-13.0.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:d00d374a5625beeb448a7fa23060df79adb596074beb3ddc1838adb647b6ef09"}, + {file = "pyarrow-13.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:c51afd87c35c8331b56f796eff954b9c7f8d4b7fef5903daf4e05fcf017d23a8"}, + {file = "pyarrow-13.0.0.tar.gz", hash = "sha256:83333726e83ed44b0ac94d8d7a21bbdee4a05029c3b1e8db58a863eec8fd8a33"}, ] [package.dependencies] @@ -1787,4 +1795,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "0d1b61d35e2e86f9180a845dc773718c367d1afddb0a758243003569ae041ed7" +content-hash = "c2a124a2b86705112334456a42c1983343f4e044a27bb5d44ca87da92731e572" diff --git a/python_src/pyproject.toml b/python_src/pyproject.toml index 648da4d2..3093c97b 100644 --- a/python_src/pyproject.toml +++ b/python_src/pyproject.toml @@ -17,7 +17,7 @@ snapshot = 'lamp_py.postgres.snapshot:run' [tool.poetry.dependencies] python = "^3.9" SQLAlchemy = "^1.4.39" -pyarrow = "^11.0.0" +pyarrow = "^13.0.0" boto3 = "^1.23.3" pandas = "^1.4.3" numpy = "^1.23.1" @@ -77,6 +77,6 @@ max-line-length = 80 min-similarity-lines = 10 # ignore session maker as it gives pylint fits # https://github.com/PyCQA/pylint/issues/7090 -ignored-classes = ['sqlalchemy.orm.session.sessionmaker'] +ignored-classes = ['sqlalchemy.orm.session.sessionmaker','pyarrow.compute'] # ignore the migrations directory. its going to have duplication and _that is ok_. 
ignore-paths = ["^src/lamp_py/migrations/.*$"] diff --git a/python_src/src/lamp_py/ingestion/config_busloc_trip.py b/python_src/src/lamp_py/ingestion/config_busloc_trip.py new file mode 100644 index 00000000..14215988 --- /dev/null +++ b/python_src/src/lamp_py/ingestion/config_busloc_trip.py @@ -0,0 +1,89 @@ +from typing import List, Tuple +import pyarrow + +from .gtfs_rt_detail import GTFSRTDetail +from .gtfs_rt_structs import ( + trip_descriptor, + vehicle_descriptor, + stop_time_event, +) + + +class RtBusTripDetail(GTFSRTDetail): + """ + Detail for how to convert RT GTFS Trip Updates from json entries into + parquet tables. + """ + + def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: + """modify table schema before write to parquet""" + return self.flatten_schema( + self.explode_table_column( + self.flatten_schema(table), "trip_update.stop_time_update" + ) + ) + + @property + def import_schema(self) -> pyarrow.schema: + return pyarrow.schema( + [ + ("id", pyarrow.string()), + ( + "trip_update", + pyarrow.struct( + [ + ( + "timestamp", + pyarrow.uint64(), + ), # Not currently provided by Busloc + ( + "delay", + pyarrow.int32(), + ), # Not currently provided by Busloc + ( + "trip", + trip_descriptor, + ), # Busloc currently only provides trip_id, route_id and schedule_relationship + ( + "vehicle", + vehicle_descriptor, + ), # Busloc currently only provides id and label + ( + "stop_time_update", + pyarrow.list_( + pyarrow.struct( + [ + ("stop_sequence", pyarrow.uint32()), + ("stop_id", pyarrow.string()), + ("arrival", stop_time_event), + ("departure", stop_time_event), + ( + "schedule_relationship", + pyarrow.string(), + ), + ("cause_id", pyarrow.uint16()), + ( + "cause_description", + pyarrow.string(), + ), + ("remark", pyarrow.string()), + ] + ) + ), + ), + ] + ), + ), + ] + ) + + @property + def table_sort_order(self) -> List[Tuple[str, str]]: + return [ + ("trip_update.trip.start_date", "ascending"), + ("trip_update.trip.route_pattern_id", "ascending"), + ("trip_update.trip.route_id", "ascending"), + ("trip_update.trip.direction_id", "ascending"), + ("trip_update.vehicle.id", "ascending"), + ("feed_timestamp", "ascending"), + ] diff --git a/python_src/src/lamp_py/ingestion/config_busloc_vehicle.py b/python_src/src/lamp_py/ingestion/config_busloc_vehicle.py new file mode 100644 index 00000000..24c67ab8 --- /dev/null +++ b/python_src/src/lamp_py/ingestion/config_busloc_vehicle.py @@ -0,0 +1,69 @@ +from typing import List, Tuple +import pyarrow + +from .gtfs_rt_detail import GTFSRTDetail +from .gtfs_rt_structs import ( + position, + vehicle_descriptor, + trip_descriptor, +) + + +class RtBusVehicleDetail(GTFSRTDetail): + """ + Detail for how to convert RT GTFS Bus Vehicle Positions from json + entries into parquet tables. 
+ """ + + @property + def import_schema(self) -> pyarrow.schema: + return pyarrow.schema( + [ + ("id", pyarrow.string()), + ("is_deleted", pyarrow.bool_()), + ( + "vehicle", + pyarrow.struct( + [ + ("position", position), + ("location_source", pyarrow.string()), + ("timestamp", pyarrow.uint64()), + ("trip", trip_descriptor), + ("vehicle", vehicle_descriptor), + ( + "operator", + pyarrow.struct( + [ + ("id", pyarrow.string()), + ("first_name", pyarrow.string()), + ("last_name", pyarrow.string()), + ("name", pyarrow.string()), + ("logon_time", pyarrow.uint64()), + ] + ), + ), + ("block_id", pyarrow.string()), + ("run_id", pyarrow.string()), + ("stop_id", pyarrow.string()), + ("current_stop_sequence", pyarrow.uint32()), + ("revenue", pyarrow.bool_()), + ("current_status", pyarrow.string()), + ("load", pyarrow.uint16()), + ("capacity", pyarrow.uint16()), + ("occupancy_percentage", pyarrow.uint16()), + ("occupancy_status", pyarrow.string()), + ] + ), + ), + ] + ) + + @property + def table_sort_order(self) -> List[Tuple[str, str]]: + return [ + ("vehicle.trip.start_date", "ascending"), + ("vehicle.trip.route_id", "ascending"), + ("vehicle.block_id", "ascending"), + ("vehicle.vehicle.id", "ascending"), + ("feed_timestamp", "ascending"), + ] diff --git a/python_src/src/lamp_py/ingestion/config_rt_alerts.py b/python_src/src/lamp_py/ingestion/config_rt_alerts.py index 39c438e7..19479e0c 100644 --- a/python_src/src/lamp_py/ingestion/config_rt_alerts.py +++ b/python_src/src/lamp_py/ingestion/config_rt_alerts.py @@ -2,6 +2,10 @@ import pyarrow from .gtfs_rt_detail import GTFSRTDetail +from .gtfs_rt_structs import ( + trip_descriptor, + translated_string, +) class RtAlertsDetail(GTFSRTDetail): @@ -11,211 +15,118 @@ class RtAlertsDetail(GTFSRTDetail): """ @property - def export_schema(self) -> pyarrow.schema: + def import_schema(self) -> pyarrow.schema: return pyarrow.schema( [ - # header -> timestamp - ("year", pyarrow.int16()), - ("month", pyarrow.int8()), - ("day", pyarrow.int8()), - ("hour", pyarrow.int8()), - ("feed_timestamp", pyarrow.int64()), - # entity - ("entity_id", pyarrow.string()), # actual label: id - # entity -> alert - ("effect", pyarrow.string()), - ("effect_detail", pyarrow.string()), - ("cause", pyarrow.string()), - ("cause_detail", pyarrow.string()), - ("severity", pyarrow.int64()), - ("severity_level", pyarrow.string()), - ("created_timestamp", pyarrow.int64()), - ("last_modified_timestamp", pyarrow.int64()), - ("alert_lifecycle", pyarrow.string()), - ("duration_certainty", pyarrow.string()), - ("last_push_notification", pyarrow.int64()), + ("id", pyarrow.string()), ( - "active_period", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("start", pyarrow.int64()), - pyarrow.field("end", pyarrow.int64()), - ] - ) - ), - ), - ("reminder_times", pyarrow.list_(pyarrow.int64())), - ("closed_timestamp", pyarrow.int64()), - # entity -> alert -> short_header_text - ( - "short_header_text_translation", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("text", pyarrow.string()), - pyarrow.field("language", pyarrow.string()), - ] - ) - ), - ), # actual label: translation - # entity -> alert -> header_text - ( - "header_text_translation", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("text", pyarrow.string()), - pyarrow.field("language", pyarrow.string()), - ] - ) - ), - ), # actual label: translation - # entity -> alert -> description_text - ( - "description_text_translation", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("text", pyarrow.string()), - 
pyarrow.field("language", pyarrow.string()), - ] - ) - ), - ), # actual label: translation - # entity -> alert -> service_effect_text - ( - "service_effect_text_translation", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("text", pyarrow.string()), - pyarrow.field("language", pyarrow.string()), - ] - ) - ), - ), # actual label: translation - # entity -> alert -> timeframe_text - ( - "timeframe_text_translation", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("text", pyarrow.string()), - pyarrow.field("language", pyarrow.string()), - ] - ) - ), - ), # actual label: translation - # entity -> alert -> url - ( - "url_translation", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("text", pyarrow.string()), - pyarrow.field("language", pyarrow.string()), - ] - ) - ), - ), # actual label: translation - # entity -> alert -> recurrence_text - ( - "recurrence_text_translation", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("text", pyarrow.string()), - pyarrow.field("language", pyarrow.string()), - ] - ) - ), - ), # actual label: translation - ( - "informed_entity", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("stop_id", pyarrow.string()), - pyarrow.field("facility_id", pyarrow.string()), - pyarrow.field( - "activities", - pyarrow.list_(pyarrow.string()), + "alert", + pyarrow.struct( + [ + ( + "active_period", + pyarrow.list_( + pyarrow.struct( + [ + ("start", pyarrow.uint64()), + ("end", pyarrow.uint64()), + ] + ) ), - pyarrow.field("agency_id", pyarrow.string()), - pyarrow.field("route_type", pyarrow.int64()), - pyarrow.field("route_id", pyarrow.string()), - pyarrow.field( - "trip", + ), + ( + "informed_entity", + pyarrow.list_( pyarrow.struct( [ - pyarrow.field( - "route_id", pyarrow.string() - ), - pyarrow.field( - "trip_id", pyarrow.string() - ), - pyarrow.field( - "direcction_id", pyarrow.int64() + ("agency_id", pyarrow.string()), + ("route_id", pyarrow.string()), + ("route_type", pyarrow.int32()), + ("direction_id", pyarrow.uint8()), + ("trip", trip_descriptor), + ("stop_id", pyarrow.string()), + ("facility_id", pyarrow.string()), + ( + "activities", + pyarrow.list_(pyarrow.string()), ), ] - ), + ) ), - pyarrow.field("direction_id", pyarrow.int64()), - ] - ) + ), + ("cause", pyarrow.string()), + ( + "cause_detail", + pyarrow.string(), + ), # type does not match spec type of + ("effect", pyarrow.string()), + ( + "effect_detail", + pyarrow.string(), + ), # type does not match spec type of + ("url", translated_string), + ("header_text", translated_string), + ("description_text", translated_string), + ("severity_level", pyarrow.string()), + ( + "severity", + pyarrow.uint16(), + ), # not in message Alert struct spec + ( + "created_timestamp", + pyarrow.uint64(), + ), # not in message Alert struct spec + ( + "last_modified_timestamp", + pyarrow.uint64(), + ), # not in message Alert struct spec + ( + "last_push_notification_timestamp", + pyarrow.uint64(), + ), # not in message Alert struct spec + ( + "closed_timestamp", + pyarrow.int64(), + ), # not in message Alert struct spec + ( + "alert_lifecycle", + pyarrow.string(), + ), # not in message Alert struct spec + ( + "duration_certainty", + pyarrow.string(), + ), # not in message Alert struct spec + ( + "reminder_times", + pyarrow.list_(pyarrow.uint64()), + ), # not in message Alert struct spec + ( + "short_header_text", + translated_string, + ), # not in message Alert struct spec + ( + "service_effect_text", + translated_string, + ), # not in message Alert struct spec + ( + "timeframe_text", + 
translated_string, + ), # not in message Alert struct spec + ( + "recurrence_text", + translated_string, + ), # not in message Alert struct spec + ] ), ), ] ) - @property - def transformation_schema(self) -> dict: - return { - "entity": (("id", "entity_id"),), - "entity,alert": ( - ("effect",), - ("effect_detail",), - ("cause",), - ("cause_detail",), - ("severity",), - ("severity_level",), - ("created_timestamp",), - ("last_modified_timestamp",), - ("alert_lifecycle",), - ("duration_certainty",), - ("last_push_notification",), - ("active_period",), - ("reminder_times",), - ("closed_timestamp",), - ("informed_entity",), - ), - "entity,alert,short_header_text": ( - ("translation", "short_header_text_translation"), - ), - "entity,alert,header_text": ( - ("translation", "header_text_translation"), - ), - "entity,alert,description_text": ( - ("translation", "description_text_translation"), - ), - "entity,alert,service_effect_text": ( - ("translation", "service_effect_text_translation"), - ), - "entity,alert,timeframe_text": ( - ("translation", "timeframe_text_translation"), - ), - "entity,alert,url": (("translation", "url_translation"),), - "entity,alert,recurrence_text": ( - ("translation", "recurrence_text_translation"), - ), - } - @property def table_sort_order(self) -> List[Tuple[str, str]]: return [ - ("severity", "ascending"), - ("effect", "ascending"), - ("cause", "ascending"), + ("alert.severity", "ascending"), + ("alert.effect", "ascending"), + ("alert.cause", "ascending"), ("feed_timestamp", "ascending"), ] diff --git a/python_src/src/lamp_py/ingestion/config_rt_bus_trip.py b/python_src/src/lamp_py/ingestion/config_rt_bus_trip.py deleted file mode 100644 index c91cfc7b..00000000 --- a/python_src/src/lamp_py/ingestion/config_rt_bus_trip.py +++ /dev/null @@ -1,129 +0,0 @@ -from typing import List, Tuple -import pyarrow - -from .gtfs_rt_detail import GTFSRTDetail - - -class RtBusTripDetail(GTFSRTDetail): - """ - Detail for how to convert RT GTFS Trip Updates from json entries into - parquet tables. 
- """ - - @property - def export_schema(self) -> pyarrow.schema: - return pyarrow.schema( - [ - # header -> timestamp - ("year", pyarrow.int16()), - ("month", pyarrow.int8()), - ("day", pyarrow.int8()), - ("hour", pyarrow.int8()), - ("feed_timestamp", pyarrow.int64()), # actual: timestamp - # entity - ("entity_id", pyarrow.string()), # actual label: id - # "is_deleted" all null during schema review - # "alert" all null during schema review - # "vehicle" all null during schema review - # entity -> trip_update - # "timestamp" all null during schema review - # "cause_description" all null during schema review - # "cause_id" all null during schema review - # "delay" all null during schema review - # "remark" all null during schema review - ( - "stop_time_update", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field( - "departure", - pyarrow.struct( - [ - pyarrow.field( - "delay", pyarrow.int64() - ), - pyarrow.field( - "time", pyarrow.int64() - ), - pyarrow.field( - "uncertainty", pyarrow.int64() - ), - ] - ), - ), - pyarrow.field("stop_id", pyarrow.string()), - pyarrow.field("stop_sequence", pyarrow.int64()), - pyarrow.field("cause_id", pyarrow.int64()), - pyarrow.field( - "cause_description", pyarrow.string() - ), - pyarrow.field( - "arrival", - pyarrow.struct( - [ - pyarrow.field( - "delay", pyarrow.int64() - ), - pyarrow.field( - "time", pyarrow.int64() - ), - pyarrow.field( - "uncertainty", pyarrow.int64() - ), - ] - ), - ), - pyarrow.field( - "schedule_relationship", pyarrow.string() - ), - pyarrow.field("remark", pyarrow.string()), - ] - ) - ), - ), - # entity -> trip_update -> trip - ("direction_id", pyarrow.int64()), - ("route_id", pyarrow.string()), - ("route_pattern_id", pyarrow.string()), - ("schedule_relationship", pyarrow.string()), - ("start_date", pyarrow.string()), - ("start_time", pyarrow.string()), - ("trip_id", pyarrow.string()), - # entity -> trip_update -> vehicle - # "license_plate" all null during schema review - ("vehicle_id", pyarrow.string()), # actual label: id - ("vehicle_label", pyarrow.string()), # actual label: label - ] - ) - - @property - def transformation_schema(self) -> dict: - return { - "entity": (("id", "entity_id"),), - "entity,trip_update": (("stop_time_update",),), - "entity,trip_update,trip": ( - ("direction_id",), - ("route_id",), - ("route_pattern_id",), - ("schedule_relationship",), - ("start_date",), - ("start_time",), - ("trip_id",), - ), - "entity,trip_update,vehicle": ( - ("id", "vehicle_id"), - ("label", "vehicle_label"), - ), - } - - @property - def table_sort_order(self) -> List[Tuple[str, str]]: - return [ - ("start_date", "ascending"), - ("route_pattern_id", "ascending"), - ("route_id", "ascending"), - ("direction_id", "ascending"), - ("vehicle_id", "ascending"), - ("feed_timestamp", "ascending"), - ] diff --git a/python_src/src/lamp_py/ingestion/config_rt_bus_vehicle.py b/python_src/src/lamp_py/ingestion/config_rt_bus_vehicle.py deleted file mode 100644 index 780273a3..00000000 --- a/python_src/src/lamp_py/ingestion/config_rt_bus_vehicle.py +++ /dev/null @@ -1,125 +0,0 @@ -from typing import List, Tuple -import pyarrow - -from .gtfs_rt_detail import GTFSRTDetail - - -class RtBusVehicleDetail(GTFSRTDetail): - """ - Detail for how to convert RT GTFS Bus Vehicle Positions from json - entries into parquet tables. 
- """ - - @property - def export_schema(self) -> pyarrow.schema: - return pyarrow.schema( - [ - # header -> timestamp - ("year", pyarrow.int16()), - ("month", pyarrow.int8()), - ("day", pyarrow.int8()), - ("hour", pyarrow.int8()), - ("feed_timestamp", pyarrow.int64()), - # entity - ("entity_id", pyarrow.string()), # actual label: id - ( - "entity_is_deleted", - pyarrow.bool_(), - ), # actual label: is_deleted - # entity -> vehicle - ("block_id", pyarrow.string()), - ("capacity", pyarrow.int64()), - ("current_stop_sequence", pyarrow.int64()), - ("load", pyarrow.int64()), - ("location_source", pyarrow.string()), - ("occupancy_percentage", pyarrow.int64()), - ("occupancy_status", pyarrow.string()), - ("revenue", pyarrow.bool_()), - ("run_id", pyarrow.string()), - ("stop_id", pyarrow.string()), - ( - "vehicle_timestamp", - pyarrow.int64(), - ), # actual label: timestamp - # entity -> vehicle -> position - ("bearing", pyarrow.int64()), - ("latitude", pyarrow.float64()), - ("longitude", pyarrow.float64()), - ("speed", pyarrow.float64()), - # entity -> vehicle -> trip - ("overload_id", pyarrow.int64()), - ("overload_offset", pyarrow.int64()), - ("route_id", pyarrow.string()), - ("schedule_relationship", pyarrow.string()), - ("start_date", pyarrow.string()), - ("trip_id", pyarrow.string()), - # entity -> vehicle -> vehicle - ("vehicle_id", pyarrow.string()), # actual label: id - ("vehicle_label", pyarrow.string()), # actual label: label - ("assignment_status", pyarrow.string()), - # entity -> vehicle -> operator - ("operator_id", pyarrow.string()), # actual label: id - ("logon_time", pyarrow.int64()), - ("name", pyarrow.string()), - ("first_name", pyarrow.string()), - ("last_name", pyarrow.string()), - ] - ) - - @property - def transformation_schema(self) -> dict: - return { - "entity": ( - ("id", "entity_id"), - ("is_deleted", "entity_is_deleted"), - ), - "entity,vehicle": ( - ("block_id",), - ("capacity",), - ("current_stop_sequence",), - ("load",), - ("location_source",), - ("occupancy_percentage",), - ("occupancy_status",), - ("revenue",), - ("run_id",), - ("stop_id",), - ("timestamp", "vehicle_timestamp"), - ), - "entity,vehicle,position": ( - ("bearing",), - ("latitude",), - ("longitude",), - ("speed",), - ), - "entity,vehicle,trip": ( - ("overload_id",), - ("overload_offset",), - ("route_id",), - ("schedule_relationship",), - ("start_date",), - ("trip_id",), - ), - "entity,vehicle,vehicle": ( - ("id", "vehicle_id"), - ("label", "vehicle_label"), - ("assignment_status",), - ), - "entity,vehicle,operator": ( - ("id", "operator_id"), - ("logon_time",), - ("name",), - ("first_name",), - ("last_name",), - ), - } - - @property - def table_sort_order(self) -> List[Tuple[str, str]]: - return [ - ("start_date", "ascending"), - ("route_id", "ascending"), - ("block_id", "ascending"), - ("vehicle_id", "ascending"), - ("feed_timestamp", "ascending"), - ] diff --git a/python_src/src/lamp_py/ingestion/config_rt_trip.py b/python_src/src/lamp_py/ingestion/config_rt_trip.py index 939082e8..44eae397 100644 --- a/python_src/src/lamp_py/ingestion/config_rt_trip.py +++ b/python_src/src/lamp_py/ingestion/config_rt_trip.py @@ -2,6 +2,11 @@ import pyarrow from .gtfs_rt_detail import GTFSRTDetail +from .gtfs_rt_structs import ( + trip_descriptor, + vehicle_descriptor, + stop_time_event, +) class RtTripDetail(GTFSRTDetail): @@ -10,116 +15,64 @@ class RtTripDetail(GTFSRTDetail): parquet tables. 
""" + def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: + """modify table schema before write to parquet""" + return self.flatten_schema( + self.explode_table_column( + self.flatten_schema(table), "trip_update.stop_time_update" + ) + ) + @property - def export_schema(self) -> pyarrow.schema: + def import_schema(self) -> pyarrow.schema: return pyarrow.schema( [ - # header -> timestamp - ("year", pyarrow.int16()), - ("month", pyarrow.int8()), - ("day", pyarrow.int8()), - ("hour", pyarrow.int8()), - ("feed_timestamp", pyarrow.int64()), - # entity - ("entity_id", pyarrow.string()), # actual label: id - # entity -> trip_update - ("timestamp", pyarrow.int64()), + ("id", pyarrow.string()), ( - "stop_time_update", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field( - "departure", - pyarrow.struct( - [ - pyarrow.field( - "time", pyarrow.int64() - ), - pyarrow.field( - "uncertainty", pyarrow.int64() - ), - ] - ), - ), - pyarrow.field("stop_id", pyarrow.string()), - pyarrow.field("stop_sequence", pyarrow.int64()), - pyarrow.field( - "arrival", + "trip_update", + pyarrow.struct( + [ + ("timestamp", pyarrow.uint64()), + ("delay", pyarrow.int32()), + ("trip", trip_descriptor), + ("vehicle", vehicle_descriptor), + ( + "stop_time_update", + pyarrow.list_( pyarrow.struct( [ - pyarrow.field( - "time", pyarrow.int64() + ("stop_sequence", pyarrow.uint32()), + ("stop_id", pyarrow.string()), + ("arrival", stop_time_event), + ("departure", stop_time_event), + ( + "schedule_relationship", + pyarrow.string(), ), - pyarrow.field( - "uncertainty", pyarrow.int64() + ( + "boarding_status", + pyarrow.string(), ), ] - ), + ) ), - pyarrow.field( - "schedule_relationship", pyarrow.string() - ), - pyarrow.field( - "boarding_status", pyarrow.string() - ), - ] - ) + ), + ] ), ), - # entity -> trip_update -> trip - ("direction_id", pyarrow.int64()), - ("route_id", pyarrow.string()), - ("start_date", pyarrow.string()), - ("start_time", pyarrow.string()), - ("trip_id", pyarrow.string()), - ("route_pattern_id", pyarrow.string()), - ("schedule_relationship", pyarrow.string()), - # entity -> trip_update -> vehicle - ("vehicle_id", pyarrow.string()), # actual label: id - ("vehicle_label", pyarrow.string()), # actual label: label ] ) - @property - def transformation_schema(self) -> dict: - return { - "entity": (("id", "entity_id"),), - "entity,trip_update": ( - ("timestamp",), - ("stop_time_update",), - ), - "entity,trip_update,trip": ( - ("direction_id",), - ("route_id",), - ("start_date",), - ("start_time",), - ("trip_id",), - ("route_pattern_id",), - ("schedule_relationship",), - ), - "entity,trip_update,vehicle": ( - ( - "id", - "vehicle_id", - ), - ( - "label", - "vehicle_label", - ), - ), - } - # pylint: disable=R0801 # Similar lines in 2 files @property def table_sort_order(self) -> List[Tuple[str, str]]: return [ - ("start_date", "ascending"), - ("route_pattern_id", "ascending"), - ("route_id", "ascending"), - ("direction_id", "ascending"), - ("vehicle_id", "ascending"), + ("trip_update.trip.start_date", "ascending"), + ("trip_update.trip.route_pattern_id", "ascending"), + ("trip_update.trip.route_id", "ascending"), + ("trip_update.trip.direction_id", "ascending"), + ("trip_update.vehicle.id", "ascending"), ("feed_timestamp", "ascending"), ] diff --git a/python_src/src/lamp_py/ingestion/config_rt_vehicle.py b/python_src/src/lamp_py/ingestion/config_rt_vehicle.py index 5f43d9c9..7b133f48 100644 --- a/python_src/src/lamp_py/ingestion/config_rt_vehicle.py +++ 
b/python_src/src/lamp_py/ingestion/config_rt_vehicle.py @@ -2,6 +2,7 @@ import pyarrow from .gtfs_rt_detail import GTFSRTDetail +from .gtfs_rt_structs import position, trip_descriptor, vehicle_descriptor class RtVehicleDetail(GTFSRTDetail): @@ -11,103 +12,35 @@ class RtVehicleDetail(GTFSRTDetail): """ @property - def export_schema(self) -> pyarrow.schema: + def import_schema(self) -> pyarrow.schema: return pyarrow.schema( [ - # header -> timestamp - ("year", pyarrow.int16()), - ("month", pyarrow.int8()), - ("day", pyarrow.int8()), - ("hour", pyarrow.int8()), - ("feed_timestamp", pyarrow.int64()), - # entity - ("entity_id", pyarrow.string()), # actual label: id - # entity -> vehicle - ("current_status", pyarrow.string()), - ("current_stop_sequence", pyarrow.int64()), - ("occupancy_percentage", pyarrow.int64()), - ("occupancy_status", pyarrow.string()), - ("stop_id", pyarrow.string()), + ("id", pyarrow.string()), ( - "vehicle_timestamp", - pyarrow.int64(), - ), # actual label: timestamp - # entity -> vehicle -> position - ("bearing", pyarrow.int64()), - ("latitude", pyarrow.float64()), - ("longitude", pyarrow.float64()), - ("speed", pyarrow.float64()), - # entity -> vehicle -> trip - ("direction_id", pyarrow.int64()), - ("route_id", pyarrow.string()), - ("schedule_relationship", pyarrow.string()), - ("start_date", pyarrow.string()), - ("start_time", pyarrow.string()), - ("trip_id", pyarrow.string()), - # entity -> vehicle -> vehicle - ("vehicle_id", pyarrow.string()), # actual label: id - ("vehicle_label", pyarrow.string()), # actual label: label - ( - "vehicle_consist", - pyarrow.list_( - pyarrow.struct( - [ - pyarrow.field("label", pyarrow.string()), - ] - ) + "vehicle", + pyarrow.struct( + [ + ("current_status", pyarrow.string()), + ("current_stop_sequence", pyarrow.uint32()), + ("occupancy_percentage", pyarrow.uint32()), + ("occupancy_status", pyarrow.string()), + ("stop_id", pyarrow.string()), + ("timestamp", pyarrow.uint64()), + ("position", position), + ("trip", trip_descriptor), + ("vehicle", vehicle_descriptor), + ] ), - ), # actual label: consist + ), ] ) - @property - def transformation_schema(self) -> dict: - return { - "entity": (("id", "entity_id"),), - "entity,vehicle": ( - ("current_status",), - ("current_stop_sequence",), - ("occupancy_percentage",), - ("occupancy_status",), - ("stop_id",), - ("timestamp", "vehicle_timestamp"), - ), - "entity,vehicle,position": ( - ("bearing",), - ("latitude",), - ("longitude",), - ("speed",), - ), - "entity,vehicle,trip": ( - ("direction_id",), - ("route_id",), - ("schedule_relationship",), - ("start_date",), - ("start_time",), - ("trip_id",), - ), - "entity,vehicle,vehicle": ( - ( - "id", - "vehicle_id", - ), - ( - "label", - "vehicle_label", - ), - ( - "consist", - "vehicle_consist", - ), - ), - } - @property def table_sort_order(self) -> List[Tuple[str, str]]: return [ - ("start_date", "ascending"), - ("route_id", "ascending"), - ("vehicle_id", "ascending"), - ("direction_id", "ascending"), + ("vehicle.trip.start_date", "ascending"), + ("vehicle.trip.route_id", "ascending"), + ("vehicle.vehicle.id", "ascending"), + ("vehicle.trip.direction_id", "ascending"), ("feed_timestamp", "ascending"), ] diff --git a/python_src/src/lamp_py/ingestion/convert_gtfs_rt.py b/python_src/src/lamp_py/ingestion/convert_gtfs_rt.py index b70f2248..91adf376 100644 --- a/python_src/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/python_src/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -15,8 +15,8 @@ from lamp_py.runtime_utils.process_logger import ProcessLogger 
from .config_rt_alerts import RtAlertsDetail -from .config_rt_bus_trip import RtBusTripDetail -from .config_rt_bus_vehicle import RtBusVehicleDetail +from .config_busloc_trip import RtBusTripDetail +from .config_busloc_vehicle import RtBusVehicleDetail from .config_rt_trip import RtTripDetail from .config_rt_vehicle import RtVehicleDetail from .converter import ConfigType, Converter @@ -248,33 +248,6 @@ def yield_check( yield table del self.table_groups[iter_ts] - def record_from_entity(self, entity: dict) -> dict: - """ - Convert an entity in the ingested json dict into a record for a parquet - table. - """ - - def drill_entity(drill_keys: str) -> Optional[dict]: - """Util function for recursively getting data out of entity""" - ret_dict = entity - for key in drill_keys.split(",")[1:]: - value = ret_dict.get(key) - if value is None: - return value - ret_dict = value - return ret_dict - - record: dict = {} - for drill_keys, fields in self.detail.transformation_schema.items(): - pull_dict = drill_entity(drill_keys) - for get_field in fields: - if pull_dict is None: - record[get_field[-1]] = None - else: - record[get_field[-1]] = pull_dict.get(get_field[0]) - - return record - def gz_to_pyarrow( self, filename: str ) -> Tuple[Optional[datetime], str, Optional[pyarrow.table]]: @@ -311,28 +284,44 @@ def gz_to_pyarrow( ) as file: json_data = json.load(file) - # Create empty 'table' as dict of lists for export schema - table = self.detail.empty_table() - # parse timestamp info out of the header feed_timestamp = json_data["header"]["timestamp"] timestamp = datetime.fromtimestamp(feed_timestamp, timezone.utc) - # for each entity in the list, create a record, add it to the table - for entity in json_data["entity"]: - record = self.record_from_entity(entity=entity) - record.update( - { - "year": timestamp.year, - "month": timestamp.month, - "day": timestamp.day, - "hour": timestamp.hour, - "feed_timestamp": feed_timestamp, - } - ) + table = pyarrow.Table.from_pylist( + json_data["entity"], schema=self.detail.import_schema + ) - for key, value in record.items(): - table[key].append(value) + table = table.append_column( + "year", + pyarrow.array( + [timestamp.year] * table.num_rows, pyarrow.uint16() + ), + ) + table = table.append_column( + "month", + pyarrow.array( + [timestamp.month] * table.num_rows, pyarrow.uint8() + ), + ) + table = table.append_column( + "day", + pyarrow.array( + [timestamp.day] * table.num_rows, pyarrow.uint8() + ), + ) + table = table.append_column( + "hour", + pyarrow.array( + [timestamp.hour] * table.num_rows, pyarrow.uint8() + ), + ) + table = table.append_column( + "feed_timestamp", + pyarrow.array( + [feed_timestamp] * table.num_rows, pyarrow.uint64() + ), + ) except FileNotFoundError as _: return (None, filename, None) @@ -343,7 +332,7 @@ def gz_to_pyarrow( return ( timestamp, filename, - pyarrow.table(table, schema=self.detail.export_schema), + table, ) def write_table(self, table: pyarrow.table) -> None: @@ -355,6 +344,8 @@ def write_table(self, table: pyarrow.table) -> None: try: s3_prefix = str(self.config_type) + table = self.detail.transform_for_write(table) + if self.detail.table_sort_order is not None: table = table.sort_by(self.detail.table_sort_order) diff --git a/python_src/src/lamp_py/ingestion/gtfs_rt_detail.py b/python_src/src/lamp_py/ingestion/gtfs_rt_detail.py index d66e7648..a9a47816 100644 --- a/python_src/src/lamp_py/ingestion/gtfs_rt_detail.py +++ b/python_src/src/lamp_py/ingestion/gtfs_rt_detail.py @@ -1,8 +1,9 @@ from abc import ABC from abc import 
abstractmethod -from typing import Dict, Optional, List, Tuple +from typing import Optional, List, Tuple import pyarrow +import pyarrow.compute as pc class GTFSRTDetail(ABC): @@ -13,25 +14,47 @@ class GTFSRTDetail(ABC): defined. """ - @property - @abstractmethod - def export_schema(self) -> pyarrow.schema: - """Get the schema for the parquet table generated by this config""" + def flatten_schema(self, table: pyarrow.table) -> pyarrow.table: + """flatten pyarrow table if struct column type exists""" + for field in table.schema: + if str(field.type).startswith("struct"): + return self.flatten_schema(table.flatten()) + return table + + def explode_table_column( + self, table: pyarrow.table, column: str + ) -> pyarrow.table: + """explode list-like column of pyarrow table by creating rows for each list value""" + other_columns = list(table.schema.names) + other_columns.remove(column) + indices = pc.list_parent_indices(table[column]) + return pyarrow.concat_tables( + [ + table.select(other_columns) + .take(indices) + .append_column( + pyarrow.field( + column, table.schema.field(column).type.value_type + ), + pc.list_flatten(table[column]), + ), + table.filter( + pc.list_value_length(table[column]).is_null() + ).select(other_columns), + ], + promote=True, + ) + + def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: + """modify table schema before write to parquet""" + return self.flatten_schema(table) @property @abstractmethod - def transformation_schema(self) -> dict: - """ - Get the transformation schema that describes how what json keys to copy - and how to remap them if necessary - """ - - def empty_table(self) -> Dict[str, list]: - """Create an empty table using this details parrow schema.""" - return {key.name: [] for key in self.export_schema} + def import_schema(self) -> pyarrow.schema: + """Get the import schema for the parquet table generated by this config""" @property - @abstractmethod def table_sort_order(self) -> Optional[List[Tuple[str, str]]]: """Provide list of fields to sort pyarrow table before writing to parquet""" return None diff --git a/python_src/src/lamp_py/ingestion/gtfs_rt_structs.py b/python_src/src/lamp_py/ingestion/gtfs_rt_structs.py new file mode 100644 index 00000000..8a58d91d --- /dev/null +++ b/python_src/src/lamp_py/ingestion/gtfs_rt_structs.py @@ -0,0 +1,69 @@ +import pyarrow + +position = pyarrow.struct( + [ + ("bearing", pyarrow.uint16()), + ("latitude", pyarrow.float64()), + ("longitude", pyarrow.float64()), + ("speed", pyarrow.float64()), + ("odometer", pyarrow.float64()), + ] +) + +trip_descriptor = pyarrow.struct( + [ + ("trip_id", pyarrow.string()), + ("route_id", pyarrow.string()), + ("direction_id", pyarrow.uint8()), + ("start_time", pyarrow.string()), + ("start_date", pyarrow.string()), + ("schedule_relationship", pyarrow.string()), + ("route_pattern_id", pyarrow.string()), + ("tm_trip_id", pyarrow.string()), # Only used by Busloc + ("overload_id", pyarrow.int64()), # Only used by Busloc + ("overload_offset", pyarrow.int64()), # Only used by Busloc + ] +) + +vehicle_descriptor = pyarrow.struct( + [ + ("id", pyarrow.string()), + ("label", pyarrow.string()), + ("license_plate", pyarrow.string()), + ( + "consist", + pyarrow.list_( + pyarrow.struct( + [ + ("label", pyarrow.string()), + ] + ), + ), + ), + ("assignment_status", pyarrow.string()), # Only used by Busloc + ] +) + +translated_string = pyarrow.struct( + [ + ( + "translation", + pyarrow.list_( + pyarrow.struct( + [ + ("text", pyarrow.string()), + ("language", pyarrow.string()), + 
] + ) + ), + ) + ] +) + +stop_time_event = pyarrow.struct( + [ + ("delay", pyarrow.int32()), + ("time", pyarrow.int64()), + ("uncertainty", pyarrow.int32()), + ] +) diff --git a/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py b/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py index d8b0f601..55d4f17b 100644 --- a/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py +++ b/python_src/src/lamp_py/performance_manager/l0_rt_trip_updates.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Iterator, List, Optional, Union +from typing import Iterator, List, Union import numpy import pandas @@ -25,91 +25,34 @@ def get_tu_dataframe_chunks( """ trip_update_columns = [ "feed_timestamp", - "timestamp", - "stop_time_update", - "direction_id", - "route_id", - "start_date", - "start_time", - "vehicle_id", - "trip_id", + "trip_update.timestamp", + "trip_update.stop_time_update.stop_id", + "trip_update.stop_time_update.arrival.time", + "trip_update.trip.direction_id", + "trip_update.trip.route_id", + "trip_update.trip.start_date", + "trip_update.trip.start_time", + "trip_update.vehicle.id", + "trip_update.trip.trip_id", ] trip_update_filters = [ - ("direction_id", "in", (0, 1)), - ("trip_id", "!=", "None"), - ("vehicle_id", "!=", "None"), - ("route_id", "in", route_ids), + ("trip_update.trip.direction_id", "in", (0, 1)), + ("trip_update.trip.trip_id", "!=", "None"), + ("trip_update.vehicle.id", "!=", "None"), + ("trip_update.trip.route_id", "in", route_ids), + ("trip_update.stop_time_update.arrival.time", ">", 0), ] + # 100_000 batch size should result in ~5-6 GB of memory use per batch # of trip update records return read_parquet_chunks( to_load, - max_rows=100_000, + max_rows=250_000, columns=trip_update_columns, filters=trip_update_filters, ) -# pylint: disable=too-many-arguments -def explode_stop_time_update( - stop_time_update: Optional[List[Dict[str, Any]]], - timestamp: int, - direction_id: bool, - route_id: Any, - service_date: Optional[int], - start_time: Optional[int], - vehicle_id: Any, - trip_id: Any, -) -> Optional[List[dict]]: - """ - explode nested list of dicts in stop_time_update column - - to be used with numpy vectorize - """ - append_dict = { - "timestamp": timestamp, - "direction_id": direction_id, - "route_id": route_id, - "service_date": service_date, - "start_time": start_time, - "vehicle_id": vehicle_id, - "trip_id": trip_id, - } - return_list: List[Dict[str, Any]] = [] - - # fix: https://app.asana.com/0/1203185331040541/1203495730837934 - # it appears that numpy.vectorize batches function inputs which can - # result in None being passed in for stop_time_update - if stop_time_update is None: - return None - - for record in stop_time_update: - try: - arrival_time = int(record["arrival"]["time"]) - except (TypeError, KeyError): - continue - # filter out stop event predictions that are too far into the future - # and are unlikely to be used as a final stop event prediction - # (2 minutes) or predictions that go into the past (negative values) - if arrival_time - timestamp < 0 or arrival_time - timestamp > 120: - continue - append_dict.update( - { - "stop_id": record.get("stop_id"), - "tu_stop_timestamp": arrival_time, - } - ) - return_list.append(append_dict.copy()) - - if len(return_list) == 0: - return None - - return return_list - - -# pylint: enable=too-many-arguments - - def get_and_unwrap_tu_dataframe( paths: Union[str, List[str]], route_ids: List[str] ) -> pandas.DataFrame: @@ -122,13 +65,27 @@ def get_and_unwrap_tu_dataframe( 
process_logger = ProcessLogger("tu.get_and_unwrap_dataframe") process_logger.log_start() - events = pandas.Series(dtype="object") + trip_updates = pandas.DataFrame() + + rename_mapper = { + "trip_update.timestamp": "timestamp", + "trip_update.stop_time_update.stop_id": "stop_id", + "trip_update.stop_time_update.arrival.time": "tu_stop_timestamp", + "trip_update.trip.direction_id": "direction_id", + "trip_update.trip.route_id": "route_id", + "trip_update.trip.start_date": "start_date", + "trip_update.trip.start_time": "start_time", + "trip_update.vehicle.id": "vehicle_id", + "trip_update.trip.trip_id": "trip_id", + } # get_tu_dataframe_chunks set to pull ~100_000 trip update records # per batch, this should result in ~5-6 GB of memory use per batch # after batch goes through explod_stop_time_update vectorize operation, # resulting Series has negligible memory use for batch_events in get_tu_dataframe_chunks(paths, route_ids): + # rename columns from trip update parquet schema + batch_events = batch_events.rename(columns=rename_mapper) # use feed_timestamp if timestamp value is null batch_events["timestamp"] = batch_events["timestamp"].where( batch_events["timestamp"].notna(), @@ -156,26 +113,22 @@ def get_and_unwrap_tu_dataframe( .astype("Int64") ) - # expand and filter stop_time_update column using numpy vectorize - # numpy vectorize offers significantly better performance over pandas apply - # this will return a ndarray with values being list of dicts - vector_explode = numpy.vectorize(explode_stop_time_update) - batch_events = pandas.Series( - vector_explode( - batch_events.stop_time_update, - batch_events.timestamp, - batch_events.direction_id, - batch_events.route_id, - batch_events.service_date, - batch_events.start_time, - batch_events.vehicle_id, - batch_events.trip_id, + batch_events["tu_stop_timestamp"] = pandas.to_numeric( + batch_events["tu_stop_timestamp"] + ).astype("Int64") + + # filter out stop event predictions that are too far into the future + # and are unlikely to be used as a final stop event prediction + # (2 minutes) or predictions that go into the past (negative values) + batch_events = batch_events[ + (batch_events["tu_stop_timestamp"] - batch_events["timestamp"] >= 0) + & ( + batch_events["tu_stop_timestamp"] - batch_events["timestamp"] + < 120 ) - ).dropna() - events = pandas.concat([events, batch_events]) + ] - # transform Series of list of dicts into dataframe - trip_updates = pandas.json_normalize(events.explode()) + trip_updates = pandas.concat([trip_updates, batch_events]) process_logger.add_metadata(row_count=trip_updates.shape[0]) process_logger.log_complete() diff --git a/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py b/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py index 81344664..bed9aa2f 100644 --- a/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py +++ b/python_src/src/lamp_py/performance_manager/l0_rt_vehicle_positions.py @@ -27,36 +27,53 @@ def get_vp_dataframe( process_logger.log_start() vehicle_position_cols = [ - "current_status", - "current_stop_sequence", - "stop_id", - "vehicle_timestamp", - "direction_id", - "route_id", - "start_date", - "start_time", - "vehicle_id", - "trip_id", - "vehicle_label", - "vehicle_consist", + "vehicle.current_status", + "vehicle.current_stop_sequence", + "vehicle.stop_id", + "vehicle.timestamp", + "vehicle.trip.direction_id", + "vehicle.trip.route_id", + "vehicle.trip.start_date", + "vehicle.trip.start_time", + "vehicle.vehicle.id", + 
"vehicle.trip.trip_id", + "vehicle.vehicle.label", + "vehicle.vehicle.consist", ] vehicle_position_filters = [ - ("current_status", "!=", "None"), - ("current_stop_sequence", ">=", 0), - ("stop_id", "!=", "None"), - ("vehicle_timestamp", ">", 0), - ("direction_id", "in", (0, 1)), - ("route_id", "!=", "None"), - ("vehicle_id", "!=", "None"), - ("route_id", "in", route_ids), - ("trip_id", "!=", "None"), + ("vehicle.current_status", "!=", "None"), + ("vehicle.current_stop_sequence", ">=", 0), + ("vehicle.stop_id", "!=", "None"), + ("vehicle.timestamp", ">", 0), + ("vehicle.trip.direction_id", "in", (0, 1)), + ("vehicle.trip.route_id", "!=", "None"), + ("vehicle.vehicle.id", "!=", "None"), + ("vehicle.trip.route_id", "in", route_ids), + ("vehicle.trip.trip_id", "!=", "None"), ] + rename_mapper = { + "vehicle.current_status": "current_status", + "vehicle.current_stop_sequence": "current_stop_sequence", + "vehicle.stop_id": "stop_id", + "vehicle.timestamp": "vehicle_timestamp", + "vehicle.trip.direction_id": "direction_id", + "vehicle.trip.route_id": "route_id", + "vehicle.trip.start_date": "start_date", + "vehicle.trip.start_time": "start_time", + "vehicle.vehicle.id": "vehicle_id", + "vehicle.trip.trip_id": "trip_id", + "vehicle.vehicle.label": "vehicle_label", + "vehicle.vehicle.consist": "vehicle_consist", + } + result = read_parquet( to_load, columns=vehicle_position_cols, filters=vehicle_position_filters ) + result = result.rename(columns=rename_mapper) + process_logger.add_metadata(row_count=result.shape[0]) process_logger.log_complete() diff --git a/python_src/tests/ingestion/test_gtfs_rt_converter.py b/python_src/tests/ingestion/test_gtfs_rt_converter.py index 9041479e..5a95293b 100644 --- a/python_src/tests/ingestion/test_gtfs_rt_converter.py +++ b/python_src/tests/ingestion/test_gtfs_rt_converter.py @@ -4,11 +4,15 @@ from unittest.mock import patch from pyarrow import fs +import pandas from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter from lamp_py.ingestion.converter import ConfigType -from ..test_resources import incoming_dir +from ..test_resources import ( + incoming_dir, + test_files_dir, +) def test_bad_conversion_local() -> None: @@ -70,7 +74,8 @@ def test_empty_files() -> None: assert filename == empty_file assert table.to_pandas().shape == ( 0, - len(converter.detail.export_schema), + len(converter.detail.import_schema) + + 5, # add 5 for header timestamp columns ) one_blank_file = os.path.join(incoming_dir, "one_blank_record.json.gz") @@ -78,10 +83,30 @@ def test_empty_files() -> None: assert filename == one_blank_file assert table.to_pandas().shape == ( 1, - len(converter.detail.export_schema), + len(converter.detail.import_schema) + + 5, # add 5 for header timestamp columns ) +def drop_list_columns(table: pandas.DataFrame) -> pandas.DataFrame: + """ + drop any columns with list objects to perform dataframe compare + """ + list_columns = ( + "consist", + "translation", + "informed_entity", + "active_period", + "reminder_times", + "stop_time_update", + ) + for column in table.columns: + for list_col in list_columns: + if list_col in column: + table.drop(columns=column, inplace=True) + return table + + def test_vehicle_positions_file_conversion() -> None: """ TODO - convert a dummy json data to parquet and check that the new file @@ -104,55 +129,21 @@ def test_vehicle_positions_file_conversion() -> None: assert filename == gtfs_rt_file - np_df = table.to_pandas() - - # tuple(na count, dtype, max, min) - file_details = { - "year": (0, "int16", "2022", "2022"), - 
"month": (0, "int8", "1", "1"), - "day": (0, "int8", "1", "1"), - "hour": (0, "int8", "0", "0"), - "feed_timestamp": (0, "int64", "1640995202", "1640995202"), - "entity_id": (0, "object", "y4124", "1625"), - "current_status": (0, "object", "STOPPED_AT", "INCOMING_AT"), - "current_stop_sequence": (29, "float64", "640.0", "1.0"), - "occupancy_percentage": (426, "float64", "nan", "nan"), - "occupancy_status": (190, "object", "nan", "nan"), - "stop_id": (29, "object", "nan", "nan"), - "vehicle_timestamp": (0, "int64", "1640995198", "1640994366"), - "bearing": (0, "int64", "360", "0"), - "latitude": (0, "float64", "42.778499603271484", "41.82632064819336"), - "longitude": (0, "float64", "-70.7895736694336", "-71.5481185913086"), - "speed": (392, "float64", "29.9", "2.6"), - "direction_id": (4, "float64", "1.0", "0.0"), - "route_id": (0, "object", "Shuttle-Generic", "1"), - "schedule_relationship": (0, "object", "UNSCHEDULED", "ADDED"), - "start_date": (40, "object", "nan", "nan"), - "start_time": (87, "object", "nan", "nan"), - "trip_id": (0, "object", "nan", "nan"), - "vehicle_id": (0, "object", "y4124", "1625"), - "vehicle_label": (0, "object", "4124", "0420"), - "vehicle_consist": (324, "object", "nan", "nan"), - } - # 426 records in 'entity' for 2022-01-01T00:00:03Z_https_cdn.mbta.com_realtime_VehiclePositions_enhanced.json.gz - assert np_df.shape == (426, len(converter.detail.export_schema)) + assert table.num_rows == 426 + assert ( + table.num_columns == len(converter.detail.import_schema) + 5 + ) # add 5 for header timestamp columns - all_expected_paths = set(file_details.keys()) + np_df = converter.detail.flatten_schema(table).to_pandas() + np_df = drop_list_columns(np_df) - # ensure all of the expected paths were found and there aren't any - # additional ones - assert all_expected_paths == set(np_df.columns) + parquet_file = os.path.join(test_files_dir, "ingestion_GTFS-RT_VP.parquet") + parquet_df = pandas.read_parquet(parquet_file) + parquet_df = drop_list_columns(parquet_df) - # check file details - for col, (na_count, d_type, upper, lower) in file_details.items(): - print(f"checking: {col}") - assert na_count == np_df[col].isna().sum() - assert d_type == np_df[col].dtype - if upper != "nan": - assert upper == str(np_df[col].max()) - if lower != "nan": - assert lower == str(np_df[col].min()) + compare_result = np_df.compare(parquet_df, align_axis=1) + assert compare_result.shape[0] == 0, f"{compare_result}" # check that we are able to handle older vp files from 16 sept 2019 - 4 march 2020 old_gtfs_rt_file = os.path.join( @@ -165,8 +156,17 @@ def test_vehicle_positions_file_conversion() -> None: assert timestamp.year == 2019 assert timestamp.day == 12 - assert table.shape[0] > 0 - assert table.shape[1] > 0 + np_df = converter.detail.flatten_schema(table).to_pandas() + np_df = drop_list_columns(np_df) + + parquet_file = os.path.join( + test_files_dir, "ingestion_GTFS-RT_VP_OLD.parquet" + ) + parquet_df = pandas.read_parquet(parquet_file) + parquet_df = drop_list_columns(parquet_df) + + compare_result = np_df.compare(parquet_df, align_axis=1) + assert compare_result.shape[0] == 0, f"{compare_result}" def test_rt_alert_file_conversion() -> None: @@ -192,58 +192,23 @@ def test_rt_alert_file_conversion() -> None: assert filename == gtfs_rt_file - np_df = table.to_pandas() - - # tuple(na count, dtype, max, min) - file_details = { - "year": (0, "int16", "2022", "2022"), - "month": (0, "int8", "5", "5"), - "day": (0, "int8", "4", "4"), - "hour": (0, "int8", "15", "15"), - "feed_timestamp": 
(0, "int64", "1651679986", "1651679986"), - "entity_id": (0, "object", "442146", "293631"), - "effect": (0, "object", "UNKNOWN_EFFECT", "DETOUR"), - "effect_detail": (0, "object", "TRACK_CHANGE", "BIKE_ISSUE"), - "cause": (0, "object", "UNKNOWN_CAUSE", "CONSTRUCTION"), - "cause_detail": (0, "object", "UNKNOWN_CAUSE", "CONSTRUCTION"), - "severity": (0, "int64", "10", "0"), - "severity_level": (0, "object", "WARNING", "INFO"), - "created_timestamp": (0, "int64", "1651679836", "1549051333"), - "last_modified_timestamp": (0, "int64", "1651679848", "1549051333"), - "alert_lifecycle": (0, "object", "UPCOMING_ONGOING", "NEW"), - "duration_certainty": (0, "object", "UNKNOWN", "ESTIMATED"), - "last_push_notification": (144, "float64", "nan", "nan"), - "active_period": (2, "object", "nan", "nan"), - "reminder_times": (132, "object", "nan", "nan"), - "closed_timestamp": (142, "float64", "1651679848.0", "1651679682.0"), - "short_header_text_translation": (0, "object", "nan", "nan"), - "header_text_translation": (0, "object", "nan", "nan"), - "description_text_translation": (0, "object", "nan", "nan"), - "service_effect_text_translation": (0, "object", "nan", "nan"), - "timeframe_text_translation": (44, "object", "nan", "nan"), - "url_translation": (138, "object", "nan", "nan"), - "recurrence_text_translation": (139, "object", "nan", "nan"), - "informed_entity": (0, "object", "nan", "nan"), - } - # 144 records in 'entity' for 2022-05-04T15:59:48Z_https_cdn.mbta.com_realtime_Alerts_enhanced.json.gz - assert np_df.shape == (144, len(converter.detail.export_schema)) + assert table.num_rows == 144 + assert ( + table.num_columns == len(converter.detail.import_schema) + 5 + ) # add 5 for header timestamp columns - all_expected_paths = set(file_details.keys()) + np_df = converter.detail.flatten_schema(table).to_pandas() + np_df = drop_list_columns(np_df) - # ensure all of the expected paths were found and there aren't any - # additional ones - assert all_expected_paths == set(np_df.columns) + parquet_file = os.path.join( + test_files_dir, "ingestion_GTFS-RT_ALERT.parquet" + ) + parquet_df = pandas.read_parquet(parquet_file) + parquet_df = drop_list_columns(parquet_df) - # check file details - for col, (na_count, d_type, upper, lower) in file_details.items(): - print(f"checking: {col}") - assert na_count == np_df[col].isna().sum() - assert d_type == np_df[col].dtype - if upper != "nan": - assert upper == str(np_df[col].max()) - if lower != "nan": - assert lower == str(np_df[col].min()) + compare_result = np_df.compare(parquet_df, align_axis=1) + assert compare_result.shape[0] == 0, f"{compare_result}" def test_rt_trip_file_conversion() -> None: @@ -269,48 +234,22 @@ def test_rt_trip_file_conversion() -> None: assert filename == gtfs_rt_file - np_df = table.to_pandas() - - # tuple(na count, dtype, max, min) - file_details = { - "year": (0, "int16", "2022", "2022"), - "month": (0, "int8", "5", "5"), - "day": (0, "int8", "8", "8"), - "hour": (0, "int8", "6", "6"), - "feed_timestamp": (0, "int64", "1651989896", "1651989896"), - "entity_id": (0, "object", "CR-532710-1518", "50922039"), - "timestamp": (51, "float64", "1651989886.0", "1651989816.0"), - "stop_time_update": (0, "object", "nan", "nan"), - "direction_id": (0, "int64", "1", "0"), - "route_id": (0, "object", "SL1", "1"), - "start_date": (50, "object", "nan", "nan"), - "start_time": (51, "object", "nan", "nan"), - "trip_id": (0, "object", "CR-532710-1518", "50922039"), - "route_pattern_id": (79, "object", "nan", "nan"), - "schedule_relationship": (79, 
"object", "nan", "nan"), - "vehicle_id": (39, "object", "nan", "nan"), - "vehicle_label": (55, "object", "nan", "nan"), - } - # 79 records in 'entity' for # 2022-05-08T06:04:57Z_https_cdn.mbta.com_realtime_TripUpdates_enhanced.json.gz - assert np_df.shape == (79, len(converter.detail.export_schema)) + assert table.num_rows == 79 + assert ( + table.num_columns == len(converter.detail.import_schema) + 5 + ) # add 5 for header timestamp columns - all_expected_paths = set(file_details.keys()) + np_df = converter.detail.flatten_schema(table).to_pandas() + np_df = drop_list_columns(np_df) - # ensure all of the expected paths were found and there aren't any - # additional ones - assert all_expected_paths == set(np_df.columns) + parquet_file = os.path.join(test_files_dir, "ingestion_GTFS-RT_TU.parquet") + parquet_df = pandas.read_parquet(parquet_file) + parquet_df = drop_list_columns(parquet_df) - # check file details - for col, (na_count, d_type, upper, lower) in file_details.items(): - print(f"checking: {col}") - assert na_count == np_df[col].isna().sum() - assert d_type == np_df[col].dtype - if upper != "nan": - assert upper == str(np_df[col].max()) - if lower != "nan": - assert lower == str(np_df[col].min()) + compare_result = np_df.compare(parquet_df, align_axis=1) + assert compare_result.shape[0] == 0, f"{compare_result}" # check that we are able to handle older vp files from 16 sept 2019 - 4 march 2020 old_gtfs_rt_file = os.path.join( @@ -322,8 +261,17 @@ def test_rt_trip_file_conversion() -> None: assert timestamp.year == 2019 assert timestamp.day == 12 - assert table.shape[0] > 0 - assert table.shape[1] > 0 + np_df = converter.detail.flatten_schema(table).to_pandas() + np_df = drop_list_columns(np_df) + + parquet_file = os.path.join( + test_files_dir, "ingestion_GTFS-RT_TU_OLD.parquet" + ) + parquet_df = pandas.read_parquet(parquet_file) + parquet_df = drop_list_columns(parquet_df) + + compare_result = np_df.compare(parquet_df, align_axis=1) + assert compare_result.shape[0] == 0, f"{compare_result}" def test_bus_vehicle_positions_file_conversion() -> None: @@ -349,66 +297,21 @@ def test_bus_vehicle_positions_file_conversion() -> None: assert filename == gtfs_rt_file - np_df = table.to_pandas() - - # tuple(na count, dtype, max, min) - file_details = { - "year": (0, "int16", "2022", "2022"), - "month": (0, "int8", "5", "5"), - "day": (0, "int8", "5", "5"), - "hour": (0, "int8", "16", "16"), - "feed_timestamp": (0, "int64", "1651766414", "1651766414"), - "entity_id": (0, "object", "1651766413_1740", "1651764730_1426"), - "block_id": (484, "object", "nan", "nan"), - "capacity": (12, "float64", "57.0", "36.0"), - "current_stop_sequence": (844, "float64", "nan", "nan"), - "load": (569, "float64", "42.0", "0.0"), - "location_source": (0, "object", "transitmaster", "samsara"), - "occupancy_percentage": (569, "float64", "80.0", "0.0"), - "occupancy_status": (569, "object", "nan", "nan"), - "revenue": (0, "bool", "True", "False"), - "run_id": (484, "object", "nan", "nan"), - "stop_id": (844, "object", "nan", "nan"), - "vehicle_timestamp": (0, "int64", "1651766413", "1651764730"), - "bearing": (0, "int64", "356", "0"), - "latitude": (0, "float64", "42.65629769", "42.1069972"), - "longitude": (0, "float64", "-70.62653102", "-71.272446339"), - "speed": (163, "float64", "25.1189", "0.0"), - "overload_id": (844, "float64", "nan", "nan"), - "overload_offset": (844, "float64", "nan", "nan"), - "route_id": (529, "object", "nan", "nan"), - "schedule_relationship": (529, "object", "nan", "nan"), - 
"start_date": (779, "object", "nan", "nan"), - "trip_id": (529, "object", "nan", "nan"), - "vehicle_id": (0, "object", "y3159", "y0408"), - "vehicle_label": (0, "object", "3159", "0408"), - "assignment_status": (358, "object", "nan", "nan"), - "operator_id": (844, "object", "nan", "nan"), - "logon_time": (484, "float64", "1651766401.0", "1651740838.0"), - "name": (844, "object", "nan", "nan"), - "first_name": (844, "object", "nan", "nan"), - "last_name": (844, "object", "nan", "nan"), - "entity_is_deleted": (0, "bool", "False", "False"), - } - # 844 records in 'entity' for 2022-05-05T16_00_15Z_https_mbta_busloc_s3.s3.amazonaws.com_prod_VehiclePositions_enhanced.json.gz - assert np_df.shape == (844, len(converter.detail.export_schema)) + assert table.num_rows == 844 + assert ( + table.num_columns == len(converter.detail.import_schema) + 5 + ) # add 5 for header timestamp columns - all_expected_paths = set(file_details.keys()) + np_df = converter.detail.flatten_schema(table).to_pandas() + np_df = drop_list_columns(np_df) - # ensure all of the expected paths were found and there aren't any - # additional ones - assert all_expected_paths == set(np_df.columns) + parquet_file = os.path.join(test_files_dir, "ingestion_BUSLOC_VP.parquet") + parquet_df = pandas.read_parquet(parquet_file) + parquet_df = drop_list_columns(parquet_df) - # check file details - for col, (na_count, d_type, upper, lower) in file_details.items(): - print(f"checking: {col}") - assert na_count == np_df[col].isna().sum() - assert d_type == np_df[col].dtype - if upper != "nan": - assert upper == str(np_df[col].max()) - if lower != "nan": - assert lower == str(np_df[col].min()) + compare_result = np_df.compare(parquet_df, align_axis=1) + assert compare_result.shape[0] == 0, f"{compare_result}" def test_bus_trip_updates_file_conversion() -> None: @@ -434,51 +337,21 @@ def test_bus_trip_updates_file_conversion() -> None: assert filename == gtfs_rt_file - np_df = table.to_pandas() + # 157 records in 'entity' for 2022-06-28T10_03_18Z_https_mbta_busloc_s3.s3.amazonaws.com_prod_TripUpdates_enhanced.json.gz + assert table.num_rows == 157 + assert ( + table.num_columns == len(converter.detail.import_schema) + 5 + ) # add 5 for header timestamp columns - # tuple(na count, dtype, max, min) - file_details = { - "year": (0, "int16", "2022", "2022"), - "month": (0, "int8", "6", "6"), - "day": (0, "int8", "28", "28"), - "hour": (0, "int8", "10", "10"), - "feed_timestamp": (0, "int64", "1656410597", "1656410597"), - "entity_id": ( - 0, - "object", - "T86-136:86:52101511:495418", - "A19-16:19:52110036:495423", - ), - "stop_time_update": (0, "object", "nan", "nan"), - "direction_id": (157, "float64", "nan", "nan"), - "route_id": (0, "object", "93", "111"), - "route_pattern_id": (157, "object", "nan", "nan"), - "schedule_relationship": (0, "object", "SCHEDULED", "SCHEDULED"), - "start_date": (157, "object", "nan", "nan"), - "start_time": (157, "object", "nan", "nan"), - "trip_id": (0, "object", "52271793", "52017959"), - "vehicle_id": (145, "object", "nan", "nan"), - "vehicle_label": (145, "object", "nan", "nan"), - } + np_df = converter.detail.flatten_schema(table).to_pandas() + np_df = drop_list_columns(np_df) - # 157 records in 'entity' for 2022-06-28T10_03_18Z_https_mbta_busloc_s3.s3.amazonaws.com_prod_TripUpdates_enhanced.json.gz - assert np_df.shape == (157, len(converter.detail.export_schema)) - - all_expected_paths = set(file_details.keys()) - - # ensure all of the expected paths were found and there aren't any - # additional ones - 
assert all_expected_paths == set(np_df.columns) - - # check file details - for col, (na_count, d_type, upper, lower) in file_details.items(): - print(f"checking: {col}") - assert na_count == np_df[col].isna().sum() - assert d_type == np_df[col].dtype - if upper != "nan": - assert upper == str(np_df[col].max()) - if lower != "nan": - assert lower == str(np_df[col].min()) + parquet_file = os.path.join(test_files_dir, "ingestion_BUSLOC_TU.parquet") + parquet_df = pandas.read_parquet(parquet_file) + parquet_df = drop_list_columns(parquet_df) + + compare_result = np_df.compare(parquet_df, align_axis=1) + assert compare_result.shape[0] == 0, f"{compare_result}" def test_start_of_hour() -> None: diff --git a/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=12/506974e326b64407aa2e72cfe3ed3c9a-0.parquet b/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=12/506974e326b64407aa2e72cfe3ed3c9a-0.parquet deleted file mode 100644 index e9767873..00000000 Binary files a/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=12/506974e326b64407aa2e72cfe3ed3c9a-0.parquet and /dev/null differ diff --git a/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=12/8e2c182968e24ecea3d37f03d6bae84d-0.parquet b/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=12/8e2c182968e24ecea3d37f03d6bae84d-0.parquet new file mode 100644 index 00000000..3c7e19f7 Binary files /dev/null and b/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=12/8e2c182968e24ecea3d37f03d6bae84d-0.parquet differ diff --git a/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=13/b323d1f74c024e1c9367a71dec50dcec-0.parquet b/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=13/b323d1f74c024e1c9367a71dec50dcec-0.parquet deleted file mode 100644 index 2f311e90..00000000 Binary files a/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=13/b323d1f74c024e1c9367a71dec50dcec-0.parquet and /dev/null differ diff --git a/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=13/eaeee968b94b4a74b166df4b8ffd9f29-0.parquet b/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=13/eaeee968b94b4a74b166df4b8ffd9f29-0.parquet new file mode 100644 index 00000000..d327e44d Binary files /dev/null and b/python_src/tests/test_files/SPRINGBOARD/RT_TRIP_UPDATES/year=2023/month=5/day=8/hour=13/eaeee968b94b4a74b166df4b8ffd9f29-0.parquet differ diff --git a/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=12/04fae375300c45e2a0f70b8fbefa8f57-0.parquet b/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=12/04fae375300c45e2a0f70b8fbefa8f57-0.parquet deleted file mode 100644 index 255c154f..00000000 Binary files a/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=12/04fae375300c45e2a0f70b8fbefa8f57-0.parquet and /dev/null differ diff --git a/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=12/1613b49e4fa1459eabe9c83553ef1045-0.parquet b/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=12/1613b49e4fa1459eabe9c83553ef1045-0.parquet new file mode 100644 index 00000000..a9376d26 Binary files /dev/null and 
b/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=12/1613b49e4fa1459eabe9c83553ef1045-0.parquet differ diff --git a/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=13/5f9b9d4a1fcc417eae0543ce3fe35c8f-0.parquet b/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=13/5f9b9d4a1fcc417eae0543ce3fe35c8f-0.parquet deleted file mode 100644 index 9ad8ba97..00000000 Binary files a/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=13/5f9b9d4a1fcc417eae0543ce3fe35c8f-0.parquet and /dev/null differ diff --git a/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=13/9a1bb1c5269042a284b2ed57b4dfebb9-0.parquet b/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=13/9a1bb1c5269042a284b2ed57b4dfebb9-0.parquet new file mode 100644 index 00000000..e04bf3e8 Binary files /dev/null and b/python_src/tests/test_files/SPRINGBOARD/RT_VEHICLE_POSITIONS/year=2023/month=5/day=8/hour=13/9a1bb1c5269042a284b2ed57b4dfebb9-0.parquet differ diff --git a/python_src/tests/test_files/ingestion_BUSLOC_TU.parquet b/python_src/tests/test_files/ingestion_BUSLOC_TU.parquet new file mode 100644 index 00000000..f7cac691 Binary files /dev/null and b/python_src/tests/test_files/ingestion_BUSLOC_TU.parquet differ diff --git a/python_src/tests/test_files/ingestion_BUSLOC_VP.parquet b/python_src/tests/test_files/ingestion_BUSLOC_VP.parquet new file mode 100644 index 00000000..d45f09f4 Binary files /dev/null and b/python_src/tests/test_files/ingestion_BUSLOC_VP.parquet differ diff --git a/python_src/tests/test_files/ingestion_GTFS-RT_ALERT.parquet b/python_src/tests/test_files/ingestion_GTFS-RT_ALERT.parquet new file mode 100644 index 00000000..6ba64ba4 Binary files /dev/null and b/python_src/tests/test_files/ingestion_GTFS-RT_ALERT.parquet differ diff --git a/python_src/tests/test_files/ingestion_GTFS-RT_TU.parquet b/python_src/tests/test_files/ingestion_GTFS-RT_TU.parquet new file mode 100644 index 00000000..8408d3f9 Binary files /dev/null and b/python_src/tests/test_files/ingestion_GTFS-RT_TU.parquet differ diff --git a/python_src/tests/test_files/ingestion_GTFS-RT_TU_OLD.parquet b/python_src/tests/test_files/ingestion_GTFS-RT_TU_OLD.parquet new file mode 100644 index 00000000..0b6595e3 Binary files /dev/null and b/python_src/tests/test_files/ingestion_GTFS-RT_TU_OLD.parquet differ diff --git a/python_src/tests/test_files/ingestion_GTFS-RT_VP.parquet b/python_src/tests/test_files/ingestion_GTFS-RT_VP.parquet new file mode 100644 index 00000000..6a10451c Binary files /dev/null and b/python_src/tests/test_files/ingestion_GTFS-RT_VP.parquet differ diff --git a/python_src/tests/test_files/ingestion_GTFS-RT_VP_OLD.parquet b/python_src/tests/test_files/ingestion_GTFS-RT_VP_OLD.parquet new file mode 100644 index 00000000..15e8c185 Binary files /dev/null and b/python_src/tests/test_files/ingestion_GTFS-RT_VP_OLD.parquet differ diff --git a/python_src/tests/test_files/tu_missing_start_date.parquet b/python_src/tests/test_files/tu_missing_start_date.parquet index f5a8a252..e153054b 100644 Binary files a/python_src/tests/test_files/tu_missing_start_date.parquet and b/python_src/tests/test_files/tu_missing_start_date.parquet differ diff --git a/python_src/tests/test_files/vehicle_positions_flat_input.csv b/python_src/tests/test_files/vehicle_positions_flat_input.csv index 628ae608..ea5153d2 100644 --- 
a/python_src/tests/test_files/vehicle_positions_flat_input.csv +++ b/python_src/tests/test_files/vehicle_positions_flat_input.csv @@ -1,4 +1,4 @@ -current_status,current_stop_sequence,stop_id,vehicle_timestamp,direction_id,route_id,start_date,start_time,vehicle_id,trip_id,vehicle_label,vehicle_consist +vehicle.current_status,vehicle.current_stop_sequence,vehicle.stop_id,vehicle.timestamp,vehicle.trip.direction_id,vehicle.trip.route_id,vehicle.trip.start_date,vehicle.trip.start_time,vehicle.vehicle.id,vehicle.trip.trip_id,vehicle.vehicle.label,vehicle.vehicle.consist STOPPED_AT,1,70059,1683547153,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, INCOMING_AT,10,70057,1683547208,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, STOPPED_AT,10,70057,1683547246,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, diff --git a/python_src/tests/test_files/vp_missing_start_date.csv b/python_src/tests/test_files/vp_missing_start_date.csv index 8f8208bb..334be4da 100644 --- a/python_src/tests/test_files/vp_missing_start_date.csv +++ b/python_src/tests/test_files/vp_missing_start_date.csv @@ -1,4 +1,4 @@ -current_status,current_stop_sequence,stop_id,vehicle_timestamp,direction_id,route_id,start_date,start_time,vehicle_id,trip_id,vehicle_label,vehicle_consist +vehicle.current_status,vehicle.current_stop_sequence,vehicle.stop_id,vehicle.timestamp,vehicle.trip.direction_id,vehicle.trip.route_id,vehicle.trip.start_date,vehicle.trip.start_time,vehicle.vehicle.id,vehicle.trip.trip_id,vehicle.vehicle.label,vehicle.vehicle.consist STOPPED_AT,1,70059,1683547153,0,Blue,,07:38:00,B-54768A0A,55458882,0713, INCOMING_AT,10,70057,,0,Blue,20230508,07:38:00,B-54768A0A,55458882,0713, STOPPED_AT,10,70057,1683547246,0,Blue,,07:38:00,B-54768A0A,55458882,0713, diff --git a/python_src/tests/test_files/vp_missing_start_time.csv b/python_src/tests/test_files/vp_missing_start_time.csv index e8f29ea7..ffd9008c 100644 --- a/python_src/tests/test_files/vp_missing_start_time.csv +++ b/python_src/tests/test_files/vp_missing_start_time.csv @@ -1,4 +1,4 @@ -current_status,current_stop_sequence,stop_id,vehicle_timestamp,direction_id,route_id,start_date,start_time,vehicle_id,trip_id,vehicle_label,vehicle_consist +vehicle.current_status,vehicle.current_stop_sequence,vehicle.stop_id,vehicle.timestamp,vehicle.trip.direction_id,vehicle.trip.route_id,vehicle.trip.start_date,vehicle.trip.start_time,vehicle.vehicle.id,vehicle.trip.trip_id,vehicle.vehicle.label,vehicle.vehicle.consist STOPPED_AT,1,70059,1683547153,0,Blue,20230508,,B-54768A0A,55458882,0713, INCOMING_AT,10,70057,1683547208,0,Blue,20230508,,B-54768A0A,55458882,0713, STOPPED_AT,10,70057,1683547246,0,Blue,20230508,,B-54768A0A,55458882,0713, diff --git a/python_src/tests/test_resources.py b/python_src/tests/test_resources.py index f902e770..1b4003b9 100644 --- a/python_src/tests/test_resources.py +++ b/python_src/tests/test_resources.py @@ -14,17 +14,17 @@ def csv_to_vp_parquet(csv_filepath: str, parquet_filepath: str) -> None: """ vp_csv_options = csv.ConvertOptions( column_types={ - "current_status": pyarrow.string(), - "current_stop_sequence": pyarrow.int64(), - "stop_id": pyarrow.string(), - "vehicle_timestamp": pyarrow.int64(), - "direction_id": pyarrow.int64(), - "route_id": pyarrow.string(), - "trip_id": pyarrow.string(), - "start_date": pyarrow.string(), - "start_time": pyarrow.string(), - "vehicle_id": pyarrow.string(), - "vehicle_consist": pyarrow.string(), + "vehicle.current_status": pyarrow.string(), + "vehicle.current_stop_sequence": pyarrow.uint32(), + 
"vehicle.stop_id": pyarrow.string(), + "vehicle.timestamp": pyarrow.uint64(), + "vehicle.trip.direction_id": pyarrow.uint8(), + "vehicle.trip.route_id": pyarrow.string(), + "vehicle.trip.trip_id": pyarrow.string(), + "vehicle.trip.start_date": pyarrow.string(), + "vehicle.trip.start_time": pyarrow.string(), + "vehicle.vehicle.id": pyarrow.string(), + "vehicle.vehicle.consist": pyarrow.string(), }, # in our ingestion, if a key is missing, the value written to the # parquet file is null. mimic this behavior by making empty strings