diff --git a/docs/index.rst b/docs/index.rst index 1c5d79bc..aad61ffc 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -160,6 +160,54 @@ New in MSAL Python 1.26 .. automethod:: __init__ +mTLS Proof-of-Possession (SN/I certificate) +------------------------------------------- + +New in MSAL Python 1.38 + +MSAL Python supports two different kinds of Proof-of-Possession (PoP) tokens: + +* **Signed HTTP Request (SHR) PoP** -- used by public-client apps through the + broker, and configured with the ``auth_scheme`` parameter and + :py:class:`msal.PopAuthScheme` above. +* **mutual-TLS (mTLS) PoP** -- used by *confidential-client* apps. The app's + Subject Name + Issuer (SN/I) certificate is presented as the **client TLS + certificate** in a mutual-TLS handshake to Microsoft Entra, and the returned + access token is cryptographically bound to that certificate + (``token_type == "mtls_pop"``, ``cnf``/``x5t#S256``). + +To request an mTLS-PoP token, configure the confidential client with a +certificate credential and pass ``mtls_proof_of_possession=True`` to +:py:meth:`msal.ConfidentialClientApplication.acquire_token_for_client`:: + + app = msal.ConfidentialClientApplication( + client_id, + authority="https://login.microsoftonline.com/", # MUST be tenanted + client_credential={"private_key_pfx_path": "sni.pfx", "public_certificate": True}, + # azure_region="westus3", # optional; omit for the global mtls endpoint + ) + result = app.acquire_token_for_client( + ["https://graph.microsoft.com/.default"], + mtls_proof_of_possession=True, + ) + # result["token_type"] == "mtls_pop" + # result["binding_certificate"] == {"x5c": [...], "thumbprint_sha256": "..."} + +Notes and requirements: + +* The ``authority`` must be **tenanted** (not ``/common`` or ``/organizations``). +* MSAL must own the TLS transport, so a custom ``http_client`` is not supported + together with ``mtls_proof_of_possession=True``. +* The existing SN/I + Bearer (client-assertion) flow is unchanged; the same + certificate can be used either as an assertion signer (Bearer) or as the TLS + client certificate (mtls_pop). +* mTLS PoP currently targets the public and Azure Government (Arlington) clouds. +* For a Federated Identity Credential (FIC) exchange, configure the leg-2 client + with both a ``client_assertion`` (the leg-1 token) and an + ``mtls_binding_certificate`` sub-key; the leg-1 assertion is then sent as a + ``jwt-pop`` client assertion over the same mTLS connection. + + Exceptions ---------- These are exceptions that MSAL Python may raise. diff --git a/msal/application.py b/msal/application.py index 3fc69461..e3bc7143 100644 --- a/msal/application.py +++ b/msal/application.py @@ -4,6 +4,7 @@ import logging import sys import warnings +import base64 from threading import Lock from typing import Optional # Needed in Python 3.7 & 3.8 from urllib.parse import urlparse @@ -24,6 +25,7 @@ import msal.telemetry from .region import _detect_region, _validate_region from .throttled_http_client import ThrottledHttpClient +from . import mtls from .cloudshell import _is_running_in_cloud_shell from .sku import SKU, __version__ from .oauth2cli.authcode import is_wsl @@ -85,7 +87,7 @@ def _extract_cert_and_thumbprints(cert): # https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object - Requires cryptography 0.7+ sha256_thumbprint = cert.fingerprint(hashes.SHA256()).hex() sha1_thumbprint = cert.fingerprint(hashes.SHA1()).hex() # CodeQL [SM02167] for legacy support such as ADFS - return sha256_thumbprint, sha1_thumbprint, x5c + return sha256_thumbprint, sha1_thumbprint, x5c, cert_pem def _parse_pfx(pfx_path, passphrase_bytes): # Cert concepts https://security.stackexchange.com/a/226758/125264 @@ -96,8 +98,8 @@ def _parse_pfx(pfx_path, passphrase_bytes): f.read(), passphrase_bytes) if not (private_key and cert): raise ValueError("Your PFX file shall contain both private key and cert") - sha256_thumbprint, sha1_thumbprint, x5c = _extract_cert_and_thumbprints(cert) - return private_key, sha256_thumbprint, sha1_thumbprint, x5c + sha256_thumbprint, sha1_thumbprint, x5c, cert_pem = _extract_cert_and_thumbprints(cert) + return private_key, sha256_thumbprint, sha1_thumbprint, x5c, cert_pem def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes): @@ -110,6 +112,76 @@ def _load_private_key_from_pem_str(private_key_pem_str, passphrase_bytes): ) +def _private_key_to_unencrypted_pem(private_key, passphrase_bytes=None): + """Normalize a private key to unencrypted PKCS8 PEM bytes. + + ``private_key`` may be a ``cryptography`` private-key object (as returned by + ``_parse_pfx``) or a PEM string/bytes (the raw ``private_key`` credential). + The result is suitable for ``ssl.SSLContext.load_cert_chain`` via a temp file. + """ + from cryptography.hazmat.primitives import serialization + if isinstance(private_key, (str, bytes)): + key_obj = serialization.load_pem_private_key( + _str2bytes(private_key) if isinstance(private_key, str) else private_key, + passphrase_bytes) + else: # Already a cryptography private-key object + key_obj = private_key + return key_obj.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption()) + + +def _load_mtls_cert_material(cert_credential): + """Load client-cert material for an mTLS PoP handshake from a cert credential. + + ``cert_credential`` is a certificate credential dict - either the app's main + ``client_credential`` (vanilla SN/I) or the ``mtls_binding_certificate`` + sub-dict (FIC leg 2). Returns a dict with the unencrypted-PEM private key, + the leaf cert PEM, ``x5c``, the SHA-256 thumbprint (hex), and ``key_id`` + (base64url ``x5t#S256``, used for cache binding). Raises ``ValueError`` when + the credential cannot yield mTLS-capable certificate material. + """ + passphrase_bytes = _str2bytes( + cert_credential["passphrase"] + ) if cert_credential.get("passphrase") else None + if cert_credential.get("private_key_pfx_path"): + private_key, sha256_thumbprint, _, x5c, cert_pem = _parse_pfx( + cert_credential["private_key_pfx_path"], passphrase_bytes) + elif cert_credential.get("private_key"): + public_cert = cert_credential.get("public_certificate") + if not isinstance(public_cert, str): + raise ValueError( + "mTLS Proof-of-Possession requires the certificate's public part. " + "Provide 'public_certificate' (PEM) alongside 'private_key', " + "or use 'private_key_pfx_path'.") + from cryptography import x509 + cert = x509.load_pem_x509_certificate(_str2bytes(public_cert)) + sha256_thumbprint, _, x5c, cert_pem = _extract_cert_and_thumbprints(cert) + private_key = ( + _load_private_key_from_pem_str( + cert_credential['private_key'], passphrase_bytes) + if passphrase_bytes + else cert_credential['private_key']) + else: + raise ValueError( + "mTLS Proof-of-Possession requires a certificate credential " + "(a 'private_key_pfx_path', or a 'private_key' plus 'public_certificate').") + if not sha256_thumbprint: + raise ValueError( + "mTLS Proof-of-Possession requires a SHA-256 certificate thumbprint.") + key_id = base64.urlsafe_b64encode( + bytes.fromhex(sha256_thumbprint)).rstrip(b"=").decode("ascii") + return { + "private_key_pem": _private_key_to_unencrypted_pem( + private_key, passphrase_bytes), + "cert_pem": cert_pem if isinstance(cert_pem, bytes) else _str2bytes(cert_pem), + "x5c": x5c, + "sha256_thumbprint": sha256_thumbprint, + "key_id": key_id, + } + + def _pii_less_home_account_id(home_account_id): parts = home_account_id.split(".") # It could contain one or two parts parts[0] = "********" @@ -210,6 +282,35 @@ def obtain_token_by_username_password(self, username, password, **kwargs): username, password, headers=headers, **kwargs) +class _MtlsClient(_ClientWithCcsRoutingInfo): + """A confidential-client OAuth2 client whose token request presents a + client certificate over mutual-TLS (mTLS Proof-of-Possession). + + An mtls_pop access token is bound to the certificate, so the cache isolates + it via ``key_id`` (the certificate's base64url ``x5t#S256``). That ``key_id`` + must reach the token cache (for both storage and lookup), but it must NOT be + sent to ESTS on the wire - ESTS derives the binding from the TLS-presented + certificate itself. This override therefore pops ``key_id`` out of the HTTP + request body and re-injects it into the token-obtaining event so the cache + still records it against the certificate. + """ + def obtain_token_for_client(self, scope=None, **kwargs): + data = kwargs.pop("data", {}) + key_id = data.pop("key_id", None) # Keep key_id OFF the wire + if key_id: + outer = kwargs.pop("on_obtaining_tokens", None) + + def _reinject_key_id(event): + # Re-attach key_id to the event's data so token_cache.add() + # binds the stored mtls_pop AT to the certificate. + event["data"] = dict(event.get("data", {}), key_id=key_id) + (outer or self.on_obtaining_tokens)(event) + + kwargs["on_obtaining_tokens"] = _reinject_key_id + return super(_MtlsClient, self).obtain_token_for_client( + scope=scope, data=data, **kwargs) + + def _msal_extension_check(): # Can't run this in module or class level otherwise you'll get circular import error try: @@ -383,6 +484,30 @@ def get_client_assertion(): still supported for backward compatibility but is discouraged because the assertion will eventually expire. + .. admonition:: Binding an assertion to an mTLS certificate (FIC leg 2) + + *Added in version 1.38.0*: + For a Federated Identity Credential (FIC) exchange over mutual-TLS + Proof-of-Possession, the ``client_assertion`` container may also + carry an ``mtls_binding_certificate`` sub-key -- a certificate + credential (same shape as the top-level cert credential) that is + presented as the client TLS certificate during the token request:: + + { + "client_assertion": leg1_result["access_token"], # the leg-1 mtls_pop token + "mtls_binding_certificate": { + "private_key_pfx_path": "/path/to/your.pfx", + "public_certificate": True, + }, + } + + When ``mtls_binding_certificate`` is present, MSAL sends the + assertion with ``client_assertion_type`` set to the ``jwt-pop`` + type and routes the request over mTLS using that binding + certificate. See + :func:`~msal.ConfidentialClientApplication.acquire_token_for_client`'s + ``mtls_proof_of_possession`` parameter for the full two-leg flow. + .. admonition:: Supporting reading client certificates from PFX files This usage will automatically use SHA-256 thumbprint of the certificate. @@ -648,6 +773,14 @@ def get_client_assertion(): 'Invalid exclude_scopes={}. You can not opt out "openid" scope'.format( repr(exclude_scopes))) + # Retained for the mTLS PoP transport, which needs its own dedicated + # requests.Session carrying the client certificate (see msal/mtls.py). + # A user-supplied http_client cannot perform the mTLS handshake, so + # requesting mtls_proof_of_possession=True with one fails fast. + self._http_client_is_custom = bool(http_client) + self._http_client_config = { + "verify": verify, "proxies": proxies, "timeout": timeout} + self._http_cache = http_cache # Reused by the lazily-built mTLS client if http_client: self.http_client = http_client else: @@ -716,6 +849,30 @@ def get_client_assertion(): self._region_detected = None self.client, self._regional_client = self._build_client( client_credential, self.authority) + + # mTLS Proof-of-Possession state. The certificate material and the + # dedicated mTLS client(s) are parsed/built lazily on the first + # mtls_proof_of_possession request (see _get_mtls_pop_cert / + # _get_mtls_client), so Bearer-only cert apps pay nothing and the + # sovereign/tenanted guardrails fire at request time. + self._mtls_pop_cert_material = None # Cached parse of the cert below + self._mtls_client = None # Lazily built global mTLS client + self._mtls_regional_client = None # Lazily built regional mTLS client + self._mtls_lock = Lock() + if isinstance(client_credential, dict) and client_credential.get( + "mtls_binding_certificate"): # FIC leg 2 (assertion + binding cert) + self._mtls_cert_credential = client_credential["mtls_binding_certificate"] + self._mtls_is_fic_leg2 = True + elif isinstance(client_credential, dict) and not client_credential.get( + "client_assertion") and ( + client_credential.get("private_key_pfx_path") + or client_credential.get("private_key")): # Vanilla SN/I cert + self._mtls_cert_credential = client_credential + self._mtls_is_fic_leg2 = False + else: # No certificate available to present over mTLS + self._mtls_cert_credential = None + self._mtls_is_fic_leg2 = False + # Warn if using a static string/bytes client_assertion (discouraged for long-running apps) if isinstance(client_credential, dict) and isinstance( client_credential.get("client_assertion"), (str, bytes)): @@ -801,12 +958,21 @@ def _decorate_scope( return list(decorated) def _build_telemetry_context( - self, api_id, correlation_id=None, refresh_reason=None): + self, api_id, correlation_id=None, refresh_reason=None, + token_type=None): return msal.telemetry._TelemetryContext( self._telemetry_buffer, self._telemetry_lock, api_id, - correlation_id=correlation_id, refresh_reason=refresh_reason) + correlation_id=correlation_id, refresh_reason=refresh_reason, + token_type=token_type) - def _get_regional_authority(self, central_authority) -> Optional[Authority]: + def _compute_region_to_use(self) -> Optional[str]: + """Resolve the effective Azure region (or None), honoring the + ``azure_region`` config, ``MSAL_FORCE_REGION``, and auto-detection. + + Factored out of :func:`_get_regional_authority` so the mTLS PoP client + (which builds a ``{region}.mtlsauth.*`` endpoint) can reuse the exact + same region resolution. + """ if self._region_configured is False: # User opts out of ESTS-R return None # Short circuit to completely bypass region detection if self._region_configured is None: # User did not make an ESTS-R choice @@ -825,6 +991,10 @@ def _get_regional_authority(self, central_authority) -> Optional[Authority]: region_to_use = _validate_region( region_to_use, source="azure_region parameter") logger.debug('Region to be used: {}'.format(repr(region_to_use))) + return region_to_use + + def _get_regional_authority(self, central_authority) -> Optional[Authority]: + region_to_use = self._compute_region_to_use() if region_to_use: regional_host = ("{}.login.microsoft.com".format(region_to_use) if central_authority.instance in ( @@ -843,6 +1013,99 @@ def _get_regional_authority(self, central_authority) -> Optional[Authority]: ) return None + def _default_client_headers(self): + default_headers = { + "x-client-sku": SKU, "x-client-ver": __version__, + "x-client-os": sys.platform, + "x-ms-lib-capability": "retry-after, h429", + } + if self.app_name: + default_headers['x-app-name'] = self.app_name + if self.app_version: + default_headers['x-app-ver'] = self.app_version + return default_headers + + def _get_mtls_pop_cert(self): + """Lazily parse and cache the client certificate material used as the + TLS client certificate for mTLS PoP. Raises ``ValueError`` when no + suitable certificate credential is configured.""" + if self._mtls_cert_credential is None: + raise ValueError( + "mtls_proof_of_possession=True requires this confidential client " + "to be configured with a certificate credential (a " + "'private_key_pfx_path', or a 'private_key' plus " + "'public_certificate'); or, for a Federated Identity Credential " + "exchange, a 'client_assertion' plus an 'mtls_binding_certificate'.") + with self._mtls_lock: + if self._mtls_pop_cert_material is None: + self._mtls_pop_cert_material = _load_mtls_cert_material( + self._mtls_cert_credential) + return self._mtls_pop_cert_material + + def _get_mtls_client(self, central_authority): + """Lazily build (and cache) the mTLS PoP client for ``central_authority``. + + The token endpoint host is transformed ``login.* -> [region.]mtlsauth.*`` + (region honored when configured, global otherwise), and the client + presents the configured certificate over mutual-TLS. For a FIC leg-2 + credential (``client_assertion`` + ``mtls_binding_certificate``) the + assertion is sent with ``client_assertion_type = ...:jwt-pop``. + """ + if self._http_client_is_custom: + raise ValueError( + "mtls_proof_of_possession=True is not supported with a custom " + "http_client, because MSAL must own the TLS transport to present " + "the client certificate in the mutual-TLS handshake. Omit the " + "http_client argument to use MSAL's built-in mTLS transport.") + region = self._compute_region_to_use() + with self._mtls_lock: + cached = self._mtls_regional_client if region else self._mtls_client + if cached is not None: + return cached + cert = self._get_mtls_pop_cert() # May raise ValueError (no cert configured) + token_endpoint = mtls.transform_token_endpoint( # login.* -> mtlsauth.* + central_authority.token_endpoint, central_authority.instance, region) + logger.debug("mTLS PoP token endpoint: %s", token_endpoint) + configuration = { + "authorization_endpoint": central_authority.authorization_endpoint, + "token_endpoint": token_endpoint, + "device_authorization_endpoint": + central_authority.device_authorization_endpoint, + } + http_client = ThrottledHttpClient( + mtls._MtlsHttpClient( + cert["cert_pem"], cert["private_key_pem"], + **self._http_client_config), + http_cache=self._http_cache, + default_throttle_time=5, + ) + if self._mtls_is_fic_leg2: # FIC leg 2: assertion carried as jwt-pop + client_assertion = self.client_credential["client_assertion"] + client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT_POP + else: # Vanilla SN/I: the TLS certificate alone authenticates the client + client_assertion = None + client_assertion_type = None + client = _MtlsClient( + configuration, + self.client_id, + http_client=http_client, + default_headers=self._default_client_headers(), + default_body={"client_info": 1}, + client_assertion=client_assertion, + client_assertion_type=client_assertion_type, + # Cache under the ORIGINAL login.* host, never the mtlsauth.* host, + # so mtls_pop ATs share the environment with the rest of the app. + on_obtaining_tokens=lambda event: self.token_cache.add(dict( + event, environment=central_authority.instance)), + on_removing_rt=self.token_cache.remove_rt, + on_updating_rt=self.token_cache.update_rt) + with self._mtls_lock: + if region: + self._mtls_regional_client = client + else: + self._mtls_client = client + return client + def _build_client(self, client_credential, authority, skip_regional_client=False): client_assertion = None client_assertion_type = None @@ -869,7 +1132,7 @@ def _build_client(self, client_credential, authority, skip_regional_client=False client_credential["passphrase"] ) if client_credential.get("passphrase") else None if client_credential.get("private_key_pfx_path"): - private_key, sha256_thumbprint, sha1_thumbprint, x5c = _parse_pfx( + private_key, sha256_thumbprint, sha1_thumbprint, x5c, _ = _parse_pfx( client_credential["private_key_pfx_path"], passphrase_bytes) if client_credential.get("public_certificate") is True and x5c: @@ -892,7 +1155,7 @@ def _build_client(self, client_credential, authority, skip_regional_client=False from cryptography import x509 cert = x509.load_pem_x509_certificate( _str2bytes(client_credential['public_certificate'])) - sha256_thumbprint, sha1_thumbprint, headers["x5c"] = ( + sha256_thumbprint, sha1_thumbprint, headers["x5c"], _ = ( _extract_cert_and_thumbprints(cert)) else: raise ValueError( @@ -2494,7 +2757,9 @@ class ConfidentialClientApplication(ClientApplication): # server-side web app except that ``allow_broker`` parameter shall remain ``None``. """ - def acquire_token_for_client(self, scopes, claims_challenge=None, fmi_path=None, **kwargs): + def acquire_token_for_client( + self, scopes, claims_challenge=None, fmi_path=None, + mtls_proof_of_possession=False, **kwargs): """Acquires token for the current confidential client, not for an end user. Since MSAL Python 1.23, it will automatically look for token from cache, @@ -2518,6 +2783,40 @@ def acquire_token_for_client(self, scopes, claims_challenge=None, fmi_path=None, scopes=["api://resource/.default"], fmi_path="SomeFmiPath/FmiCredentialPath", ) + :param bool mtls_proof_of_possession: + Optional. Defaults to ``False``. When ``True``, MSAL presents this + confidential client's certificate as the client certificate in a + mutual-TLS (mTLS) handshake to Microsoft Entra, and requests an + mTLS-bound Proof-of-Possession (PoP) access token + (``token_type == "mtls_pop"``) rather than a Bearer token. The same + Subject Name + Issuer (SN/I) certificate that would otherwise sign a + client assertion is instead used as the TLS client certificate, and + the returned token is cryptographically bound to it + (``cnf``/``x5t#S256``). + + Requirements: the app must be configured with a certificate + credential (``private_key_pfx_path``, or ``private_key`` + + ``public_certificate``) - or, for a Federated Identity Credential + (FIC) exchange, a ``client_assertion`` plus an + ``mtls_binding_certificate``; the ``authority`` must be tenanted (not + ``/common`` or ``/organizations``); and MSAL's built-in HTTP + transport must be in use (a custom ``http_client`` cannot perform + the mTLS handshake). Any of these unmet raises ``ValueError``. + + On success the result also contains a ``binding_certificate`` dict + with the **public** binding material only (never the private key):: + + result["binding_certificate"] == { + "x5c": ["", ...], + "thumbprint_sha256": "", + } + + Example usage:: + + result = cca.acquire_token_for_client( + ["https://graph.microsoft.com/.default"], + mtls_proof_of_possession=True, + ) :return: A dict representing the json response from Microsoft Entra: - A successful response would contain "access_token" key, @@ -2533,8 +2832,48 @@ def acquire_token_for_client(self, scopes, claims_challenge=None, fmi_path=None, "fmi_path must be a string, got {}".format(type(fmi_path).__name__)) kwargs["data"] = kwargs.get("data", {}) kwargs["data"]["fmi_path"] = fmi_path - return _clean_up(self._acquire_token_silent_with_error( + if mtls_proof_of_possession or self._mtls_is_fic_leg2: + # An mTLS transport is required whenever we present the certificate: + # for a cert-bound PoP request (mtls_proof_of_possession=True) and + # for every FIC leg-2 request - there the leg-1 assertion is itself + # bound to the certificate, so even a Bearer final token must travel + # over the same mTLS connection ("implicit Bearer-over-mTLS"). + if self._http_client_is_custom: + raise ValueError( + "mtls_proof_of_possession=True is not supported with a custom " + "http_client, because MSAL must own the TLS transport to " + "present the client certificate in the mutual-TLS handshake. " + "Omit the http_client argument to use MSAL's built-in " + "mTLS transport.") + if self.authority.tenant.lower() in ("common", "organizations"): + raise ValueError( + "mtls_proof_of_possession=True requires a tenanted authority. " + "Use a specific tenant id or domain instead of " + "/common or /organizations.") + # Parse/validate the certificate now (fail fast). + mtls_cert = self._get_mtls_pop_cert() + data = dict(kwargs.get("data") or {}) + if mtls_proof_of_possession: + # Cert-bound PoP: request an mtls_pop token and bind its cache + # entry to the cert via key_id (base64url x5t#S256). token_type + # also routes _acquire_token_for_client() to the mTLS client. + data["token_type"] = "mtls_pop" + data["key_id"] = mtls_cert["key_id"] + # else: FIC leg-2 without the flag -> Bearer-over-mTLS. No token_type + # / key_id, so the final Bearer token caches normally; the mTLS + # client is still selected because self._mtls_is_fic_leg2 is True. + kwargs["data"] = data + result = _clean_up(self._acquire_token_silent_with_error( scopes, None, claims_challenge=claims_challenge, **kwargs)) + if mtls_proof_of_possession and result and "access_token" in result: + # Surface the PUBLIC binding certificate (never the private key), so + # callers - including FIC leg-2 / cross-app hand-off - can correlate + # the token to its cert. Survives _clean_up (key has no "_" prefix). + result["binding_certificate"] = { + "x5c": mtls_cert["x5c"], + "thumbprint_sha256": mtls_cert["key_id"], # base64url x5t#S256 + } + return result def _acquire_token_for_client( self, @@ -2548,10 +2887,19 @@ def _acquire_token_for_client( "Using /common or /organizations authority " "in acquire_token_for_client() is unreliable. " "Please use a specific tenant instead.", DeprecationWarning) - self._validate_ssh_cert_input_data(kwargs.get("data", {})) + data = kwargs.get("data", {}) + self._validate_ssh_cert_input_data(data) + is_mtls_pop = data.get("token_type") == "mtls_pop" telemetry_context = self._build_telemetry_context( - self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason) - client = self._regional_client or self.client + self.ACQUIRE_TOKEN_FOR_CLIENT_ID, refresh_reason=refresh_reason, + token_type=data.get("token_type")) + if is_mtls_pop or self._mtls_is_fic_leg2: + # Present the certificate over mTLS to obtain a cert-bound token + # (mtls_pop), or to carry a cert-bound FIC leg-1 assertion as jwt-pop + # (Bearer-over-mTLS when the flag is absent). + client = self._get_mtls_client(self.authority) + else: + client = self._regional_client or self.client response = client.obtain_token_for_client( scope=scopes, # This grant flow requires no scope decoration headers=telemetry_context.generate_headers(), diff --git a/msal/mtls.py b/msal/mtls.py new file mode 100644 index 00000000..a9e12e16 --- /dev/null +++ b/msal/mtls.py @@ -0,0 +1,192 @@ +"""Helpers for mTLS Proof-of-Possession (PoP). + +This module owns two concerns for the "SN/I certificate over mTLS PoP" feature: + +1. The endpoint transform + sovereign guardrail: mapping a tenanted ``login.*`` + authority to its ``mtlsauth.*`` counterpart (global or regional), and + rejecting clouds/hosts where mTLS PoP is not (yet) available. +2. The mTLS transport: a ``requests`` session whose HTTPS connections present a + client certificate for the mutual-TLS handshake to the token endpoint. + +The wire contract and host mapping mirror the shipped MSAL.NET +``RegionAndMtlsDiscoveryProvider`` so MSAL Python stays cross-SDK consistent. +""" +import logging +try: + from urllib.parse import urlparse, urlunparse +except ImportError: # Python 2 + from urlparse import urlparse, urlunparse + + +logger = logging.getLogger(__name__) + +_MTLS_POP_DOC_LINK = "https://aka.ms/msal-net-pop" + +# The global public mTLS host. The four public "login.*" hosts all normalize to +# this single global endpoint (ESTSR provides regional failover), matching +# MSAL.NET's PublicEnvForRegionalMtlsAuth. +_PUBLIC_MTLS_HOST = "mtlsauth.microsoft.com" + +# Public-cloud login hosts that normalize to the single global mTLS host above. +_PUBLIC_CLOUD_LOGIN_HOSTS = frozenset([ + "login.microsoftonline.com", + "login.microsoft.com", + "login.windows.net", + "sts.windows.net", + ]) + +# ───────────────────────────────────────────────────────────────────────────── +# SOVEREIGN GUARDRAIL - single override point for mTLS cloud availability. +# +# mTLS PoP is currently rejected for the deprecated sovereign login hosts below +# and for any non-"login." host. ``mtlsauth.*`` is rolling out across clouds +# (Azure Government / AGC: available; Bleu / Delos: TBD). To enable a cloud, +# remove its entry here (and, if needed, relax the non-"login." host check in +# ``mtls_pop_host``). This is the ONLY place cloud eligibility is enforced, so +# do not scatter equivalent checks elsewhere in the codebase. +# ───────────────────────────────────────────────────────────────────────────── +_MTLS_POP_UNSUPPORTED_HOSTS = { + "login.usgovcloudapi.net": + "login.usgovcloudapi.net is not supported for mTLS PoP, " + "please use login.microsoftonline.us", + "login.chinacloudapi.cn": + "login.chinacloudapi.cn is not supported for mTLS PoP, " + "please use login.partner.microsoftonline.cn", + } + + +def mtls_pop_host(instance, region=None): + """Return the ``mtlsauth.*`` host for a given ``login.*`` authority instance. + + :param str instance: The authority host, e.g. ``login.microsoftonline.com``. + :param region: Optional region, e.g. ``westus3``. When provided, a regional + mTLS host ``{region}.mtlsauth...`` is returned; otherwise the global one. + :raises ValueError: When mTLS PoP is not supported for the given host + (the sovereign guardrail above, or a non-``login.`` host). + """ + instance = instance.lower() + if instance in _MTLS_POP_UNSUPPORTED_HOSTS: # Sovereign guardrail + raise ValueError(_MTLS_POP_UNSUPPORTED_HOSTS[instance]) + if instance in _PUBLIC_CLOUD_LOGIN_HOSTS: + # Known public aliases (incl. legacy login.windows.net / sts.windows.net) + # all normalize to the single global mTLS host. + base = _PUBLIC_MTLS_HOST + elif instance.startswith("login."): # e.g. login.microsoftonline.us + base = "mtlsauth" + instance[len("login"):] # -> mtlsauth.microsoftonline.us + else: + raise ValueError( + "mTLS PoP is only supported for hosts that start with 'login.'. " + "The provided authority host ({}) does not meet this requirement. " + "See {} for details.".format(instance, _MTLS_POP_DOC_LINK)) + return "{}.{}".format(region, base) if region else base + + +def transform_token_endpoint(token_endpoint, instance, region=None): + """Return ``token_endpoint`` with its host swapped for the mTLS host. + + The path/tenant and everything else are preserved; only the network host is + rewritten (e.g. ``https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token`` + -> ``https://mtlsauth.microsoft.com/{tenant}/oauth2/v2.0/token``). + """ + parsed = urlparse(token_endpoint) + host = mtls_pop_host(instance, region) + # Preserve a non-default port if the original endpoint carried one (tests). + netloc = "{}:{}".format(host, parsed.port) if parsed.port else host + return urlunparse(parsed._replace(netloc=netloc)) + + +class _MtlsHttpClient(object): + """A minimal http client (``post``/``get``/``close``) whose HTTPS + connections present a client certificate for mutual-TLS. + + MSAL owns this transport. A caller's plain custom ``http_client`` cannot + perform the mTLS handshake, which is why requesting mTLS PoP with a + non-mTLS-capable custom transport fails fast (see application.py). + + The ``ssl.SSLContext`` (and its temp key file) is built lazily on first use, + so Bearer-only certificate apps never pay the cost nor touch the disk. + """ + def __init__(self, cert_pem, key_pem, *, + verify=True, proxies=None, timeout=None): + # cert_pem / key_pem are PEM-encoded bytes. key_pem must be an + # unencrypted private key (the caller normalizes it). + self._cert_pem = cert_pem + self._key_pem = key_pem + self._verify = verify + self._proxies = proxies + self._timeout = timeout + self._session = None + + def _ensure_session(self): + if self._session is None: + import requests # Lazy import, same as the rest of MSAL + session = requests.Session() + session.verify = self._verify + if self._proxies: + session.proxies = self._proxies + adapter = _make_mtls_adapter( + _create_ssl_context(self._cert_pem, self._key_pem)) + session.mount("https://", adapter) + self._session = session + return self._session + + def post(self, url, **kwargs): + if self._timeout is not None: + kwargs.setdefault("timeout", self._timeout) + return self._ensure_session().post(url, **kwargs) + + def get(self, url, **kwargs): + if self._timeout is not None: + kwargs.setdefault("timeout", self._timeout) + return self._ensure_session().get(url, **kwargs) + + def close(self): + if self._session is not None: + self._session.close() + + +def _make_mtls_adapter(ssl_context): + """Return a ``requests`` HTTPAdapter that injects ``ssl_context`` (with the + client certificate loaded) into every connection pool it creates.""" + from requests.adapters import HTTPAdapter # Lazy import + + class _MtlsHTTPAdapter(HTTPAdapter): + def __init__(self, ssl_context): + self._ssl_context = ssl_context + super(_MtlsHTTPAdapter, self).__init__(max_retries=1) + + def init_poolmanager(self, *args, **kwargs): + kwargs["ssl_context"] = self._ssl_context + return super(_MtlsHTTPAdapter, self).init_poolmanager(*args, **kwargs) + + def proxy_manager_for(self, *args, **kwargs): + kwargs["ssl_context"] = self._ssl_context + return super(_MtlsHTTPAdapter, self).proxy_manager_for(*args, **kwargs) + + return _MtlsHTTPAdapter(ssl_context) + + +def _create_ssl_context(cert_pem, key_pem): + """Build a client ``ssl.SSLContext`` that presents ``cert_pem``/``key_pem``. + + ``ssl.SSLContext.load_cert_chain`` requires a file path, but our key is + in memory. We write a ``0600`` temp PEM (mkstemp defaults to owner-only), + load it, then unlink it immediately - the context keeps the material in + memory, so nothing sensitive lingers on disk. + """ + import ssl + import os + import tempfile + context = ssl.create_default_context() # Verifies the server (ESTS) as usual + fd, path = tempfile.mkstemp(suffix=".pem") # Owner-only (0600) by default + try: + os.write(fd, key_pem + b"\n" + cert_pem) + os.close(fd) # Close before load_cert_chain reads it (required on Windows) + context.load_cert_chain(path) # Loads our client cert+key into memory + finally: + try: + os.remove(path) # Unlink immediately; minimal disk exposure + except OSError: # pragma: no cover + logger.warning("Unable to remove temporary mTLS key file") + return context + diff --git a/msal/oauth2cli/oauth2.py b/msal/oauth2cli/oauth2.py index 4590d52d..fa2419f9 100644 --- a/msal/oauth2cli/oauth2.py +++ b/msal/oauth2cli/oauth2.py @@ -43,6 +43,7 @@ def encode_saml_assertion(assertion): CLIENT_ASSERTION_TYPE_JWT = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer" CLIENT_ASSERTION_TYPE_SAML2 = "urn:ietf:params:oauth:client-assertion-type:saml2-bearer" + CLIENT_ASSERTION_TYPE_JWT_POP = "urn:ietf:params:oauth:client-assertion-type:jwt-pop" client_assertion_encoders = {CLIENT_ASSERTION_TYPE_SAML2: encode_saml_assertion} @property diff --git a/msal/sku.py b/msal/sku.py index 5e58eda9..f3c7f97d 100644 --- a/msal/sku.py +++ b/msal/sku.py @@ -2,5 +2,5 @@ """ # The __init__.py will import this. Not the other way around. -__version__ = "1.37.0" +__version__ = "1.38.0" SKU = "MSAL.Python" diff --git a/msal/telemetry.py b/msal/telemetry.py index b07ab3ed..123d1fdd 100644 --- a/msal/telemetry.py +++ b/msal/telemetry.py @@ -14,6 +14,12 @@ AT_AGING = 4 RESERVED = 5 +# Server-telemetry token-type values (parity with the other MSALs, e.g. .NET's +# TelemetryTokenTypeConstants). Only non-Bearer token types are emitted. +_TELEMETRY_TOKEN_TYPES = { + "mtls_pop": 6, # mTLS-bound Proof-of-Possession + } + def _get_new_correlation_id(): return str(uuid.uuid4()) @@ -28,18 +34,31 @@ class _TelemetryContext(object): _CURRENT_HEADER_SIZE_LIMIT = 100 _LAST_HEADER_SIZE_LIMIT = 350 - def __init__(self, buffer, lock, api_id, correlation_id=None, refresh_reason=None): + def __init__(self, buffer, lock, api_id, correlation_id=None, + refresh_reason=None, token_type=None): self._buffer = buffer self._lock = lock self._api_id = api_id self._correlation_id = correlation_id or _get_new_correlation_id() self._refresh_reason = refresh_reason or NON_SILENT_CALL + self._token_type = token_type logger.debug("Generate or reuse correlation_id: %s", self._correlation_id) def generate_headers(self): with self._lock: - current = "4|{api_id},{cache_refresh}|".format( - api_id=self._api_id, cache_refresh=self._refresh_reason) + # MSAL Python's current schema (4) carries an EMPTY platform-config + # section after the second "|". To stay byte-for-byte unchanged for + # every existing flow, we only populate that section for token types + # that need it (currently mtls_pop -> value 6, placed in the 3rd + # platform field). This is a documented Python-schema divergence: + # other MSALs use schema 5, but the token-type *value* matches. + token_type_value = _TELEMETRY_TOKEN_TYPES.get(self._token_type) + platform_config = ( + ",,{}".format(token_type_value) + if token_type_value is not None else "") + current = "4|{api_id},{cache_refresh}|{platform_config}".format( + api_id=self._api_id, cache_refresh=self._refresh_reason, + platform_config=platform_config) if len(current) > self._CURRENT_HEADER_SIZE_LIMIT: logger.warning( "Telemetry header greater than {} will be truncated by AAD".format( diff --git a/msal/token_cache.py b/msal/token_cache.py index 0ca250df..95bf6af1 100644 --- a/msal/token_cache.py +++ b/msal/token_cache.py @@ -152,7 +152,10 @@ def __init__(self): realm=None, target=None, ext_cache_key=None, # Note: New field(s) can be added here - #key_id=None, + key_id=None, # So ATs bound to different keys/certs can + # coexist (e.g. an mtls_pop AT vs a Bearer AT for the + # same app+scope). key_id is absent for Bearer, which + # keeps the Bearer cache key byte-for-byte unchanged. **ignored_payload_from_a_real_token: "-".join([ # Note: Could use a hash here to shorten key length home_account_id or "", @@ -163,8 +166,8 @@ def __init__(self): client_id or "", realm or "", target or "", - #key_id or "", # So ATs of different key_id can coexist ] + ([ext_cache_key] if ext_cache_key else []) + + ([key_id] if key_id else []) # ATs of different key_id coexist ).lower(), self.CredentialType.ID_TOKEN: lambda home_account_id=None, environment=None, client_id=None, @@ -194,6 +197,7 @@ def _get_access_token( self, home_account_id, environment, client_id, realm, target, # Together they form a compound key ext_cache_key=None, + key_id=None, default=None, ): # O(1) return self._get( @@ -205,6 +209,7 @@ def _get_access_token( realm=realm, target=" ".join(target), ext_cache_key=ext_cache_key, + key_id=key_id, ), default=default) @@ -251,7 +256,8 @@ def search(self, credential_type, target=None, query=None, *, now=None): # O(n) preferred_result = self._get_access_token( query["home_account_id"], query["environment"], query["client_id"], query["realm"], target, - ext_cache_key=query.get("ext_cache_key")) + ext_cache_key=query.get("ext_cache_key"), + key_id=query.get("key_id")) if preferred_result and self._is_matching( preferred_result, query, # Needs no target_set here because it is satisfied by dict key @@ -284,6 +290,14 @@ def search(self, credential_type, target=None, query=None, *, now=None): # O(n) and "ext_cache_key" not in (query or {}) ): continue + # Cache isolation for key-bound tokens (e.g. mtls_pop, SSH-cert). + # An entry bound to a key_id must not satisfy a query without + # one, so a Bearer lookup never returns a PoP/mtls_pop token. + if (credential_type == self.CredentialType.ACCESS_TOKEN + and "key_id" in entry + and "key_id" not in (query or {}) + ): + continue yield entry for at in expired_access_tokens: self.remove_at(at) diff --git a/sample/confidential_client_mtls_pop_sample.py b/sample/confidential_client_mtls_pop_sample.py new file mode 100644 index 00000000..26bcabaa --- /dev/null +++ b/sample/confidential_client_mtls_pop_sample.py @@ -0,0 +1,136 @@ +""" +This sample demonstrates a confidential-client (daemon) application that uses a +Subject Name + Issuer (SN/I) certificate as the client TLS certificate in a +mutual-TLS (mTLS) handshake to Microsoft Entra, and obtains an mTLS-bound +Proof-of-Possession (PoP) access token (``token_type == "mtls_pop"``). + +It also shows the optional two-leg Federated Identity Credential (FIC) exchange +over mTLS PoP. + +Prerequisites +------------- +* A confidential-client app registration configured with a certificate. +* The SAME certificate available locally as a PFX file (SN/I). The certificate + is used as the TLS client certificate -- it is NOT used to sign a client + assertion here. +* A **tenanted** authority (mTLS PoP does not work with ``/common`` or + ``/organizations``). +* The resource you request a token for must be allow-listed by Entra for mTLS + PoP (for example Microsoft Graph or Azure Key Vault). + +Configuration is read from environment variables so no secrets are hard-coded:: + + CLIENT_ID your application (client) id + AUTHORITY e.g. https://login.microsoftonline.com/ + PFX_PATH path to the .pfx file holding the SN/I cert + private key + PFX_PASSPHRASE (optional) passphrase protecting the .pfx + SCOPE e.g. https://graph.microsoft.com/.default + AZURE_REGION (optional) e.g. westus3; omit for the global mtls endpoint + +Then run:: + + python confidential_client_mtls_pop_sample.py +""" + +import json +import os + +import msal + + +def _build_cert_credential(): + credential = { + "private_key_pfx_path": os.environ["PFX_PATH"], + # "public_certificate": True opts in to Subject Name/Issuer (SN/I) auth, + # which is what makes the full cert chain (x5c) available for mTLS PoP. + "public_certificate": True, + } + passphrase = os.getenv("PFX_PASSPHRASE") + if passphrase: + credential["passphrase"] = passphrase + return credential + + +def vanilla_sni_mtls_pop(): + """Acquire an mTLS-bound PoP token using the SN/I cert as the TLS client cert.""" + app = msal.ConfidentialClientApplication( + os.environ["CLIENT_ID"], + authority=os.environ["AUTHORITY"], # MUST be tenanted + client_credential=_build_cert_credential(), + # Note: do NOT pass a custom http_client here. MSAL must own the + # transport to present the client certificate during the TLS handshake. + azure_region=os.getenv("AZURE_REGION"), # optional; None -> global endpoint + ) + + result = app.acquire_token_for_client( + os.environ["SCOPE"].split(), + mtls_proof_of_possession=True, # <-- request an mTLS PoP token + ) + + if "access_token" in result: + print("token_type:", result["token_type"]) # -> "mtls_pop" + # Public binding material only (never the private key): + print("binding_certificate:", json.dumps( + result.get("binding_certificate"), indent=2)) + print("token_source:", result.get("token_source")) + else: + print("Token acquisition failed:", result.get("error"), + result.get("error_description")) + return result + + +def fic_two_leg_over_mtls_pop(): + """Optional: two-leg Federated Identity Credential (FIC) exchange over mTLS PoP. + + Leg 1: the SN/I confidential client acquires a cert-bound mtls_pop token for + the token-exchange audience. + Leg 2: a second confidential client presents that token as a ``jwt-pop`` + client assertion, over an mTLS connection bound by the SAME cert, to + obtain the final token (Bearer or mtls_pop). + """ + cert = _build_cert_credential() + + # --- Leg 1: acquire the federated (cert-bound) assertion --------------- + leg1_app = msal.ConfidentialClientApplication( + os.environ["CLIENT_ID"], + authority=os.environ["AUTHORITY"], + client_credential=cert, + azure_region=os.getenv("AZURE_REGION"), + ) + # The exchange audience is caller-supplied, not hard-coded by MSAL. + exchange_scope = os.getenv( + "FIC_EXCHANGE_SCOPE", "api://AzureADTokenExchange/.default") + leg1 = leg1_app.acquire_token_for_client( + [exchange_scope], mtls_proof_of_possession=True) + if "access_token" not in leg1: + print("Leg 1 failed:", leg1.get("error_description")) + return leg1 + + # --- Leg 2: exchange the leg-1 token for the final token --------------- + leg2_app = msal.ConfidentialClientApplication( + os.getenv("LEG2_CLIENT_ID", os.environ["CLIENT_ID"]), + authority=os.environ["AUTHORITY"], + client_credential={ + "client_assertion": leg1["access_token"], # the leg-1 mtls_pop token + "mtls_binding_certificate": cert, # binds the leg-2 TLS handshake + }, + azure_region=os.getenv("AZURE_REGION"), + ) + + # Final token as mTLS PoP (drop mtls_proof_of_possession for Bearer-over-mTLS): + leg2 = leg2_app.acquire_token_for_client( + os.environ["SCOPE"].split(), mtls_proof_of_possession=True) + if "access_token" in leg2: + print("Leg 2 token_type:", leg2["token_type"]) # -> "mtls_pop" + else: + print("Leg 2 failed:", leg2.get("error_description")) + return leg2 + + +if __name__ == "__main__": + print("=== Vanilla SN/I -> mTLS PoP ===") + vanilla_sni_mtls_pop() + + if os.getenv("RUN_FIC_SAMPLE"): + print("\n=== FIC two-leg over mTLS PoP ===") + fic_two_leg_over_mtls_pop() diff --git a/tests/test_application.py b/tests/test_application.py index 31f77a71..7ed21413 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -1586,4 +1586,167 @@ def mock_post(url, headers=None, data=None, *args, **kwargs): self.assertEqual( captured_value, captured_data.get("client_assertion"), "Lambda with defaulted params should return its default value, " - "not receive a context dict") \ No newline at end of file + "not receive a context dict") + + +import os as _os +_MTLS_PFX = _os.path.join(_os.path.dirname(__file__), "certificate-with-password.pfx") +_MTLS_CERT_CRED = { + "private_key_pfx_path": _MTLS_PFX, + "passphrase": "password", + "public_certificate": True, +} +_MTLS_OIDC_MOCK = Mock(return_value={ + "authorization_endpoint": "https://login.microsoftonline.com/my_tenant/oauth2/v2.0/authorize", + "token_endpoint": "https://login.microsoftonline.com/my_tenant/oauth2/v2.0/token", + "issuer": "https://login.microsoftonline.com/my_tenant/v2.0", +}) +_JWT_BEARER = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer" +_JWT_POP = "urn:ietf:params:oauth:client-assertion-type:jwt-pop" + + +@patch(_OIDC_DISCOVERY, new=_MTLS_OIDC_MOCK) +class TestMtlsProofOfPossession(unittest.TestCase): + """SN/I certificate as the client TLS cert over mTLS Proof-of-Possession.""" + + _AUTHORITY = "https://login.microsoftonline.com/my_tenant" + + def _capturing_post(self, captured, token_type="mtls_pop"): + def mock_post(url, headers=None, data=None, *a, **k): + captured.append({ + "url": url, "data": dict(data or {}), "headers": dict(headers or {})}) + return MinimalResponse(status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600, "token_type": token_type})) + return mock_post + + def test_vanilla_sni_mtls_pop_request_and_result(self): + app = ConfidentialClientApplication( + "cid", client_credential=_MTLS_CERT_CRED, authority=self._AUTHORITY) + captured = [] + result = app.acquire_token_for_client( + ["s1"], mtls_proof_of_possession=True, post=self._capturing_post(captured)) + req = captured[0] + # Endpoint transformed to the global mtls host + self.assertEqual( + req["url"], + "https://mtlsauth.microsoft.com/my_tenant/oauth2/v2.0/token") + # Wire body: token_type=mtls_pop, and NO client_assertion / req_cnf / key_id + self.assertEqual("mtls_pop", req["data"].get("token_type")) + self.assertNotIn("client_assertion", req["data"]) + self.assertNotIn("req_cnf", req["data"]) + self.assertNotIn("key_id", req["data"], "key_id must never travel on the wire") + # Telemetry marks token-type 6 (parity with .NET MtlsPop) + self.assertEqual("4|730,2|,,6", req["headers"].get(CLIENT_CURRENT_TELEMETRY)) + # Result carries token_type and the PUBLIC binding cert (no private key) + self.assertEqual("mtls_pop", result.get("token_type")) + binding = result["binding_certificate"] + self.assertEqual({"x5c", "thumbprint_sha256"}, set(binding)) + self.assertNotIn("private", json.dumps(binding).lower()) + + def test_repeat_call_is_cache_hit(self): + app = ConfidentialClientApplication( + "cid", client_credential=_MTLS_CERT_CRED, authority=self._AUTHORITY) + app.acquire_token_for_client( + ["s1"], mtls_proof_of_possession=True, post=self._capturing_post([])) + second = [] + result = app.acquire_token_for_client( + ["s1"], mtls_proof_of_possession=True, post=self._capturing_post(second)) + self.assertEqual([], second, "Second call must be served from cache") + self.assertEqual(app._TOKEN_SOURCE_CACHE, result[app._TOKEN_SOURCE]) + self.assertTrue(result.get("binding_certificate")) + + def test_backward_compatible_bearer_is_unchanged(self): + app = ConfidentialClientApplication( + "cid", client_credential=_MTLS_CERT_CRED, authority=self._AUTHORITY) + captured = [] + result = app.acquire_token_for_client( + ["s1"], post=self._capturing_post(captured, token_type="Bearer")) + req = captured[0] + # Original login.* endpoint, classic SN/I private_key_jwt (Bearer) + self.assertEqual( + req["url"], "https://login.microsoftonline.com/my_tenant/oauth2/v2.0/token") + self.assertTrue(req["data"].get("client_assertion")) + self.assertEqual(_JWT_BEARER, req["data"].get("client_assertion_type")) + self.assertEqual("4|730,2|", req["headers"].get(CLIENT_CURRENT_TELEMETRY)) + self.assertEqual("Bearer", result.get("token_type")) + self.assertNotIn("binding_certificate", result) + + def test_regional_mtls_endpoint(self): + app = ConfidentialClientApplication( + "cid", client_credential=_MTLS_CERT_CRED, authority=self._AUTHORITY, + azure_region="westus3") + captured = [] + app.acquire_token_for_client( + ["s1"], mtls_proof_of_possession=True, post=self._capturing_post(captured)) + self.assertEqual( + captured[0]["url"], + "https://westus3.mtlsauth.microsoft.com/my_tenant/oauth2/v2.0/token") + + def test_fic_leg2_pop_sends_jwt_pop_over_mtls(self): + app = ConfidentialClientApplication( + "cid", authority=self._AUTHORITY, + client_credential={ + "client_assertion": "LEG1_TOKEN", + "mtls_binding_certificate": _MTLS_CERT_CRED}) + captured = [] + result = app.acquire_token_for_client( + ["s1"], mtls_proof_of_possession=True, post=self._capturing_post(captured)) + req = captured[0] + self.assertTrue(req["url"].startswith("https://mtlsauth.microsoft.com/")) + self.assertEqual("LEG1_TOKEN", req["data"].get("client_assertion")) + self.assertEqual(_JWT_POP, req["data"].get("client_assertion_type")) + self.assertEqual("mtls_pop", req["data"].get("token_type")) + self.assertEqual("mtls_pop", result.get("token_type")) + + def test_fic_leg2_bearer_still_travels_over_mtls(self): + # No flag -> Bearer final token, but the cert-bound leg-1 assertion still + # requires the mTLS endpoint and jwt-pop ("implicit Bearer-over-mTLS"). + app = ConfidentialClientApplication( + "cid", authority=self._AUTHORITY, + client_credential={ + "client_assertion": "LEG1_TOKEN", + "mtls_binding_certificate": _MTLS_CERT_CRED}) + captured = [] + result = app.acquire_token_for_client( + ["s1"], post=self._capturing_post(captured, token_type="Bearer")) + req = captured[0] + self.assertTrue(req["url"].startswith("https://mtlsauth.microsoft.com/")) + self.assertEqual(_JWT_POP, req["data"].get("client_assertion_type")) + self.assertNotIn("token_type", req["data"]) + self.assertEqual("4|730,2|", req["headers"].get(CLIENT_CURRENT_TELEMETRY)) + self.assertEqual("Bearer", result.get("token_type")) + self.assertNotIn("binding_certificate", result) + + def test_secret_credential_with_flag_raises(self): + app = ConfidentialClientApplication( + "cid", client_credential="a_secret", authority=self._AUTHORITY) + with self.assertRaises(ValueError): + app.acquire_token_for_client(["s1"], mtls_proof_of_possession=True) + + def test_string_assertion_without_binding_cert_with_flag_raises(self): + app = ConfidentialClientApplication( + "cid", client_credential={"client_assertion": "just_a_string"}, + authority=self._AUTHORITY) + with self.assertRaises(ValueError): + app.acquire_token_for_client(["s1"], mtls_proof_of_possession=True) + + def test_custom_http_client_with_flag_fails_fast(self): + app = ConfidentialClientApplication( + "cid", client_credential=_MTLS_CERT_CRED, authority=self._AUTHORITY, + http_client=MinimalHttpClient()) + with self.assertRaises(ValueError): + app.acquire_token_for_client(["s1"], mtls_proof_of_possession=True) + + +@patch(_OIDC_DISCOVERY, new=Mock(return_value={ + "authorization_endpoint": "https://login.microsoftonline.com/common/oauth2/v2.0/authorize", + "token_endpoint": "https://login.microsoftonline.com/common/oauth2/v2.0/token", + "issuer": "https://login.microsoftonline.com/{tenantid}/v2.0", +})) +class TestMtlsProofOfPossessionUntenantedAuthority(unittest.TestCase): + def test_common_authority_with_flag_raises(self): + app = ConfidentialClientApplication( + "cid", client_credential=_MTLS_CERT_CRED, + authority="https://login.microsoftonline.com/common") + with self.assertRaises(ValueError): + app.acquire_token_for_client(["s1"], mtls_proof_of_possession=True) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 37632ee7..1bc6285f 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -441,6 +441,106 @@ def test_subject_name_issuer_authentication(self): self.assertIn('access_token', result) self.assertCacheWorksForApp(result, scope) + # ── SN/I certificate over mTLS Proof-of-Possession ───────────────────────── + # These reuse the same lab SN/I cert as test_subject_name_issuer_authentication + # (it is not CNG-backed, so it works as a TLS client cert on the CI agents). + # NOTE: MSAL must own the TLS transport for mTLS, so unlike the tests above we + # do NOT pass http_client=MinimalHttpClient() here. + # ESTS gates mTLS PoP on the final resource audience (must be allow-listed, + # e.g. MS Graph), not on the client app, so we target Microsoft Graph. + _MTLS_POP_SCOPE = ["https://graph.microsoft.com/.default"] + + def _assert_pop_cache_hit(self, app, scope): + # A second mTLS-PoP call for the same scope must be served from cache. + # (assertCacheWorksForApp cannot be reused: its follow-up call omits the + # flag and would look up a Bearer entry, missing the mtls_pop token.) + again = app.acquire_token_for_client(scope, mtls_proof_of_possession=True) + self.assertIsNotNone(again) + self.assertEqual(app._TOKEN_SOURCE_CACHE, again[app._TOKEN_SOURCE]) + + @staticmethod + def _cnf_x5t_s256(access_token): + # Best-effort decode of an mtls_pop JWT's cnf.x5t#S256 binding claim. + try: + payload = json.loads(decode_part(access_token.split(".")[1])) + return payload.get("cnf", {}).get("x5t#S256") + except Exception: # Opaque/non-JWT token - skip the offline binding check + return None + + def test_sni_mtls_pop_for_client(self): + from tests.lab_config import get_client_certificate + if not clean_env("LAB_APP_CLIENT_CERT_PFX_PATH"): + self.skipTest("LAB_APP_CLIENT_CERT_PFX_PATH not set") + self.app = msal.ConfidentialClientApplication( + LAB_APP_CLIENT_ID, + authority="https://login.microsoftonline.com/microsoft.onmicrosoft.com", + client_credential=get_client_certificate()) + result = self.app.acquire_token_for_client( + self._MTLS_POP_SCOPE, mtls_proof_of_possession=True) + self.assertIn("access_token", result, "mTLS PoP request failed: %s" % result) + self.assertEqual("mtls_pop", result.get("token_type")) + binding = result.get("binding_certificate") + self.assertTrue(binding and binding.get("x5c") and binding.get("thumbprint_sha256")) + self.assertNotIn("private", json.dumps(binding).lower()) + cnf = self._cnf_x5t_s256(result["access_token"]) + if cnf: # When the AT is a decodable JWT, its cnf must match our cert + self.assertEqual(binding["thumbprint_sha256"], cnf) + self._assert_pop_cache_hit(self.app, self._MTLS_POP_SCOPE) + + def test_sni_mtls_pop_for_client_regional(self): + from tests.lab_config import get_client_certificate + if not clean_env("LAB_APP_CLIENT_CERT_PFX_PATH"): + self.skipTest("LAB_APP_CLIENT_CERT_PFX_PATH not set") + self.app = msal.ConfidentialClientApplication( + LAB_APP_CLIENT_ID, + authority="https://login.microsoftonline.com/microsoft.onmicrosoft.com", + client_credential=get_client_certificate(), + azure_region="westus3") + result = self.app.acquire_token_for_client( + self._MTLS_POP_SCOPE, mtls_proof_of_possession=True) + if result.get("error") in ("invalid_request", "temporarily_unavailable"): + self.skipTest("Regional mTLS endpoint not reachable: %s" % result) + self.assertIn("access_token", result, "Regional mTLS PoP failed: %s" % result) + self.assertEqual("mtls_pop", result.get("token_type")) + + def test_fic_two_leg_over_mtls_pop(self): + from tests.lab_config import get_client_certificate + if not clean_env("LAB_APP_CLIENT_CERT_PFX_PATH"): + self.skipTest("LAB_APP_CLIENT_CERT_PFX_PATH not set") + authority = "https://login.microsoftonline.com/microsoft.onmicrosoft.com" + # Leg 1: SN/I cert -> cert-bound (mtls_pop) federated assertion for the + # caller-supplied token-exchange audience. + leg1_app = msal.ConfidentialClientApplication( + LAB_APP_CLIENT_ID, authority=authority, + client_credential=get_client_certificate()) + leg1 = leg1_app.acquire_token_for_client( + ["api://AzureADTokenExchange/.default"], mtls_proof_of_possession=True) + if "access_token" not in leg1: + self.skipTest("FIC leg 1 not pre-authorized for token exchange: %s" % leg1) + self.assertEqual("mtls_pop", leg1.get("token_type"), "Leg 1 must be mtls_pop") + leg1_thumbprint = leg1["binding_certificate"]["thumbprint_sha256"] + + # Leg 2: same cert binds the TLS handshake, leg-1 token carried as jwt-pop. + leg2_app = msal.ConfidentialClientApplication( + LAB_APP_CLIENT_ID, authority=authority, + client_credential={ + "client_assertion": leg1["access_token"], + "mtls_binding_certificate": get_client_certificate()}) + # Leg 2 -> Bearer over mTLS (implicit Bearer-over-mTLS, no flag) + leg2_bearer = leg2_app.acquire_token_for_client(self._MTLS_POP_SCOPE) + self.assertIn("access_token", leg2_bearer, "Leg 2 (Bearer) failed: %s" % leg2_bearer) + # Leg 2 -> mTLS PoP final token + leg2_pop = leg2_app.acquire_token_for_client( + self._MTLS_POP_SCOPE, mtls_proof_of_possession=True) + self.assertIn("access_token", leg2_pop, "Leg 2 (PoP) failed: %s" % leg2_pop) + self.assertEqual("mtls_pop", leg2_pop.get("token_type")) + # The final PoP token must be bound to the leg-1 certificate thumbprint + cnf = self._cnf_x5t_s256(leg2_pop["access_token"]) + if cnf: + self.assertEqual(leg1_thumbprint, cnf, + "Final token cnf must be bound to the leg-1 cert thumbprint") + + class DeviceFlowTestCase(E2eTestCase): # A leaf class so it will be run only once @classmethod def setUpClass(cls): diff --git a/tests/test_mtls_transport.py b/tests/test_mtls_transport.py new file mode 100644 index 00000000..bd0ea35d --- /dev/null +++ b/tests/test_mtls_transport.py @@ -0,0 +1,146 @@ +import os +import ssl +import unittest + +from msal import mtls +from msal.application import _load_mtls_cert_material + + +class TestMtlsEndpointTransform(unittest.TestCase): + """The login.* -> [region.]mtlsauth.* transform and the sovereign guardrail. + + These values mirror MSAL.NET's RegionAndMtlsDiscoveryProvider so MSAL Python + stays wire-compatible across SDKs. + """ + + def test_public_hosts_normalize_to_global_mtls_host(self): + for host in [ + "login.microsoftonline.com", "login.microsoft.com", + "login.windows.net", "sts.windows.net", + ]: + self.assertEqual(mtls.mtls_pop_host(host), "mtlsauth.microsoft.com") + + def test_case_insensitive_host(self): + self.assertEqual( + mtls.mtls_pop_host("Login.MicrosoftOnline.Com"), + "mtlsauth.microsoft.com") + + def test_region_prefixes_global_host(self): + self.assertEqual( + mtls.mtls_pop_host("login.microsoftonline.com", region="westus3"), + "westus3.mtlsauth.microsoft.com") + + def test_non_public_login_host_keeps_its_suffix(self): + # e.g. a national cloud that IS supported keeps its own suffix + self.assertEqual( + mtls.mtls_pop_host("login.microsoftonline.us"), + "mtlsauth.microsoftonline.us") + + def test_transform_token_endpoint_swaps_only_the_host(self): + self.assertEqual( + mtls.transform_token_endpoint( + "https://login.microsoftonline.com/my_tenant/oauth2/v2.0/token", + "login.microsoftonline.com"), + "https://mtlsauth.microsoft.com/my_tenant/oauth2/v2.0/token") + + def test_transform_token_endpoint_regional(self): + self.assertEqual( + mtls.transform_token_endpoint( + "https://login.microsoftonline.com/my_tenant/oauth2/v2.0/token", + "login.microsoftonline.com", region="westus3"), + "https://westus3.mtlsauth.microsoft.com/my_tenant/oauth2/v2.0/token") + + def test_transform_preserves_non_default_port(self): + self.assertEqual( + mtls.transform_token_endpoint( + "https://login.microsoftonline.com:8443/t/token", + "login.microsoftonline.com"), + "https://mtlsauth.microsoft.com:8443/t/token") + + def test_sovereign_us_gov_fails_fast(self): + with self.assertRaises(ValueError) as cm: + mtls.mtls_pop_host("login.usgovcloudapi.net") + self.assertIn("login.microsoftonline.us", str(cm.exception)) + + def test_sovereign_china_fails_fast(self): + with self.assertRaises(ValueError) as cm: + mtls.mtls_pop_host("login.chinacloudapi.cn") + self.assertIn("login.partner.microsoftonline.cn", str(cm.exception)) + + def test_non_login_host_fails_fast(self): + with self.assertRaises(ValueError): + mtls.mtls_pop_host("contoso.example.com") + + +class TestMtlsSslContext(unittest.TestCase): + """The mTLS SSLContext is built from in-memory PEM and leaves no key on disk.""" + + _PFX = os.path.join(os.path.dirname(__file__), "certificate-with-password.pfx") + _PASSPHRASE = "password" + + def _material(self): + return _load_mtls_cert_material({ + "private_key_pfx_path": self._PFX, + "passphrase": self._PASSPHRASE, + "public_certificate": True, + }) + + def test_cert_material_shape(self): + material = self._material() + self.assertTrue(material["private_key_pem"].startswith(b"-----BEGIN ")) + self.assertIn(b"PRIVATE KEY", material["private_key_pem"]) + self.assertIn(b"BEGIN CERTIFICATE", material["cert_pem"]) + self.assertTrue(material["x5c"]) + self.assertEqual(len(material["sha256_thumbprint"]), 64) # hex sha256 + # key_id is the base64url (unpadded) x5t#S256 used for cache binding + self.assertNotIn("=", material["key_id"]) + + def test_create_ssl_context_loads_cert_and_unlinks_tempfile(self): + material = self._material() + created = {} + # Capture the temp path that _create_ssl_context uses, then assert it is + # gone after the call (the key must not linger on disk). + import tempfile as _tempfile + orig_mkstemp = _tempfile.mkstemp + + def spy_mkstemp(*a, **k): + fd, path = orig_mkstemp(*a, **k) + created["path"] = path + return fd, path + + _tempfile.mkstemp = spy_mkstemp + try: + context = mtls._create_ssl_context( + material["cert_pem"], material["private_key_pem"]) + finally: + _tempfile.mkstemp = orig_mkstemp + self.assertIsInstance(context, ssl.SSLContext) + self.assertIn("path", created) + self.assertFalse( + os.path.exists(created["path"]), + "temp key file must be unlinked immediately after load_cert_chain") + + def test_adapter_injects_ssl_context(self): + material = self._material() + context = mtls._create_ssl_context( + material["cert_pem"], material["private_key_pem"]) + adapter = mtls._make_mtls_adapter(context) + # The adapter builds its default poolmanager on construction; our + # ssl_context must have been injected into the pool's connection kwargs. + self.assertIs( + adapter.poolmanager.connection_pool_kw.get("ssl_context"), context) + + def test_http_client_builds_session_lazily(self): + material = self._material() + client = mtls._MtlsHttpClient( + material["cert_pem"], material["private_key_pem"]) + self.assertIsNone(client._session, "session must not be built until first use") + session = client._ensure_session() + self.assertIsNotNone(session) + # The https adapter is our mTLS adapter (cert-bearing), not the default + self.assertIn("https://", session.adapters) + client.close() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_optional_thumbprint.py b/tests/test_optional_thumbprint.py index 56ec4013..1932bf6a 100644 --- a/tests/test_optional_thumbprint.py +++ b/tests/test_optional_thumbprint.py @@ -54,7 +54,8 @@ def _setup_certificate_mocks(self, mock_extract, mock_load_cert): mock_extract.return_value = ( "mock_sha256_thumbprint", # sha256_thumbprint "mock_sha1_thumbprint", # sha1_thumbprint - ["mock_x5c_value"] # x5c + ["mock_x5c_value"], # x5c + "-----BEGIN CERTIFICATE-----\nmock\n-----END CERTIFICATE-----\n", # cert_pem ) def _verify_assertion_params(self, mock_jwt_creator_class, expected_algorithm, diff --git a/tests/test_token_cache.py b/tests/test_token_cache.py index d7dfe8de..663c3119 100644 --- a/tests/test_token_cache.py +++ b/tests/test_token_cache.py @@ -246,8 +246,53 @@ def test_access_tokens_with_different_key_id(self): self._test_data_should_be_saved_and_searchable_in_access_token({"key_id": "2"}) self.assertEqual( len(self.cache._cache["AccessToken"]), - 1, """Historically, tokens are not keyed by key_id, -so a new token overwrites the old one, and we would end up with 1 token in cache""") + 2, """Access tokens are now isolated by key_id, so that a key-bound +token (SSH-cert or mTLS PoP) never overwrites another one bound to a different +key, nor a plain Bearer token for the same scope. We therefore keep 2 tokens.""") + # Each key-bound token remains independently searchable by its own key_id + for key_id in ("1", "2"): + self.assertFoundAccessToken( + scopes=["s1", "s2", "s3"], now=1000, query=dict( + key_id=key_id, + client_id="my_client_id", + environment="login.example.com", + realm="contoso", + home_account_id="uid.utid", + )) + + def test_bearer_and_mtls_pop_tokens_coexist_and_isolate(self): + # The crux of mTLS PoP cache isolation (plan C6): a Bearer token and an + # mtls_pop token for the SAME app/scope/tenant must coexist, and an + # unkeyed (Bearer) lookup must never return the key-bound mtls_pop token. + scopes = ["s2", "s1", "s3"] + now = 1000 + common = dict( + client_id="my_client_id", scope=scopes, + token_endpoint="https://login.example.com/contoso/v2/token") + # 1) Store a plain Bearer AT (no key_id) + self.cache.add(dict(common, data={}, response=build_response( + uid="uid", utid="utid", expires_in=3600, + access_token="bearer_at", token_type="Bearer")), now=now) + # 2) Store an mtls_pop AT bound to a cert key_id, same scope + self.cache.add(dict(common, data={"key_id": "THUMB"}, response=build_response( + uid="uid", utid="utid", expires_in=3600, + access_token="mtls_at", token_type="mtls_pop")), now=now) + self.assertEqual( + 2, len(self.cache._cache["AccessToken"]), + "Bearer and mtls_pop ATs for the same scope must be separate entries") + base_query = dict( + client_id="my_client_id", environment="login.example.com", + realm="contoso", home_account_id="uid.utid") + # A Bearer lookup (no key_id) must return ONLY the Bearer token + bearer = self.assertFoundAccessToken( + scopes=scopes, now=now, query=dict(base_query)) + self.assertEqual("bearer_at", bearer["secret"]) + self.assertEqual("Bearer", bearer["token_type"]) + # A keyed lookup must return the mtls_pop token + pop = self.assertFoundAccessToken( + scopes=scopes, now=now, query=dict(base_query, key_id="THUMB")) + self.assertEqual("mtls_at", pop["secret"]) + self.assertEqual("mtls_pop", pop["token_type"]) def test_refresh_in_should_be_recorded_as_refresh_on(self): # Sounds weird. Yep. scopes = ["s2", "s1", "s3"] # Not in particular order