From 1e8037e5b41a79b536148822093295faa18c5472 Mon Sep 17 00:00:00 2001 From: John Lazarus Date: Sun, 21 Jun 2026 17:18:03 +0000 Subject: [PATCH] Add read-only India iSMART API support --- poetry.lock | 128 ++++++- pyproject.toml | 3 +- src/saic_ismart_client_ng/india/__init__.py | 365 +++++++++++++++++++ src/saic_ismart_client_ng/india/bitcodec.py | 101 +++++ src/saic_ismart_client_ng/india/tap_codec.py | 95 +++++ tests/test_india_api.py | 190 ++++++++++ 6 files changed, 878 insertions(+), 4 deletions(-) create mode 100644 src/saic_ismart_client_ng/india/__init__.py create mode 100644 src/saic_ismart_client_ng/india/bitcodec.py create mode 100644 src/saic_ismart_client_ng/india/tap_codec.py create mode 100644 tests/test_india_api.py diff --git a/poetry.lock b/poetry.lock index f6d90c2..810fa88 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.4.1 and should not be changed by hand. [[package]] name = "anyio" @@ -22,6 +22,25 @@ doc = ["Sphinx (>=8.2,<9.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", test = ["anyio[trio]", "blockbuster (>=1.5.23)", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1) ; python_version >= \"3.10\"", "uvloop (>=0.21) ; platform_python_implementation == \"CPython\" and platform_system != \"Windows\" and python_version < \"3.14\""] trio = ["trio (>=0.26.1)"] +[[package]] +name = "asn1tools" +version = "0.167.0" +description = "ASN.1 parsing, encoding and decoding." +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "asn1tools-0.167.0.tar.gz", hash = "sha256:cad53f6f6d788a6eec5e37543401cd8c39f138cc8016b64629ec29fb4735f5b2"}, +] + +[package.dependencies] +bitstruct = "*" +pyparsing = ">=3.1.2" + +[package.extras] +cache = ["diskcache"] +shell = ["prompt_toolkit"] + [[package]] name = "astroid" version = "3.3.10" @@ -34,6 +53,94 @@ files = [ {file = "astroid-3.3.10.tar.gz", hash = "sha256:c332157953060c6deb9caa57303ae0d20b0fbdb2e59b4a4f2a6ba49d0a7961ce"}, ] +[[package]] +name = "bitstruct" +version = "8.22.1" +description = "This module performs conversions between Python values and C bit field structs represented as Python byte strings." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "bitstruct-8.22.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:9c250d8dbd6350dfd73cf4bccab700122072388bd388d284ada37149eeb49979"}, + {file = "bitstruct-8.22.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:7fb40f50be4e60cb7a345ca4ceb16afcbcfecfa7cedf1cded44b8e1d8b8c3109"}, + {file = "bitstruct-8.22.1-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8ebcc2ad3b9addd4a6a409481754c78ade65a8d24a4d34352e37df9b48aeabea"}, + {file = "bitstruct-8.22.1-cp310-cp310-manylinux2014_armv7l.manylinux_2_17_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:ec4d4e8308b3098e4de43f7ccd05d626bf2e4b650bea2b5fec9581f7bd7b4d01"}, + {file = "bitstruct-8.22.1-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8e44c42bbfa9540a3df261c9a9613a88e5e5df0ad3931bcc76b9f1cc215f4ece"}, + {file = "bitstruct-8.22.1-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:8140c51b2e663d0efba9a6f5a77be4f26f3e147e65579be83f7c9b370df56cbe"}, + {file = "bitstruct-8.22.1-cp310-cp310-musllinux_1_2_armv7l.whl", hash = "sha256:f7e6bff01ea3765d28d8e426b49e6a0e4f581a4d47329f54d3a83f3f0c23380e"}, + {file = "bitstruct-8.22.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:9430cc3849c24549578210c3e185427d01bc48734acaa9d7d17a767b19719381"}, + {file = "bitstruct-8.22.1-cp310-cp310-win32.whl", hash = "sha256:9a00e7077c81318a2d7aea6a5e2988301ef316621136ddede68b4a361afc7581"}, + {file = "bitstruct-8.22.1-cp310-cp310-win_amd64.whl", hash = "sha256:43e3d8113263894e2ca107d2f7376f2767cb880a6f9952b0f66087545a230994"}, + {file = "bitstruct-8.22.1-cp310-cp310-win_arm64.whl", hash = "sha256:86f68adf8945b80007cfed2318277fc2d5de5fb253d7df5d7a703dc10b54e149"}, + {file = "bitstruct-8.22.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a4d235cbdba2df0747d4e4a6a6d3d46505cc2b52b16e9d3d4431423401c565f6"}, + {file = "bitstruct-8.22.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7b3c29ee922e68538c575667bdc924e3ef09d21da94fd48a4a430f9c32c70997"}, + {file = "bitstruct-8.22.1-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:7a493a66fac64f989f3c8043e40f9ceb6b8169592c7e5a9d37a1409f00cb757a"}, + {file = "bitstruct-8.22.1-cp311-cp311-manylinux2014_armv7l.manylinux_2_17_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:32f8ffe2f26f25dc4339e9c69acf2ba7ada877a377111bd2b1f6a8804953b51a"}, + {file = "bitstruct-8.22.1-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:02bddf1893550a6e7e6fefb7065a222e1b10447f903cac4fbff38766df35712a"}, + {file = "bitstruct-8.22.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:3e829b10672f5ea5ed60f9d835cc2f3c2e1190448429051719600a2f6d891a0a"}, + {file = "bitstruct-8.22.1-cp311-cp311-musllinux_1_2_armv7l.whl", hash = "sha256:956a3998e5e031bc2d1c2312afb9f714ec6131621c40123d3bb79ff1b69b82a8"}, + {file = "bitstruct-8.22.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:fa74a1b7426f94d1a4917944164230ef34aa85e5a01164d6dbc150a71f90c2ca"}, + {file = "bitstruct-8.22.1-cp311-cp311-win32.whl", hash = "sha256:8fd1b73c9c2c613c385f9f4fd9feaabedf88c1c740bff25a4738b1c6a70d4111"}, + {file = "bitstruct-8.22.1-cp311-cp311-win_amd64.whl", hash = "sha256:419e3ce7d1d89a809a7d966d72de16cb869bd3fe80b587e5bc7cf80debd7df8d"}, + {file = "bitstruct-8.22.1-cp311-cp311-win_arm64.whl", hash = "sha256:6e548bfc4e289cacae704458ac51c6c76188c305701bd14f3f4fec600fce58e8"}, + {file = "bitstruct-8.22.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:18ff3842c89596f88307f458696ebdcbd4af718af9f0f2cad863102387200b26"}, + {file = "bitstruct-8.22.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:837629e72aaf247ee214d19ae0580571c2ba0ddeb3144dff12f7014cb8f3d94d"}, + {file = "bitstruct-8.22.1-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:2bef6f3309b39880e6f5ea4f274899062a3511eb630563858a8be6332c332eac"}, + {file = "bitstruct-8.22.1-cp312-cp312-manylinux2014_armv7l.manylinux_2_17_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:5958ea7f61c45e6d769bcbd0f4357cb91f1ba5cbea9323855a1c45d8bcd03b34"}, + {file = "bitstruct-8.22.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f965015d75e7cc949dff3783f82378a135fe67ac549a41d7b90eb0abf06fa81d"}, + {file = "bitstruct-8.22.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:88573f965ce5042113e236c74fdec828abfdabec3d1cceda1e9766c86de74978"}, + {file = "bitstruct-8.22.1-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:ec6eaf863c85d99fe3cdc6426c77f03320ff42d0905810cef73499f92c388a30"}, + {file = "bitstruct-8.22.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:6aeee6bfcdcf0d158cae706b1fe20f42c97b06bd778eca794aa097973e782c31"}, + {file = "bitstruct-8.22.1-cp312-cp312-win32.whl", hash = "sha256:275aeaf08bf3967ed7afa568c5d9726af17ff4b558f1d82e1c587118cad988c3"}, + {file = "bitstruct-8.22.1-cp312-cp312-win_amd64.whl", hash = "sha256:4d5bb7484ff55a18d4cc1370b9e4cd94893e2ec33d9eb985c313c1cbdca756e6"}, + {file = "bitstruct-8.22.1-cp312-cp312-win_arm64.whl", hash = "sha256:a38e90197bd1bc87863e5180c5556af71528ad03940d41e7a74d602c2255c4ba"}, + {file = "bitstruct-8.22.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:ef0d8476eb20f1b0a46ffe0b85eaef8eac98005d6f758ed083e05ba26c404a19"}, + {file = "bitstruct-8.22.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:84ec7804eb606545fd6fcf2bac3a795737239fdc989faad88850a483a2b2af7a"}, + {file = "bitstruct-8.22.1-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6fe6c4f4cbc2f5bc173d34538b5fe529b0fdd1e30e5d0a56cd7b108cc32d803d"}, + {file = "bitstruct-8.22.1-cp313-cp313-manylinux2014_armv7l.manylinux_2_17_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:356d89faf20264c72d0328b30a38a618c6b65db429b91cbd34f67dbc97dd42a6"}, + {file = "bitstruct-8.22.1-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:98251f43c2991aa3bf660806b501cf94367dbdcc9c7461cc0cdc9724ddc7bb27"}, + {file = "bitstruct-8.22.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:e753de82211d29aaa70f35af6a2d7f027f7487342e24dc98691575dbb2d2761f"}, + {file = "bitstruct-8.22.1-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:7065a241e9d282d6a62556fead7da369c1827a26ceb3eedd377113a021121a5d"}, + {file = "bitstruct-8.22.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:453043525cf86751868c76f9a32a92786ffbafbd0a48ae7fd2bedc012fa45ee9"}, + {file = "bitstruct-8.22.1-cp313-cp313-win32.whl", hash = "sha256:264cd11f52e1b5f513cee6c01c75bcefdf8b7b4c2b1c78f8efa35ef268c033a6"}, + {file = "bitstruct-8.22.1-cp313-cp313-win_amd64.whl", hash = "sha256:c4eecefb86b0762a2741f5f6810aac258a5dea58c288a331e972ce615ed3e2b7"}, + {file = "bitstruct-8.22.1-cp313-cp313-win_arm64.whl", hash = "sha256:3db208565632f5579db254d78df77fc930cb0b23ca6f09151c948785146adda5"}, + {file = "bitstruct-8.22.1-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:098f6b66f89e3bbe320dd31b55094b02c51f365a8ee8f75c0c99e98833eae6a8"}, + {file = "bitstruct-8.22.1-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:2c5b1ef022b0d690b6707464ee1bfd394c79f9a9ef4e765faf1168056fa21337"}, + {file = "bitstruct-8.22.1-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b94cb01f327e8154c4300fc690a69fd1d856064cbcc5b0bf2e8c2f090e40a5c7"}, + {file = "bitstruct-8.22.1-cp314-cp314-manylinux2014_armv7l.manylinux_2_17_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:8b8f262edb37b1bbf37581f29f64e8e879b87101e07257e56ba47b5d797a3364"}, + {file = "bitstruct-8.22.1-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f89b5a8057dff42383829f6fce9aadeba619c151685a90e0faf4ab53e940224b"}, + {file = "bitstruct-8.22.1-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:10321fc31368eeaecb78a34b5dd5071749243fc3ce426ff3bb2e4038bfd11454"}, + {file = "bitstruct-8.22.1-cp314-cp314-musllinux_1_2_armv7l.whl", hash = "sha256:3f13a7df190dc54dddff97de7d2a8d1069e4022469e1d7aa66ae10f4eee04d3b"}, + {file = "bitstruct-8.22.1-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:bc084f5a7a8b258668ebde5e914a4c0290d496612161472eef079d469e49227e"}, + {file = "bitstruct-8.22.1-cp314-cp314-win32.whl", hash = "sha256:ae701de781999fa4eac2242bc8b4525c267ed17d8025291c57c6f1bf1a85b6c0"}, + {file = "bitstruct-8.22.1-cp314-cp314-win_amd64.whl", hash = "sha256:f5acccd329319e9c9f1e03a4b3fd6b91a6e58ab1810932b1c6dafa64891d569b"}, + {file = "bitstruct-8.22.1-cp314-cp314-win_arm64.whl", hash = "sha256:7bca7c212a703f396f2558c76a5f07adb7b48bed3a128510eec381bc3ab9c7f2"}, + {file = "bitstruct-8.22.1-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:c2e78952b2cc04d52f2acee6757198fee374f39e4f64d432382be54834d5044a"}, + {file = "bitstruct-8.22.1-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:9b092f12bad0e900cc2fea3c0884b8c280370c7c4e9adde3fef46ea4686fe8f4"}, + {file = "bitstruct-8.22.1-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:15e60bae7714d6312105fbc3e18185cf6638a4366f011f5d0306ac3a768e8406"}, + {file = "bitstruct-8.22.1-cp314-cp314t-manylinux2014_armv7l.manylinux_2_17_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:5891d506486915120a073802bc8b391a1840327fc781bece88173fcef69300dd"}, + {file = "bitstruct-8.22.1-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:68a7ed240b2448be339121a7b155c6a237460da236b6ba0c898861e156f7cd4b"}, + {file = "bitstruct-8.22.1-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:21b94ba5210acada44d092170daf885777df2043c5c7b82a2b39e4a0b9a98349"}, + {file = "bitstruct-8.22.1-cp314-cp314t-musllinux_1_2_armv7l.whl", hash = "sha256:e7d38a321a00120109ce165a651afb3662e51d5a13134716548b6c0fc7b66987"}, + {file = "bitstruct-8.22.1-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:505d18acd7cb2408bcde4009babb07d671a9b07ae4f3221b32bb332565452c6f"}, + {file = "bitstruct-8.22.1-cp314-cp314t-win32.whl", hash = "sha256:4ca7d883594451d1a5066e4a17ed3858ab7f8e26e419085b9c283e2d4c45eb25"}, + {file = "bitstruct-8.22.1-cp314-cp314t-win_amd64.whl", hash = "sha256:a8227add6ff956e559d00442dc1ce23e31450d42eb5c15fa87e45b3a7f0fb79d"}, + {file = "bitstruct-8.22.1-cp314-cp314t-win_arm64.whl", hash = "sha256:ca5dbad59125547c14f29220ebe8d1cb46fcb3b50fbf16fb31f6ac190790b738"}, + {file = "bitstruct-8.22.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:ec0cc01d3130f3776f87752df69ed2da1b261c2c63ce3a2f3463958d445dd7e2"}, + {file = "bitstruct-8.22.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:62548cd0f5a9782e985aff83294ed0c2df656060680c2c9b6271061e8f2e6a62"}, + {file = "bitstruct-8.22.1-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c8e0d396c8980e903ea7e0c6fd13b0a3185dd8376f3002f018cbdc9957d9b808"}, + {file = "bitstruct-8.22.1-cp39-cp39-manylinux2014_armv7l.manylinux_2_17_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:f69de99df37b8594f87c019de5236e14961c4f997928384e4bcd5f6a1d5dc116"}, + {file = "bitstruct-8.22.1-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:599b04940cf18b079f9849ec992e6a1d22e9bafff1473830df2e618f492d68b3"}, + {file = "bitstruct-8.22.1-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:d45e46afea8f629f9ce55ae9183042f94e4e0fc98112828fc80bb3ea75b39b6d"}, + {file = "bitstruct-8.22.1-cp39-cp39-musllinux_1_2_armv7l.whl", hash = "sha256:896789040a6f5c724fd6df5b7e6b9dfa6ef83327e87ff4bd410331b09a9e6221"}, + {file = "bitstruct-8.22.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:11049c556c3e6452798145214bccd9c7eb4f6843520e190528075d4293728553"}, + {file = "bitstruct-8.22.1-cp39-cp39-win32.whl", hash = "sha256:8482be49681568b0f5fa64b6953680cf6f3634c3f3316d7b4f513a52eca19338"}, + {file = "bitstruct-8.22.1-cp39-cp39-win_amd64.whl", hash = "sha256:ec187f5000104c9d7428556579618dd6182ec2ed8891bd547c0ac83d9f7e5154"}, + {file = "bitstruct-8.22.1-cp39-cp39-win_arm64.whl", hash = "sha256:df5284f0e52865d8a25db399331d047e570af5733881f99242b57ab793086b83"}, + {file = "bitstruct-8.22.1.tar.gz", hash = "sha256:97588318c906c60d33129e0061dd830b03d793c517033d312487c75426d1e808"}, +] + [[package]] name = "certifi" version = "2025.6.15" @@ -593,8 +700,8 @@ files = [ astroid = ">=3.3.8,<=3.4.0.dev0" colorama = {version = ">=0.4.5", markers = "sys_platform == \"win32\""} dill = [ - {version = ">=0.3.7", markers = "python_version >= \"3.12\""}, {version = ">=0.3.6", markers = "python_version == \"3.11\""}, + {version = ">=0.3.7", markers = "python_version >= \"3.12\""}, ] isort = ">=4.2.5,<5.13 || >5.13,<7" mccabe = ">=0.6,<0.8" @@ -605,6 +712,21 @@ tomlkit = ">=0.10.1" spelling = ["pyenchant (>=3.2,<4.0)"] testutils = ["gitpython (>3)"] +[[package]] +name = "pyparsing" +version = "3.3.2" +description = "pyparsing - Classes and methods to define and execute parsing grammars" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "pyparsing-3.3.2-py3-none-any.whl", hash = "sha256:850ba148bd908d7e2411587e247a1e4f0327839c40e2e5e6d05a007ecc69911d"}, + {file = "pyparsing-3.3.2.tar.gz", hash = "sha256:c777f4d763f140633dcb6d8a3eda953bf7a214dc4eff598413c070bcdc117cbc"}, +] + +[package.extras] +diagrams = ["jinja2", "railroad-diagrams"] + [[package]] name = "pytest" version = "8.4.0" @@ -851,4 +973,4 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess [metadata] lock-version = "2.1" python-versions = ">=3.11" -content-hash = "31b3877433de9d09f81efee88d605b8bcd2f9955b58446526aa34699a081467c" +content-hash = "373bf4ba90d4d51541ccdcc5d88d5fb0729b94fa20baabcf4e20bb571772a84d" diff --git a/pyproject.toml b/pyproject.toml index 6c506a3..51b5ebe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,8 @@ dependencies = [ "pycryptodome (>=3.20.0,<4.0.0)", "httpx (>=0.27.0,<0.29.0)", "tenacity (>=9.0.0,<10.0.0)", - "dacite (>=1.8.1,<2.0.0)" + "dacite (>=1.8.1,<2.0.0)", + "asn1tools (>=0.167.0,<1.0.0)" ] [project.urls] diff --git a/src/saic_ismart_client_ng/india/__init__.py b/src/saic_ismart_client_ng/india/__init__.py new file mode 100644 index 0000000..9a8de22 --- /dev/null +++ b/src/saic_ismart_client_ng/india/__init__.py @@ -0,0 +1,365 @@ +from __future__ import annotations + +from binascii import unhexlify +from dataclasses import dataclass, field +import hashlib +import hmac +import json +import re +import time +from typing import Any +from urllib.parse import urlencode + +from Crypto.Cipher import AES +from Crypto.Util.Padding import unpad +import httpx + +from saic_ismart_client_ng.india.bitcodec import ( + BitReader, + BitWriter, + read_fixed_7bit_string, + set_bits, + set_fixed_7bit_string, +) +from saic_ismart_client_ng.india.tap_codec import ( + IndiaTapError, + decode_status_response, + encode_status_request, +) + +INDIA_TAP_LOGIN_URL = "https://iov-tap.mgindia.co.in/TAP.Web/ota.mp" +INDIA_TAP_STATUS_URL = "https://iov-tap.mgindia.co.in/TAP.Web/ota.mpv21" +INDIA_GATEWAY_BASE_URL = "https://iov-gateway.mgindia.co.in/api.app/v1" +INDIA_USER_AGENT = "CER_IKE_01/2.3.0 (iPad; iOS 26.3; Scale/2.00)" +LOGIN_DISPATCHER_TEMPLATE_HEX = ( + "11005600882c60c183060c183060c183060c183060c183060c183060c183060c183060c183" + "060c183060c183060c183060c1ab06200000000020200468acf134468acf1342468acf134" + "2468acf1342000000000100a0" +) + + +class SaicIndiaApiException(Exception): + def __init__(self, message: str, return_code: int | None = None) -> None: + super().__init__(message) + self.return_code = return_code + + +@dataclass(frozen=True) +class IndiaVinInfo: + vin: str + brand_name: str | None = None + model_name: str | None = None + model_year: str | int | None = None + series: str | None = None + is_active: bool | None = None + raw: dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class IndiaVehicleStatus: + status_time: int | None + last_vehicle_activity: int | None + battery_percent: int | None + range_km: float | None + odometer_km: float | None + auxiliary_battery_voltage: float | None + interior_temperature: int | None + exterior_temperature: int | None + locked: bool | None + driver_door_open: bool | None + passenger_door_open: bool | None + rear_left_door_open: bool | None + rear_right_door_open: bool | None + boot_open: bool | None + bonnet_open: bool | None + driver_window_open: bool | None + passenger_window_open: bool | None + rear_left_window_open: bool | None + rear_right_window_open: bool | None + climate_running: bool | None + sunroof_open: bool | None + can_bus_active: bool | None + + +class SaicIndiaApi: + def __init__( + self, + phone: str, + password: str, + *, + http_client: httpx.AsyncClient | None = None, + ) -> None: + self._phone = normalize_india_phone(phone) + self._password = password + self._http = http_client or httpx.AsyncClient(timeout=30) + self._owns_http = http_client is None + self._token: str | None = None + self._user_id: str | None = None + self._device_id = make_india_device_id(self._phone) + + @property + def token(self) -> str | None: + return self._token + + @property + def user_id(self) -> str | None: + return self._user_id + + async def close(self) -> None: + if self._owns_http: + await self._http.aclose() + + async def login(self) -> None: + body = self._build_login_body() + response = await self._http.post( + INDIA_TAP_LOGIN_URL, + content=body, + headers={ + "User-Agent": INDIA_USER_AGENT, + "Content-Type": "text/plain", + "Accept": "*/*", + "Accept-Language": "en-US;q=1", + "APP-SIGNATURE": india_tap_signature(body), + "SIGNATURE": "1", + }, + ) + response.raise_for_status() + dispatcher, app = decode_login_response(response.text) + if not app: + raise SaicIndiaApiException("India login did not return a token payload") + self._token = decode_login_token(app) + user_id = read_fixed_7bit_string(dispatcher, bit_offset=300, char_count=14) + self._user_id = user_id.rjust(50, "0") + + async def vehicle_list(self) -> list[IndiaVinInfo]: + await self._ensure_login() + payload = await self.gateway_get("/vehicle/userVinList") + vin_list = payload.get("data", {}).get("vinList", []) + return [parse_india_vehicle(item) for item in vin_list if isinstance(item, dict)] + + async def get_vehicle_status(self, vin: str) -> IndiaVehicleStatus: + await self._ensure_login() + event_id = 0 + for _ in range(6): + dispatcher, status = await self._status_request(vin, event_id) + result = dispatcher.get("result", 0) + if status is not None: + return parse_india_vehicle_status(status) + if result not in (0, 4, 6): + message = dispatcher.get("errorMessage") + if isinstance(message, bytes): + message = message.decode(errors="replace") + raise SaicIndiaApiException(message or f"India TAP status error {result}", result) + event_id = dispatcher.get("eventID", event_id) + raise SaicIndiaApiException("India vehicle status was not ready after polling") + + async def gateway_get(self, path: str, params: dict[str, str] | None = None) -> dict[str, Any]: + await self._ensure_login() + response = await self._gateway_get_raw(path, params or {}) + parsed = json.loads(decrypt_india_gateway_body(response.text, response.headers)) + if parsed.get("code") == 7: + await self.login() + response = await self._gateway_get_raw(path, params or {}) + parsed = json.loads(decrypt_india_gateway_body(response.text, response.headers)) + code = parsed.get("code") + if code != 0: + raise SaicIndiaApiException(parsed.get("message", f"India gateway error {code}"), code) + return parsed + + async def _status_request(self, vin: str, event_id: int) -> tuple[dict[str, Any], dict[str, Any] | None]: + if self._token is None or self._user_id is None: + raise SaicIndiaApiException("Not logged in") + try: + body = encode_status_request(self._user_id, self._token, vin, event_id) + except IndiaTapError as err: + raise SaicIndiaApiException(str(err)) from err + response = await self._http.post( + INDIA_TAP_STATUS_URL, + content=body, + headers=india_tap_headers(body), + ) + response.raise_for_status() + try: + return decode_status_response(response.text) + except IndiaTapError as err: + raise SaicIndiaApiException(str(err)) from err + + async def _gateway_get_raw(self, path: str, params: dict[str, str]) -> httpx.Response: + if self._token is None or self._user_id is None: + raise SaicIndiaApiException("Not logged in") + clean_path = "/" + path.lstrip("/") + query = urlencode(params) + signing_path = clean_path + (f"?{query}" if query else "") + timestamp = str(int(time.time() * 1000)) + content_type = "application/json" + headers = { + "User-Agent": INDIA_USER_AGENT, + "Content-Type": content_type, + "APP-CONTENT-ENCRYPTED": "1", + "APP-LANGUAGE-TYPE": "en-us", + "APP-LOGIN-TOKEN": self._token, + "APP-USER-ID": self._user_id, + "APP-SEND-DATE": timestamp, + "APP-VERIFICATION-STRING": india_gateway_signature(signing_path, timestamp, content_type), + "ORIGINAL-CONTENT-TYPE": content_type, + } + response = await self._http.get( + f"{INDIA_GATEWAY_BASE_URL}{clean_path}", params=params, headers=headers + ) + response.raise_for_status() + return response + + async def _ensure_login(self) -> None: + if self._token is None or self._user_id is None: + await self.login() + + def _build_login_body(self) -> str: + dispatcher = bytearray.fromhex(LOGIN_DISPATCHER_TEMPLATE_HEX) + app = encode_india_login_app(self._password, self._device_id) + set_fixed_7bit_string(dispatcher, 48, self._phone.rjust(50, "0")) + set_bits(dispatcher, 419, 32, int(time.time())) + dispatcher[-7:-3] = (len(app) * 2).to_bytes(4, "big") + dispatcher[-3] = 1 + dispatcher[-2:] = (160).to_bytes(2, "big") + payload = bytes(dispatcher) + app + raw_without_prefix = "1" + payload.hex().upper() + return f"{len(raw_without_prefix) + 4:04X}{raw_without_prefix}" + + +def normalize_india_phone(phone: str) -> str: + digits = re.sub(r"\D+", "", phone) + if len(digits) > 10: + digits = digits[-10:] + if len(digits) != 10: + raise SaicIndiaApiException("Phone must contain a 10-digit India mobile number") + return digits + + +def make_india_device_id(phone: str) -> str: + seed = hashlib.sha256(f"mg-ismart-india:{phone}".encode()).hexdigest() + return (f"saic-ng-india-{seed}" + "0" * 120)[:103] + + +def encode_india_login_app(password: str, device_id: str) -> bytes: + writer = BitWriter() + writer.write_bits(1, 1) + writer.write_7bit_string(password, 6, 30) + writer.write_7bit_string(device_id, 1, 200) + return writer.to_bytes() + + +def decode_login_response(raw: str) -> tuple[bytes, bytes]: + if len(raw) < 5 or raw[4] != "1": + raise SaicIndiaApiException("Unexpected India TAP login response framing") + payload = bytes.fromhex(raw[5:]) + if len(payload) < 4: + raise SaicIndiaApiException("India TAP login response is too short") + dispatcher_len = payload[2] + (payload[3] << 8) + return payload[:dispatcher_len], payload[dispatcher_len:] + + +def decode_login_token(app: bytes) -> str: + reader = BitReader(app) + reader.read_bits(6) + token = reader.read_7bit_string(40, 40) + refresh = reader.read_7bit_string(40, 40) + if token != refresh: + raise SaicIndiaApiException("India token and refresh token differ") + return token + + +def india_tap_signature(body: str) -> str: + key_material = body[1 : len(body) // 2] + hmac_key = hashlib.md5(key_material.encode()).hexdigest() # noqa: S324 + return hmac.new(hmac_key.encode(), body.encode(), hashlib.sha256).hexdigest() + + +def india_tap_headers(body: str) -> dict[str, str]: + return { + "User-Agent": INDIA_USER_AGENT, + "Content-Type": "text/plain", + "Accept": "*/*", + "APP-SIGNATURE": india_tap_signature(body), + "SIGNATURE": "1", + } + + +def india_gateway_signature(signing_path: str, current_ts: str, content_type: str) -> str: + key_part_one = md5_hex_digest(signing_path) + encrypt_key = md5_hex_digest(key_part_one + current_ts + "1" + content_type) + hmac_value = signing_path + current_ts + "1" + content_type + hmac_key = md5_hex_digest(encrypt_key + current_ts) + return hmac.new(hmac_key.encode(), hmac_value.encode(), hashlib.sha256).hexdigest() + + +def decrypt_india_gateway_body(encrypted: str, headers: httpx.Headers) -> str: + key = md5_hex_digest(headers["APP-SEND-DATE"] + "1" + headers["ORIGINAL-CONTENT-TYPE"]) + iv = md5_hex_digest(headers["APP-SEND-DATE"]) + cipher = AES.new(unhexlify(key), AES.MODE_CBC, unhexlify(iv)) + return unpad(cipher.decrypt(unhexlify(encrypted)), AES.block_size).decode("utf-8") + + +def md5_hex_digest(content: str) -> str: + return hashlib.md5(content.encode()).hexdigest() # noqa: S324 + + +def parse_india_vehicle(raw: dict[str, Any]) -> IndiaVinInfo: + return IndiaVinInfo( + vin=raw["vin"], + brand_name=raw.get("brandName"), + model_name=raw.get("modelName"), + model_year=raw.get("modelYear"), + series=raw.get("series"), + is_active=raw.get("isActivate"), + raw=raw, + ) + + +def parse_india_vehicle_status(raw: dict[str, Any]) -> IndiaVehicleStatus: + basic = raw.get("basicVehicleStatus", {}) + return IndiaVehicleStatus( + status_time=positive_int(raw.get("statusTime")), + last_vehicle_activity=positive_int(basic.get("timeOfLastCANBUSActivity")), + battery_percent=bounded_int(basic.get("fuelLevelPrc"), 0, 100), + range_km=tenths(basic.get("fuelRange")), + odometer_km=tenths(basic.get("mileage")), + auxiliary_battery_voltage=tenths(basic.get("batteryVoltage")), + interior_temperature=valid_temperature(basic.get("interiorTemperature")), + exterior_temperature=valid_temperature(basic.get("exteriorTemperature")), + locked=optional_bool(basic.get("lockStatus")), + driver_door_open=optional_bool(basic.get("driverDoor")), + passenger_door_open=optional_bool(basic.get("passengerDoor")), + rear_left_door_open=optional_bool(basic.get("rearLeftDoor")), + rear_right_door_open=optional_bool(basic.get("rearRightDoor")), + boot_open=optional_bool(basic.get("bootStatus")), + bonnet_open=optional_bool(basic.get("bonnetStatus")), + driver_window_open=optional_bool(basic.get("driverWindow")), + passenger_window_open=optional_bool(basic.get("passengerWindow")), + rear_left_window_open=optional_bool(basic.get("rearLeftWindow")), + rear_right_window_open=optional_bool(basic.get("rearRightWindow")), + climate_running=(basic.get("remoteClimateStatus") in (2, 3)) + if basic.get("remoteClimateStatus") is not None + else None, + sunroof_open=optional_bool(basic.get("sunroofStatus")), + can_bus_active=optional_bool(basic.get("canBusActive")), + ) + + +def positive_int(value: Any) -> int | None: + return value if isinstance(value, int) and value > 0 else None + + +def bounded_int(value: Any, minimum: int, maximum: int) -> int | None: + return value if isinstance(value, int) and minimum <= value <= maximum else None + + +def tenths(value: Any) -> float | None: + return round(value / 10, 1) if isinstance(value, int) else None + + +def valid_temperature(value: Any) -> int | None: + return value if isinstance(value, int) and value != -128 else None + + +def optional_bool(value: Any) -> bool | None: + return value if isinstance(value, bool) else None diff --git a/src/saic_ismart_client_ng/india/bitcodec.py b/src/saic_ismart_client_ng/india/bitcodec.py new file mode 100644 index 0000000..c7f8b33 --- /dev/null +++ b/src/saic_ismart_client_ng/india/bitcodec.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import math + + +class BitReader: + def __init__(self, data: bytes | bytearray) -> None: + self._data = data + self._offset = 0 + + @property + def bit_offset(self) -> int: + return self._offset + + def read_bits(self, bit_count: int) -> int: + value = 0 + for _ in range(bit_count): + byte_index = self._offset // 8 + bit_index = 7 - (self._offset % 8) + value = (value << 1) | ((self._data[byte_index] >> bit_index) & 1) + self._offset += 1 + return value + + def read_7bit_string(self, minimum: int, maximum: int) -> str: + length = self.read_constrained_length(minimum, maximum) + return "".join(chr(self.read_bits(7)) for _ in range(length)) + + def read_constrained_length(self, minimum: int, maximum: int) -> int: + span = maximum - minimum + if span == 0: + return minimum + return minimum + self.read_bits(math.ceil(math.log2(span + 1))) + + +class BitWriter: + def __init__(self) -> None: + self._bits: list[int] = [] + + def write_bits(self, value: int, bit_count: int) -> None: + if value < 0 or value >= (1 << bit_count): + msg = f"value does not fit in {bit_count} bits" + raise ValueError(msg) + for shift in range(bit_count - 1, -1, -1): + self._bits.append((value >> shift) & 1) + + def write_7bit_string(self, value: str, minimum: int, maximum: int) -> None: + if len(value) < minimum or len(value) > maximum: + raise ValueError("string length outside constrained range") + self.write_constrained_length(len(value), minimum, maximum) + for char in value: + code = ord(char) + if code > 0x7F: + raise ValueError("only 7-bit IA5 strings are supported") + self.write_bits(code, 7) + + def write_constrained_length(self, value: int, minimum: int, maximum: int) -> None: + if value < minimum or value > maximum: + raise ValueError("length outside constrained range") + span = maximum - minimum + if span: + self.write_bits(value - minimum, math.ceil(math.log2(span + 1))) + + def to_bytes(self) -> bytes: + padding = (-len(self._bits)) % 8 + bits = [*self._bits, *([0] * padding)] + output = bytearray() + for offset in range(0, len(bits), 8): + byte = 0 + for bit in bits[offset : offset + 8]: + byte = (byte << 1) | bit + output.append(byte) + return bytes(output) + + +def read_fixed_7bit_string(data: bytes | bytearray, *, bit_offset: int, char_count: int) -> str: + reader = BitReader(data) + reader._offset = bit_offset # noqa: SLF001 + return "".join(chr(reader.read_bits(7)) for _ in range(char_count)) + + +def set_bits(data: bytearray, bit_offset: int, bit_count: int, value: int) -> None: + if value < 0 or value >= (1 << bit_count): + msg = f"value does not fit in {bit_count} bits" + raise ValueError(msg) + for index in range(bit_count): + bit = (value >> (bit_count - index - 1)) & 1 + absolute = bit_offset + index + byte_index = absolute // 8 + bit_index = 7 - (absolute % 8) + if bit: + data[byte_index] |= 1 << bit_index + else: + data[byte_index] &= ~(1 << bit_index) + + +def set_fixed_7bit_string(data: bytearray, bit_offset: int, value: str) -> None: + for index, char in enumerate(value): + code = ord(char) + if code > 0x7F: + raise ValueError("only 7-bit IA5 strings are supported") + set_bits(data, bit_offset + index * 7, 7, code) diff --git a/src/saic_ismart_client_ng/india/tap_codec.py b/src/saic_ismart_client_ng/india/tap_codec.py new file mode 100644 index 0000000..12802f6 --- /dev/null +++ b/src/saic_ismart_client_ng/india/tap_codec.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +from functools import lru_cache +import time +from typing import Any + +import asn1tools + +INDIA_TAP_ASN1 = "MGIndiaTapModule\n\nDEFINITIONS AUTOMATIC TAGS ::= BEGIN\n\nMPDispatcherBody ::= SEQUENCE\n{\n uid IA5String(SIZE(50)) OPTIONAL,\n token IA5String(SIZE(40)) OPTIONAL,\n applicationID IA5String(SIZE(3)),\n vin IA5String(SIZE(17)) OPTIONAL,\n messageID INTEGER(0..255),\n eventCreationTime INTEGER(0..2147483647),\n eventID INTEGER(0..2147483647) OPTIONAL,\n ulMessageCounter INTEGER(0..65535) OPTIONAL,\n dlMessageCounter INTEGER(0..65535) OPTIONAL,\n ackMessageCounter INTEGER(0..65535) OPTIONAL,\n ackRequired BOOLEAN OPTIONAL,\n applicationDataLength INTEGER(0..65535) OPTIONAL,\n applicationDataEncoding DataEncodingType OPTIONAL,\n applicationDataProtocolVersion INTEGER(0..65535) OPTIONAL,\n testFlag INTEGER(1..3) OPTIONAL,\n result INTEGER(0..65535) OPTIONAL,\n errorMessage OCTET STRING(SIZE(1..1024)) OPTIONAL\n}\n\nDataEncodingType ::= ENUMERATED { perUnaligned(0), der(1), ber(2) }\n\nOTARVMVehicleStatusReq ::= SEQUENCE { vehStatusReqType INTEGER(0..255) }\n\nOTARVMVehicleStatusResp513 ::= SEQUENCE\n{\n statusTime INTEGER(0..2147483647),\n gpsPosition RvsPosition,\n basicVehicleStatus RvsBasicStatus513,\n extendedVehicleStatus RvsExtStatus OPTIONAL\n}\n\nRvsPosition ::= SEQUENCE\n{\n wayPoint RvsWayPoint,\n timestamp4Short Timestamp4Short,\n gpsStatus GPSStatus\n}\n\nRvsWayPoint ::= SEQUENCE\n{\n position RvsWGS84Point,\n heading INTEGER(0..359),\n speed INTEGER(-1000..4500),\n hdop INTEGER(0..1000),\n satellites INTEGER(0..16)\n}\n\nRvsWGS84Point ::= SEQUENCE\n{\n latitude INTEGER(-90000000..90000000),\n longitude INTEGER(-180000000..180000000),\n altitude INTEGER(-100..8900)\n}\n\nTimestamp4Short ::= SEQUENCE { seconds INTEGER(0..2147483647) }\nGPSStatus ::= ENUMERATED { noGpsSignal(0), timeFix(1), fix2D(2), fix3D(3) }\n\nRvsBasicStatus513 ::= SEQUENCE\n{\n driverDoor BOOLEAN,\n passengerDoor BOOLEAN,\n rearLeftDoor BOOLEAN,\n rearRightDoor BOOLEAN,\n bootStatus BOOLEAN,\n bonnetStatus BOOLEAN,\n lockStatus BOOLEAN,\n driverWindow BOOLEAN OPTIONAL,\n passengerWindow BOOLEAN OPTIONAL,\n rearLeftWindow BOOLEAN OPTIONAL,\n rearRightWindow BOOLEAN OPTIONAL,\n sunroofStatus BOOLEAN OPTIONAL,\n frontRrightTyrePressure INTEGER(0..255) OPTIONAL,\n frontLeftTyrePressure INTEGER(0..255) OPTIONAL,\n rearRightTyrePressure INTEGER(0..255) OPTIONAL,\n rearLeftTyrePressure INTEGER(0..255) OPTIONAL,\n wheelTyreMonitorStatus INTEGER(0..255) OPTIONAL,\n sideLightStatus BOOLEAN,\n dippedBeamStatus BOOLEAN,\n mainBeamStatus BOOLEAN,\n vehicleAlarmStatus INTEGER(0..255) OPTIONAL,\n engineStatus INTEGER(0..255),\n powerMode INTEGER(0..255),\n lastKeySeen INTEGER(0..65535),\n currentJourneyDistance INTEGER(0..65535),\n currentJourneyID INTEGER(0..2147483647),\n interiorTemperature INTEGER(-128..127),\n exteriorTemperature INTEGER(-128..127),\n fuelLevelPrc INTEGER(0..255),\n fuelRange INTEGER(0..65535),\n remoteClimateStatus INTEGER(0..255),\n frontLeftSeatHeatLevel INTEGER(0..255) OPTIONAL,\n frontRightSeatHeatLevel INTEGER(0..255) OPTIONAL,\n canBusActive BOOLEAN,\n timeOfLastCANBUSActivity INTEGER(0..2147483647),\n clstrDspdFuelLvlSgmt INTEGER(0..255),\n mileage INTEGER(0..2147483647),\n batteryVoltage INTEGER(0..65535),\n extendedData1 INTEGER(0..2147483647) OPTIONAL,\n extendedData2 INTEGER(0..2147483647) OPTIONAL,\n handBrake BOOLEAN\n}\n\nRvsExtStatus ::= SEQUENCE { vehicleAlerts SEQUENCE SIZE(0..64) OF VehicleAlertInfo }\nVehicleAlertInfo ::= SEQUENCE { id INTEGER(0..255), value INTEGER(0..255) }\n\nEND\n" + +TAP_PROTOCOL_VERSION = 33 +TAP_RESERVED_SIZE = 16 +STATUS_APPLICATION_ID = "511" +STATUS_APPLICATION_PROTOCOL = 513 + + +class IndiaTapError(ValueError): + pass + + +@lru_cache(maxsize=1) +def _codec() -> Any: + return asn1tools.compile_string(INDIA_TAP_ASN1, "uper") + + +def encode_status_request(uid: str, token: str, vin: str, event_id: int) -> str: + app = _codec().encode("OTARVMVehicleStatusReq", {"vehStatusReqType": 2}) + body = _codec().encode( + "MPDispatcherBody", + { + "uid": uid, + "token": token, + "applicationID": STATUS_APPLICATION_ID, + "vin": vin, + "messageID": 1, + "eventCreationTime": int(time.time()), + "eventID": event_id, + "ulMessageCounter": 0, + "dlMessageCounter": 0, + "ackMessageCounter": 0, + "ackRequired": False, + "applicationDataLength": len(app), + "applicationDataEncoding": "perUnaligned", + "applicationDataProtocolVersion": STATUS_APPLICATION_PROTOCOL, + "testFlag": 2, + "result": 0, + }, + ) + dispatcher_length = len(body) + 3 + if dispatcher_length > 255: + raise IndiaTapError("TAP dispatcher is too large") + payload = ( + bytes((TAP_PROTOCOL_VERSION, dispatcher_length, 0)) + + bytes(TAP_RESERVED_SIZE) + + body + + app + ) + return "1" + f"{len(payload) + 3:04X}" + payload.hex().upper() + + +def decode_status_response(raw: str) -> tuple[dict[str, Any], dict[str, Any] | None]: + dispatcher, app = _decode_v21_response(raw) + if app is None: + return dispatcher, None + try: + status = _codec().decode("OTARVMVehicleStatusResp513", app) + except Exception as err: + raise IndiaTapError("Unable to decode India TAP vehicle status") from err + return dispatcher, status + + +def _decode_v21_response(raw: str) -> tuple[dict[str, Any], bytes | None]: + if len(raw) < 5 or raw[0] != "1": + raise IndiaTapError("Unexpected India TAP v2.1 response framing") + try: + payload = bytes.fromhex(raw[5:]) + except ValueError as err: + raise IndiaTapError("India TAP v2.1 response is not hexadecimal") from err + if len(payload) < 19: + raise IndiaTapError("India TAP v2.1 response is too short") + dispatcher_length = payload[1] + dispatcher_end = TAP_RESERVED_SIZE + dispatcher_length + if dispatcher_length < 3 or dispatcher_end > len(payload): + raise IndiaTapError("Invalid India TAP v2.1 dispatcher length") + try: + dispatcher = _codec().decode("MPDispatcherBody", payload[19:dispatcher_end]) + except Exception as err: + raise IndiaTapError("Unable to decode India TAP v2.1 dispatcher") from err + app_length = dispatcher.get("applicationDataLength", 0) + if not app_length: + return dispatcher, None + app = payload[dispatcher_end : dispatcher_end + app_length] + if len(app) != app_length: + raise IndiaTapError("Truncated India TAP v2.1 application data") + return dispatcher, app diff --git a/tests/test_india_api.py b/tests/test_india_api.py new file mode 100644 index 0000000..9979cc2 --- /dev/null +++ b/tests/test_india_api.py @@ -0,0 +1,190 @@ +from __future__ import annotations + +import hashlib +import hmac + +from saic_ismart_client_ng.india import ( + INDIA_GATEWAY_BASE_URL, + INDIA_TAP_LOGIN_URL, + INDIA_TAP_STATUS_URL, + SaicIndiaApi, + decode_login_token, + india_gateway_signature, + india_tap_signature, + make_india_device_id, + normalize_india_phone, + parse_india_vehicle, + parse_india_vehicle_status, +) +from saic_ismart_client_ng.india.bitcodec import BitWriter +from saic_ismart_client_ng.india.tap_codec import ( + TAP_RESERVED_SIZE, + _codec, + decode_status_response, + encode_status_request, +) + + +def test_india_urls_match_mg_india_hosts() -> None: + assert INDIA_TAP_LOGIN_URL == "https://iov-tap.mgindia.co.in/TAP.Web/ota.mp" + assert INDIA_TAP_STATUS_URL == "https://iov-tap.mgindia.co.in/TAP.Web/ota.mpv21" + assert INDIA_GATEWAY_BASE_URL == "https://iov-gateway.mgindia.co.in/api.app/v1" + + +def test_normalize_india_phone_keeps_last_ten_digits() -> None: + assert normalize_india_phone("+91 98765 43210") == "9876543210" + + +def test_india_device_id_is_stable_and_bounded() -> None: + first = make_india_device_id("9876543210") + second = make_india_device_id("9876543210") + assert first == second + assert first.startswith("saic-ng-india-") + assert len(first) == 103 + + +def test_tap_signature_uses_body_derived_md5_hmac_key() -> None: + body = "0123456789ABCDEF" + key_material = body[1 : len(body) // 2] + hmac_key = hashlib.md5(key_material.encode()).hexdigest() # noqa: S324 + expected = hmac.new(hmac_key.encode(), body.encode(), hashlib.sha256).hexdigest() + assert india_tap_signature(body) == expected + + +def test_gateway_signature_matches_india_formula() -> None: + signing_path = "/vehicle/userVinList" + timestamp = "1700000000000" + content_type = "application/json" + key_part_one = hashlib.md5(signing_path.encode()).hexdigest() # noqa: S324 + encrypt_key = hashlib.md5( # noqa: S324 + (key_part_one + timestamp + "1" + content_type).encode() + ).hexdigest() + hmac_key = hashlib.md5((encrypt_key + timestamp).encode()).hexdigest() # noqa: S324 + expected = hmac.new( + hmac_key.encode(), + (signing_path + timestamp + "1" + content_type).encode(), + hashlib.sha256, + ).hexdigest() + assert india_gateway_signature(signing_path, timestamp, content_type) == expected + + +def test_build_login_body_contains_framing_and_signature() -> None: + client = SaicIndiaApi("9876543210", "secret1") + body = client._build_login_body() + assert body[4] == "1" + assert len(body) == int(body[:4], 16) + assert len(india_tap_signature(body)) == 64 + + +def test_decode_login_token_requires_matching_refresh_token() -> None: + writer = BitWriter() + writer.write_bits(0, 6) + writer.write_7bit_string("A" * 40, 40, 40) + writer.write_7bit_string("A" * 40, 40, 40) + assert decode_login_token(writer.to_bytes()) == "A" * 40 + + +def test_encode_status_request_uses_protocol_513() -> None: + raw = encode_status_request("1" * 50, "2" * 40, "3" * 17, 42) + payload = bytes.fromhex(raw[5:]) + dispatcher_end = TAP_RESERVED_SIZE + payload[1] + dispatcher = _codec().decode("MPDispatcherBody", payload[19:dispatcher_end]) + app = _codec().decode("OTARVMVehicleStatusReq", payload[dispatcher_end:]) + + assert raw.startswith("1") + assert dispatcher["applicationID"] == "511" + assert dispatcher["applicationDataProtocolVersion"] == 513 + assert dispatcher["eventID"] == 42 + assert app == {"vehStatusReqType": 2} + + +def test_decode_and_parse_india_status_response() -> None: + status = { + "statusTime": 1_700_000_000, + "gpsPosition": { + "wayPoint": { + "position": {"latitude": 0, "longitude": 0, "altitude": 0}, + "heading": 0, + "speed": 0, + "hdop": 0, + "satellites": 0, + }, + "timestamp4Short": {"seconds": 1_700_000_000}, + "gpsStatus": "noGpsSignal", + }, + "basicVehicleStatus": { + "driverDoor": False, + "passengerDoor": False, + "rearLeftDoor": False, + "rearRightDoor": False, + "bootStatus": False, + "bonnetStatus": False, + "lockStatus": True, + "driverWindow": False, + "passengerWindow": False, + "rearLeftWindow": False, + "rearRightWindow": False, + "sideLightStatus": False, + "dippedBeamStatus": False, + "mainBeamStatus": False, + "engineStatus": 0, + "powerMode": 0, + "lastKeySeen": 0, + "currentJourneyDistance": 0, + "currentJourneyID": 1, + "interiorTemperature": 30, + "exteriorTemperature": 26, + "fuelLevelPrc": 94, + "fuelRange": 4040, + "remoteClimateStatus": 3, + "canBusActive": False, + "timeOfLastCANBUSActivity": 1_699_999_999, + "clstrDspdFuelLvlSgmt": 0, + "mileage": 49353, + "batteryVoltage": 140, + "handBrake": False, + }, + } + app = _codec().encode("OTARVMVehicleStatusResp513", status) + dispatcher = _codec().encode( + "MPDispatcherBody", + { + "applicationID": "511", + "messageID": 1, + "eventCreationTime": 1_700_000_000, + "eventID": 42, + "applicationDataLength": len(app), + "applicationDataEncoding": "perUnaligned", + "applicationDataProtocolVersion": 513, + "result": 0, + }, + ) + payload = bytes((33, len(dispatcher) + 3, 0)) + bytes(16) + dispatcher + app + raw = "1" + f"{len(payload) + 3:04X}" + payload.hex().upper() + + decoded_dispatcher, decoded_status = decode_status_response(raw) + parsed = parse_india_vehicle_status(decoded_status or {}) + + assert decoded_dispatcher["eventID"] == 42 + assert parsed.battery_percent == 94 + assert parsed.range_km == 404.0 + assert parsed.odometer_km == 4935.3 + assert parsed.auxiliary_battery_voltage == 14.0 + assert parsed.locked is True + assert parsed.climate_running is True + + +def test_parse_india_vehicle() -> None: + vehicle = parse_india_vehicle( + { + "vin": "VIN123", + "brandName": "MG", + "modelName": "Astor", + "modelYear": "2025", + "series": "Astor", + "isActivate": True, + } + ) + assert vehicle.vin == "VIN123" + assert vehicle.model_name == "Astor" + assert vehicle.is_active is True