diff --git a/spp_dci_client/README.rst b/spp_dci_client/README.rst
index e3ecc9a56..c27197ec1 100644
--- a/spp_dci_client/README.rst
+++ b/spp_dci_client/README.rst
@@ -140,6 +140,17 @@ Dependencies
Changelog
=========
+19.0.2.0.2
+~~~~~~~~~~
+
+- fix(security): make the OAuth2 token/header methods private so they
+ are no longer callable over RPC — a low-privilege internal user can no
+ longer mint a DCI access token or obtain a Bearer header via
+ ``get_oauth2_token()`` / ``get_headers()``. Restrict the token cache
+ fields (``_oauth2_access_token`` / ``_oauth2_token_expires_at``) to
+ system administrators, and require write access to run a connection
+ test.
+
19.0.2.0.0
~~~~~~~~~~
diff --git a/spp_dci_client/__manifest__.py b/spp_dci_client/__manifest__.py
index 457a40662..84f9afa46 100644
--- a/spp_dci_client/__manifest__.py
+++ b/spp_dci_client/__manifest__.py
@@ -2,7 +2,7 @@
{
"name": "OpenSPP DCI Client",
"summary": "Base DCI client infrastructure with OAuth2 and data source management",
- "version": "19.0.2.0.1",
+ "version": "19.0.2.0.2",
"category": "OpenSPP/Integration",
"author": "OpenSPP.org",
"website": "https://github.com/OpenSPP/OpenSPP2",
diff --git a/spp_dci_client/models/data_source.py b/spp_dci_client/models/data_source.py
index 71ff54fb5..f5a2bfe17 100644
--- a/spp_dci_client/models/data_source.py
+++ b/spp_dci_client/models/data_source.py
@@ -176,13 +176,17 @@ class DCIDataSource(models.Model):
help="Last connection error message",
)
- # OAuth2 token cache fields (transient storage)
+ # OAuth2 token cache fields (transient storage). Restricted to system
+ # administrators: the cached access token is a credential and must not be
+ # readable by ordinary internal users (who have read on this model).
_oauth2_access_token = fields.Char(
string="Cached Access Token",
+ groups="base.group_system",
help="Cached OAuth2 access token (internal use only)",
)
_oauth2_token_expires_at = fields.Datetime(
string="Token Expiry",
+ groups="base.group_system",
help="Cached token expiration timestamp (internal use only)",
)
@@ -306,14 +310,18 @@ def _check_sender_id(self):
if record.auth_type != "none" and not record.our_sender_id:
raise ValidationError(_("Sender ID is required for authenticated connections."))
- def clear_oauth2_token_cache(self):
+ def _clear_oauth2_token_cache(self):
"""Clear cached OAuth2 token, forcing a fresh token request on next use.
- This can be useful when the cached token becomes invalid or when
- troubleshooting authentication issues.
+ Internal (underscore-prefixed) so it is NOT callable over RPC — otherwise
+ a low-privilege user could force repeated re-minting. It is invoked from
+ trusted server-side code (e.g. DCIClient on a 401 retry). The cache fields
+ are admin-restricted, so write via sudo: clearing the cache must work
+ regardless of the current user's privilege.
"""
self.ensure_one()
- self.write(
+ # nosemgrep: odoo-sudo-without-context
+ self.sudo().write(
{
"_oauth2_access_token": False,
"_oauth2_token_expires_at": False,
@@ -321,9 +329,13 @@ def clear_oauth2_token_cache(self):
)
_logger.info("Cleared OAuth2 token cache for data source: %s", self.code)
- def get_oauth2_token(self, force_refresh=False):
+ def _get_oauth2_token(self, force_refresh=False):
"""Get or refresh OAuth2 access token.
+ Internal (underscore-prefixed) so it is NOT callable over RPC: it mints a
+ token from the administrator-only OAuth2 client secret, so it must only be
+ reachable from trusted server-side code (e.g. the DCIClient service).
+
Args:
force_refresh: If True, skip cache and fetch a new token
@@ -338,24 +350,25 @@ def get_oauth2_token(self, force_refresh=False):
if self.auth_type != "oauth2":
raise UserError(_("This data source does not use OAuth2 authentication."))
+ # sudo(): the OAuth2 client secret and the token cache fields are
+ # restricted to administrators. This method is internal and only reached
+ # from trusted server-side code, so reading/writing them via sudo is safe.
+ sudo_self = self.sudo() # nosemgrep: odoo-sudo-without-context
+
# Check if cached token is still valid (with 60 second buffer)
now = fields.Datetime.now()
- if not force_refresh and self._oauth2_access_token and self._oauth2_token_expires_at:
- expiry_with_buffer = self._oauth2_token_expires_at - timedelta(seconds=60)
+ if not force_refresh and sudo_self._oauth2_access_token and sudo_self._oauth2_token_expires_at:
+ expiry_with_buffer = sudo_self._oauth2_token_expires_at - timedelta(seconds=60)
if now < expiry_with_buffer:
- _logger.info(
- "Using cached OAuth2 token for data source: %s (expires at %s)",
- self.code,
- self._oauth2_token_expires_at,
- )
- return self._oauth2_access_token
+ # Do not log the token expiry field (it is a credential-adjacent
+ # cache field); log only the data source code.
+ _logger.info("Using cached OAuth2 token for data source: %s", self.code)
+ return sudo_self._oauth2_access_token
# Request new token
_logger.info("Requesting new OAuth2 token for data source: %s", self.code)
try:
- # Use sudo() to access OAuth2 credentials which are restricted to administrators
- sudo_self = self.sudo() # nosemgrep: odoo-sudo-without-context
token_data = {
"grant_type": "client_credentials",
"client_id": sudo_self.oauth2_client_id,
@@ -446,9 +459,13 @@ def get_oauth2_token(self, force_refresh=False):
# Show generic user-friendly message
raise UserError(_("An unexpected error occurred. Please contact your administrator.")) from e
- def get_headers(self, force_refresh_token=False):
+ def _get_headers(self, force_refresh_token=False):
"""Get HTTP headers for API requests including authentication.
+ Internal (underscore-prefixed) so it is NOT callable over RPC: it returns
+ an Authorization header carrying credentials minted from administrator-only
+ fields. Call it only from trusted server-side code (e.g. DCIClient).
+
Args:
force_refresh_token: If True, force refresh OAuth2 token (skip cache)
@@ -466,7 +483,7 @@ def get_headers(self, force_refresh_token=False):
}
_logger.debug(
- "get_headers() called for data source %s, auth_type=%s, force_refresh=%s",
+ "_get_headers() called for data source %s, auth_type=%s, force_refresh=%s",
self.code,
self.auth_type,
force_refresh_token,
@@ -474,7 +491,7 @@ def get_headers(self, force_refresh_token=False):
if self.auth_type == "oauth2":
_logger.info("Fetching OAuth2 token for data source %s", self.code)
- token = self.get_oauth2_token(force_refresh=force_refresh_token)
+ token = self._get_oauth2_token(force_refresh=force_refresh_token)
headers["Authorization"] = f"Bearer {token}"
_logger.info(
"Added OAuth2 Authorization header for data source %s (token length: %d)",
@@ -482,9 +499,14 @@ def get_headers(self, force_refresh_token=False):
len(token) if token else 0,
)
elif self.auth_type == "bearer":
- if not self.bearer_token:
+ # bearer_token is admin-restricted (groups=base.group_system); read it
+ # via sudo so a non-admin internal caller (this method is not
+ # RPC-exposed) can build the header, matching the OAuth2 branch.
+ # nosemgrep: odoo-sudo-without-context
+ bearer_token = self.sudo().bearer_token
+ if not bearer_token:
raise UserError(_("Bearer token is not configured for this data source."))
- headers["Authorization"] = f"Bearer {self.bearer_token}"
+ headers["Authorization"] = f"Bearer {bearer_token}"
return headers
@@ -499,10 +521,15 @@ def test_connection(self):
"""
self.ensure_one()
+ # Testing a connection mints/uses the data source's administrator-only
+ # credentials and makes an outbound call, so require management (write)
+ # access on the record — a read-only user must not trigger it.
+ self.check_access("write")
+
_logger.info("Testing connection to data source: %s (%s)", self.name, self.code)
try:
- headers = self.get_headers()
+ headers = self._get_headers()
# Probe the authenticated ping endpoint. A 200 confirms both
# reachability *and* that our credentials are accepted; a 401/403
diff --git a/spp_dci_client/readme/HISTORY.md b/spp_dci_client/readme/HISTORY.md
index 4aaf9afef..9d07ae7c7 100644
--- a/spp_dci_client/readme/HISTORY.md
+++ b/spp_dci_client/readme/HISTORY.md
@@ -1,3 +1,7 @@
+### 19.0.2.0.2
+
+- fix(security): make the OAuth2 token/header methods private so they are no longer callable over RPC — a low-privilege internal user can no longer mint a DCI access token or obtain a Bearer header via `get_oauth2_token()` / `get_headers()`. Restrict the token cache fields (`_oauth2_access_token` / `_oauth2_token_expires_at`) to system administrators, and require write access to run a connection test.
+
### 19.0.2.0.0
- Initial migration to OpenSPP2
diff --git a/spp_dci_client/services/client.py b/spp_dci_client/services/client.py
index 522f65f4b..e314493a5 100644
--- a/spp_dci_client/services/client.py
+++ b/spp_dci_client/services/client.py
@@ -956,8 +956,10 @@ def _make_request(self, endpoint: str, envelope: dict, _retry_auth: bool = True)
"""
url = f"{self.data_source.base_url.rstrip('/')}{endpoint}"
- # Get headers from data source (includes auth)
- headers = self.data_source.get_headers()
+ # Get headers from data source (includes auth). Internal method — the
+ # DCIClient service is the trusted server-side entry point for outbound
+ # DCI calls (get_headers/get_oauth2_token are not RPC-exposed).
+ headers = self.data_source._get_headers()
# Track timing and result for outgoing log
start_time = time.monotonic()
@@ -999,7 +1001,7 @@ def _make_request(self, endpoint: str, envelope: dict, _retry_auth: bool = True)
log_response_data = response.json()
except json.JSONDecodeError:
log_response_data = None
- self.data_source.clear_oauth2_token_cache()
+ self.data_source._clear_oauth2_token_cache()
return self._make_request(endpoint, envelope, _retry_auth=False)
# Check for HTTP errors
diff --git a/spp_dci_client/static/description/index.html b/spp_dci_client/static/description/index.html
index 11e9ca70c..9c7558b5b 100644
--- a/spp_dci_client/static/description/index.html
+++ b/spp_dci_client/static/description/index.html
@@ -514,6 +514,18 @@
+
19.0.2.0.2
+
+- fix(security): make the OAuth2 token/header methods private so they
+are no longer callable over RPC — a low-privilege internal user can no
+longer mint a DCI access token or obtain a Bearer header via
+get_oauth2_token() / get_headers(). Restrict the token cache
+fields (_oauth2_access_token / _oauth2_token_expires_at) to
+system administrators, and require write access to run a connection
+test.
+
+
+
19.0.2.0.0
- Initial migration to OpenSPP2
diff --git a/spp_dci_client/tests/__init__.py b/spp_dci_client/tests/__init__.py
index abee02b19..a6f1258b5 100644
--- a/spp_dci_client/tests/__init__.py
+++ b/spp_dci_client/tests/__init__.py
@@ -8,3 +8,5 @@
from . import test_data_source_validators
from . import test_data_source_http
from . import test_client_convenience
+
+from . import test_data_source_security
diff --git a/spp_dci_client/tests/test_client_convenience.py b/spp_dci_client/tests/test_client_convenience.py
index af64a25ef..e62dff642 100644
--- a/spp_dci_client/tests/test_client_convenience.py
+++ b/spp_dci_client/tests/test_client_convenience.py
@@ -211,7 +211,7 @@ def setUp(self):
"message": {"q": 1},
}
# Avoid auth HTTP in get_headers
- p = patch.object(type(self.ds), "get_headers", return_value={"Content-Type": "application/json"})
+ p = patch.object(type(self.ds), "_get_headers", return_value={"Content-Type": "application/json"})
p.start()
self.addCleanup(p.stop)
@@ -261,7 +261,7 @@ def test_make_request_oauth_401_retry(self):
resp.raise_for_status.side_effect = httpx.HTTPStatusError(
"401", request=MagicMock(), response=MagicMock(status_code=401, text="unauthorized")
)
- with patch(HTTPX, return_value=_cm(resp)), patch.object(type(oauth_ds), "clear_oauth2_token_cache") as clear:
+ with patch(HTTPX, return_value=_cm(resp)), patch.object(type(oauth_ds), "_clear_oauth2_token_cache") as clear:
with self.assertRaises(UserError):
client._make_request("/sync/search", self.envelope)
clear.assert_called_once()
diff --git a/spp_dci_client/tests/test_data_source.py b/spp_dci_client/tests/test_data_source.py
index 118ea632d..e88e68dc6 100644
--- a/spp_dci_client/tests/test_data_source.py
+++ b/spp_dci_client/tests/test_data_source.py
@@ -476,7 +476,7 @@ def test_get_oauth2_token_success(self, mock_client_class):
mock_client_class.return_value = mock_client
# Get token
- token = ds.get_oauth2_token()
+ token = ds._get_oauth2_token()
self.assertEqual(token, "test_token_12345")
self.assertTrue(ds._oauth2_access_token)
@@ -513,7 +513,7 @@ def test_get_oauth2_token_cached(self, mock_client_class):
)
# Get token - should use cache
- token = ds.get_oauth2_token()
+ token = ds._get_oauth2_token()
self.assertEqual(token, "cached_token")
# HTTP client should not be called
@@ -531,7 +531,7 @@ def test_get_oauth2_token_wrong_auth_type(self):
)
with self.assertRaises(UserError) as cm:
- ds.get_oauth2_token()
+ ds._get_oauth2_token()
self.assertIn("oauth2", str(cm.exception).lower())
@patch("httpx.Client")
@@ -615,7 +615,7 @@ def test_get_headers_none_auth(self):
}
)
- headers = ds.get_headers()
+ headers = ds._get_headers()
self.assertEqual(headers["Content-Type"], "application/json")
self.assertEqual(headers["Accept"], "application/json")
@@ -651,7 +651,7 @@ def test_get_headers_oauth2(self, mock_client_class):
mock_client.__exit__.return_value = None
mock_client_class.return_value = mock_client
- headers = ds.get_headers()
+ headers = ds._get_headers()
self.assertEqual(headers["Authorization"], "Bearer test_token_12345")
diff --git a/spp_dci_client/tests/test_data_source_http.py b/spp_dci_client/tests/test_data_source_http.py
index e93203fb1..3d7e5889c 100644
--- a/spp_dci_client/tests/test_data_source_http.py
+++ b/spp_dci_client/tests/test_data_source_http.py
@@ -56,7 +56,7 @@ def test_get_token_rejects_non_oauth2(self):
}
)
with self.assertRaises(UserError):
- ds.get_oauth2_token()
+ ds._get_oauth2_token()
def test_get_token_uses_valid_cache(self):
ds = self._oauth_ds()
@@ -67,7 +67,7 @@ def test_get_token_uses_valid_cache(self):
}
)
# No HTTP mock needed; cached token returned without a request.
- self.assertEqual(ds.get_oauth2_token(), "cached-tok")
+ self.assertEqual(ds._get_oauth2_token(), "cached-tok")
def test_get_token_fetches_new_via_body(self):
ds = self._oauth_ds()
@@ -75,7 +75,7 @@ def test_get_token_fetches_new_via_body(self):
resp.raise_for_status = MagicMock()
resp.json.return_value = {"access_token": "fresh-tok", "expires_in": 1800}
with patch(HTTPX_CLIENT, return_value=_client_cm(resp)):
- token = ds.get_oauth2_token()
+ token = ds._get_oauth2_token()
self.assertEqual(token, "fresh-tok")
self.assertEqual(ds.sudo()._oauth2_access_token, "fresh-tok")
@@ -86,7 +86,7 @@ def test_get_token_query_credential_location(self):
resp.json.return_value = {"access_token": "q-tok"}
cm = _client_cm(resp)
with patch(HTTPX_CLIENT, return_value=cm):
- self.assertEqual(ds.get_oauth2_token(), "q-tok")
+ self.assertEqual(ds._get_oauth2_token(), "q-tok")
# query mode posts with params=, not data=
client = cm.__enter__.return_value
_, kwargs = client.post.call_args
@@ -99,7 +99,7 @@ def test_get_token_missing_access_token_raises(self):
resp.json.return_value = {"no_token": "here"}
with patch(HTTPX_CLIENT, return_value=_client_cm(resp)):
with self.assertRaises(UserError):
- ds.get_oauth2_token()
+ ds._get_oauth2_token()
def test_get_token_http_status_error(self):
ds = self._oauth_ds()
@@ -108,7 +108,7 @@ def test_get_token_http_status_error(self):
resp.raise_for_status.side_effect = httpx.HTTPStatusError("401", request=MagicMock(), response=err_resp)
with patch(HTTPX_CLIENT, return_value=_client_cm(resp)):
with self.assertRaises(UserError) as ctx:
- ds.get_oauth2_token()
+ ds._get_oauth2_token()
self.assertIn("Authentication failed", str(ctx.exception))
def test_get_token_request_error_timeout(self):
@@ -118,15 +118,15 @@ def test_get_token_request_error_timeout(self):
cm.__exit__.return_value = False
with patch(HTTPX_CLIENT, return_value=cm):
with self.assertRaises(UserError) as ctx:
- ds.get_oauth2_token()
+ ds._get_oauth2_token()
self.assertIn("timed out", str(ctx.exception).lower())
# --- get_headers ---------------------------------------------------------
def test_get_headers_oauth2(self):
ds = self._oauth_ds()
- with patch.object(type(ds), "get_oauth2_token", return_value="abc"):
- headers = ds.get_headers()
+ with patch.object(type(ds), "_get_oauth2_token", return_value="abc"):
+ headers = ds._get_headers()
self.assertEqual(headers["Authorization"], "Bearer abc")
def test_get_headers_bearer(self):
@@ -140,7 +140,7 @@ def test_get_headers_bearer(self):
"bearer_token": "btok",
}
)
- self.assertEqual(ds.get_headers()["Authorization"], "Bearer btok")
+ self.assertEqual(ds._get_headers()["Authorization"], "Bearer btok")
# --- test_connection -----------------------------------------------------
diff --git a/spp_dci_client/tests/test_data_source_security.py b/spp_dci_client/tests/test_data_source_security.py
new file mode 100644
index 000000000..fbae6f889
--- /dev/null
+++ b/spp_dci_client/tests/test_data_source_security.py
@@ -0,0 +1,101 @@
+# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
+"""Security tests: low-privilege users must not be able to mint or read DCI
+OAuth tokens, nor trigger credentialed connection tests.
+"""
+
+from odoo import Command
+from odoo.exceptions import AccessError
+from odoo.tests import TransactionCase, tagged
+
+
+@tagged("post_install", "-at_install")
+class TestDataSourceCredentialAccess(TransactionCase):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.DataSource = cls.env["spp.dci.data.source"]
+ cls.user = cls.env["res.users"].create(
+ {
+ "name": "DCI Low-Priv User",
+ "login": "dci_lowpriv_user",
+ "group_ids": [Command.link(cls.env.ref("base.group_user").id)],
+ }
+ )
+ cls.ds = cls.DataSource.create(
+ {
+ "name": "OAuth DS",
+ "code": "oauth_ds_sec",
+ "base_url": "https://dci.example.org/api",
+ "auth_type": "oauth2",
+ "our_sender_id": "openspp.test",
+ "oauth2_token_url": "https://auth.example.org/token",
+ "oauth2_client_id": "cid",
+ "oauth2_client_secret": "secret",
+ }
+ )
+
+ def test_token_methods_are_private(self):
+ """The credential methods must be private (underscore-prefixed).
+
+ Odoo blocks RPC ``call_kw`` to underscore-prefixed methods (framework
+ guarantee), so making them private removes them as an RPC entry point.
+ This asserts the public names are gone (catching a re-added public alias)
+ and the private ones exist; the RPC-dispatch block itself is a framework
+ property of ``_``-prefixed names.
+ """
+ model = self.env["spp.dci.data.source"]
+ self.assertFalse(hasattr(model, "get_oauth2_token"), "public get_oauth2_token must be removed")
+ self.assertFalse(hasattr(model, "get_headers"), "public get_headers must be removed")
+ self.assertTrue(hasattr(model, "_get_oauth2_token"))
+ self.assertTrue(hasattr(model, "_get_headers"))
+
+ def test_cached_token_field_hidden_from_regular_user(self):
+ """The cached access token is a credential; it must not be visible to an
+ ordinary internal user (who has read on the model)."""
+ fields_for_user = self.ds.with_user(self.user).fields_get()
+ self.assertNotIn("_oauth2_access_token", fields_for_user)
+ self.assertNotIn("_oauth2_token_expires_at", fields_for_user)
+ # An administrator can see them (control).
+ fields_for_admin = self.ds.fields_get()
+ self.assertIn("_oauth2_access_token", fields_for_admin)
+
+ def test_regular_user_cannot_read_cached_token(self):
+ """Even with a token cached, a regular user cannot read it back."""
+ self.ds.sudo().write({"_oauth2_access_token": "super-secret-token"})
+ with self.assertRaises(AccessError):
+ self.ds.with_user(self.user).read(["_oauth2_access_token"])
+
+ def test_test_connection_requires_write_access(self):
+ """test_connection mints/uses admin-only credentials and makes an
+ outbound call; a read-only user must not be able to trigger it."""
+ with self.assertRaises(AccessError):
+ self.ds.with_user(self.user).test_connection()
+
+ def test_action_test_connection_requires_write_access(self):
+ """The public button alias must inherit the same write gate."""
+ with self.assertRaises(AccessError):
+ self.ds.with_user(self.user).action_test_connection()
+
+ def test_regular_user_context_can_clear_token_cache(self):
+ """The internal cache-clear (used on a 401 retry) must work regardless of
+ the current user's privilege — it writes admin-restricted fields via sudo."""
+ self.ds.sudo().write({"_oauth2_access_token": "some-token"})
+ # Called from server-side code running in a non-admin user context.
+ self.ds.with_user(self.user)._clear_oauth2_token_cache()
+ self.assertFalse(self.ds.sudo()._oauth2_access_token)
+
+ def test_regular_user_context_can_get_bearer_headers(self):
+ """A bearer-auth header must build in a non-admin user context: the
+ admin-only bearer_token is read via sudo inside the internal method."""
+ bearer_ds = self.DataSource.create(
+ {
+ "name": "Bearer DS",
+ "code": "bearer_ds_sec",
+ "base_url": "https://dci.example.org/api",
+ "auth_type": "bearer",
+ "our_sender_id": "openspp.test",
+ "bearer_token": "secret-bearer-token",
+ }
+ )
+ headers = bearer_ds.with_user(self.user)._get_headers()
+ self.assertEqual(headers.get("Authorization"), "Bearer secret-bearer-token")
diff --git a/spp_dci_client/tests/test_outgoing_log_integration.py b/spp_dci_client/tests/test_outgoing_log_integration.py
index 7cefec242..17dd66f17 100644
--- a/spp_dci_client/tests/test_outgoing_log_integration.py
+++ b/spp_dci_client/tests/test_outgoing_log_integration.py
@@ -381,7 +381,7 @@ def test_401_retry_creates_two_log_entries(self, mock_client_class):
envelope = self._build_test_envelope(client)
- with patch.object(ds, "clear_oauth2_token_cache"):
+ with patch.object(ds, "_clear_oauth2_token_cache"):
client._make_request("/registry/sync/search", envelope)
# Should have two log entries: one for 401, one for retry success