Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,54 @@ New in MSAL Python 1.26
.. automethod:: __init__


mTLS Proof-of-Possession (SN/I certificate)
-------------------------------------------

New in MSAL Python 1.38

MSAL Python supports two different kinds of Proof-of-Possession (PoP) tokens:

* **Signed HTTP Request (SHR) PoP** -- used by public-client apps through the
broker, and configured with the ``auth_scheme`` parameter and
:py:class:`msal.PopAuthScheme` above.
* **mutual-TLS (mTLS) PoP** -- used by *confidential-client* apps. The app's
Subject Name + Issuer (SN/I) certificate is presented as the **client TLS
certificate** in a mutual-TLS handshake to Microsoft Entra, and the returned
access token is cryptographically bound to that certificate
(``token_type == "mtls_pop"``, ``cnf``/``x5t#S256``).

To request an mTLS-PoP token, configure the confidential client with a
certificate credential and pass ``mtls_proof_of_possession=True`` to
:py:meth:`msal.ConfidentialClientApplication.acquire_token_for_client`::

app = msal.ConfidentialClientApplication(
client_id,
authority="https://login.microsoftonline.com/<tenant-id>", # MUST be tenanted
client_credential={"private_key_pfx_path": "sni.pfx", "public_certificate": True},
# azure_region="westus3", # optional; omit for the global mtls endpoint
)
result = app.acquire_token_for_client(
["https://graph.microsoft.com/.default"],
mtls_proof_of_possession=True,
)
# result["token_type"] == "mtls_pop"
# result["binding_certificate"] == {"x5c": [...], "thumbprint_sha256": "..."}

Notes and requirements:

* The ``authority`` must be **tenanted** (not ``/common`` or ``/organizations``).
* MSAL must own the TLS transport, so a custom ``http_client`` is not supported
together with ``mtls_proof_of_possession=True``.
* The existing SN/I + Bearer (client-assertion) flow is unchanged; the same
certificate can be used either as an assertion signer (Bearer) or as the TLS
client certificate (mtls_pop).
* mTLS PoP currently targets the public and Azure Government (Arlington) clouds.
* For a Federated Identity Credential (FIC) exchange, configure the leg-2 client
with both a ``client_assertion`` (the leg-1 token) and an
``mtls_binding_certificate`` sub-key; the leg-1 assertion is then sent as a
``jwt-pop`` client assertion over the same mTLS connection.


Exceptions
----------
These are exceptions that MSAL Python may raise.
Expand Down
374 changes: 361 additions & 13 deletions msal/application.py

Large diffs are not rendered by default.

192 changes: 192 additions & 0 deletions msal/mtls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
"""Helpers for mTLS Proof-of-Possession (PoP).

This module owns two concerns for the "SN/I certificate over mTLS PoP" feature:

1. The endpoint transform + sovereign guardrail: mapping a tenanted ``login.*``
authority to its ``mtlsauth.*`` counterpart (global or regional), and
rejecting clouds/hosts where mTLS PoP is not (yet) available.
2. The mTLS transport: a ``requests`` session whose HTTPS connections present a
client certificate for the mutual-TLS handshake to the token endpoint.

The wire contract and host mapping mirror the shipped MSAL.NET
``RegionAndMtlsDiscoveryProvider`` so MSAL Python stays cross-SDK consistent.
"""
import logging
try:
from urllib.parse import urlparse, urlunparse
except ImportError: # Python 2
from urlparse import urlparse, urlunparse


logger = logging.getLogger(__name__)

_MTLS_POP_DOC_LINK = "https://aka.ms/msal-net-pop"

# The global public mTLS host. The four public "login.*" hosts all normalize to
# this single global endpoint (ESTSR provides regional failover), matching
# MSAL.NET's PublicEnvForRegionalMtlsAuth.
_PUBLIC_MTLS_HOST = "mtlsauth.microsoft.com"

# Public-cloud login hosts that normalize to the single global mTLS host above.
_PUBLIC_CLOUD_LOGIN_HOSTS = frozenset([
"login.microsoftonline.com",
"login.microsoft.com",
"login.windows.net",
"sts.windows.net",
])

# ─────────────────────────────────────────────────────────────────────────────
# SOVEREIGN GUARDRAIL - single override point for mTLS cloud availability.
#
# mTLS PoP is currently rejected for the deprecated sovereign login hosts below
# and for any non-"login." host. ``mtlsauth.*`` is rolling out across clouds
# (Azure Government / AGC: available; Bleu / Delos: TBD). To enable a cloud,
# remove its entry here (and, if needed, relax the non-"login." host check in
# ``mtls_pop_host``). This is the ONLY place cloud eligibility is enforced, so
# do not scatter equivalent checks elsewhere in the codebase.
# ─────────────────────────────────────────────────────────────────────────────
_MTLS_POP_UNSUPPORTED_HOSTS = {
"login.usgovcloudapi.net":
"login.usgovcloudapi.net is not supported for mTLS PoP, "
"please use login.microsoftonline.us",
"login.chinacloudapi.cn":
"login.chinacloudapi.cn is not supported for mTLS PoP, "
"please use login.partner.microsoftonline.cn",
}


def mtls_pop_host(instance, region=None):
"""Return the ``mtlsauth.*`` host for a given ``login.*`` authority instance.

:param str instance: The authority host, e.g. ``login.microsoftonline.com``.
:param region: Optional region, e.g. ``westus3``. When provided, a regional
mTLS host ``{region}.mtlsauth...`` is returned; otherwise the global one.
:raises ValueError: When mTLS PoP is not supported for the given host
(the sovereign guardrail above, or a non-``login.`` host).
"""
instance = instance.lower()
if instance in _MTLS_POP_UNSUPPORTED_HOSTS: # Sovereign guardrail
raise ValueError(_MTLS_POP_UNSUPPORTED_HOSTS[instance])
if instance in _PUBLIC_CLOUD_LOGIN_HOSTS:
# Known public aliases (incl. legacy login.windows.net / sts.windows.net)
# all normalize to the single global mTLS host.
base = _PUBLIC_MTLS_HOST
elif instance.startswith("login."): # e.g. login.microsoftonline.us
base = "mtlsauth" + instance[len("login"):] # -> mtlsauth.microsoftonline.us
else:
raise ValueError(
"mTLS PoP is only supported for hosts that start with 'login.'. "
"The provided authority host ({}) does not meet this requirement. "
"See {} for details.".format(instance, _MTLS_POP_DOC_LINK))
return "{}.{}".format(region, base) if region else base


def transform_token_endpoint(token_endpoint, instance, region=None):
"""Return ``token_endpoint`` with its host swapped for the mTLS host.

The path/tenant and everything else are preserved; only the network host is
rewritten (e.g. ``https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token``
-> ``https://mtlsauth.microsoft.com/{tenant}/oauth2/v2.0/token``).
"""
parsed = urlparse(token_endpoint)
host = mtls_pop_host(instance, region)
# Preserve a non-default port if the original endpoint carried one (tests).
netloc = "{}:{}".format(host, parsed.port) if parsed.port else host
return urlunparse(parsed._replace(netloc=netloc))


class _MtlsHttpClient(object):
"""A minimal http client (``post``/``get``/``close``) whose HTTPS
connections present a client certificate for mutual-TLS.

MSAL owns this transport. A caller's plain custom ``http_client`` cannot
perform the mTLS handshake, which is why requesting mTLS PoP with a
non-mTLS-capable custom transport fails fast (see application.py).

The ``ssl.SSLContext`` (and its temp key file) is built lazily on first use,
so Bearer-only certificate apps never pay the cost nor touch the disk.
"""
def __init__(self, cert_pem, key_pem, *,
verify=True, proxies=None, timeout=None):
# cert_pem / key_pem are PEM-encoded bytes. key_pem must be an
# unencrypted private key (the caller normalizes it).
self._cert_pem = cert_pem
self._key_pem = key_pem
self._verify = verify
self._proxies = proxies
self._timeout = timeout
self._session = None

def _ensure_session(self):
if self._session is None:
import requests # Lazy import, same as the rest of MSAL
session = requests.Session()
session.verify = self._verify
if self._proxies:
session.proxies = self._proxies
adapter = _make_mtls_adapter(
_create_ssl_context(self._cert_pem, self._key_pem))
session.mount("https://", adapter)
self._session = session
return self._session

def post(self, url, **kwargs):
if self._timeout is not None:
kwargs.setdefault("timeout", self._timeout)
return self._ensure_session().post(url, **kwargs)

def get(self, url, **kwargs):
if self._timeout is not None:
kwargs.setdefault("timeout", self._timeout)
return self._ensure_session().get(url, **kwargs)

def close(self):
if self._session is not None:
self._session.close()


def _make_mtls_adapter(ssl_context):
"""Return a ``requests`` HTTPAdapter that injects ``ssl_context`` (with the
client certificate loaded) into every connection pool it creates."""
from requests.adapters import HTTPAdapter # Lazy import

class _MtlsHTTPAdapter(HTTPAdapter):
def __init__(self, ssl_context):
self._ssl_context = ssl_context
super(_MtlsHTTPAdapter, self).__init__(max_retries=1)

def init_poolmanager(self, *args, **kwargs):
kwargs["ssl_context"] = self._ssl_context
return super(_MtlsHTTPAdapter, self).init_poolmanager(*args, **kwargs)

def proxy_manager_for(self, *args, **kwargs):
kwargs["ssl_context"] = self._ssl_context
return super(_MtlsHTTPAdapter, self).proxy_manager_for(*args, **kwargs)

return _MtlsHTTPAdapter(ssl_context)


def _create_ssl_context(cert_pem, key_pem):
"""Build a client ``ssl.SSLContext`` that presents ``cert_pem``/``key_pem``.

``ssl.SSLContext.load_cert_chain`` requires a file path, but our key is
in memory. We write a ``0600`` temp PEM (mkstemp defaults to owner-only),
load it, then unlink it immediately - the context keeps the material in
memory, so nothing sensitive lingers on disk.
"""
import ssl
import os
import tempfile
context = ssl.create_default_context() # Verifies the server (ESTS) as usual
fd, path = tempfile.mkstemp(suffix=".pem") # Owner-only (0600) by default
try:
os.write(fd, key_pem + b"\n" + cert_pem)
os.close(fd) # Close before load_cert_chain reads it (required on Windows)
context.load_cert_chain(path) # Loads our client cert+key into memory
finally:
try:
os.remove(path) # Unlink immediately; minimal disk exposure
except OSError: # pragma: no cover
logger.warning("Unable to remove temporary mTLS key file")
return context
Comment on lines +180 to +191

1 change: 1 addition & 0 deletions msal/oauth2cli/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def encode_saml_assertion(assertion):

CLIENT_ASSERTION_TYPE_JWT = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer"
CLIENT_ASSERTION_TYPE_SAML2 = "urn:ietf:params:oauth:client-assertion-type:saml2-bearer"
CLIENT_ASSERTION_TYPE_JWT_POP = "urn:ietf:params:oauth:client-assertion-type:jwt-pop"
client_assertion_encoders = {CLIENT_ASSERTION_TYPE_SAML2: encode_saml_assertion}

@property
Expand Down
2 changes: 1 addition & 1 deletion msal/sku.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"""

# The __init__.py will import this. Not the other way around.
__version__ = "1.37.0"
__version__ = "1.38.0"
SKU = "MSAL.Python"
25 changes: 22 additions & 3 deletions msal/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
AT_AGING = 4
RESERVED = 5

# Server-telemetry token-type values (parity with the other MSALs, e.g. .NET's
# TelemetryTokenTypeConstants). Only non-Bearer token types are emitted.
_TELEMETRY_TOKEN_TYPES = {
"mtls_pop": 6, # mTLS-bound Proof-of-Possession
}


def _get_new_correlation_id():
return str(uuid.uuid4())
Expand All @@ -28,18 +34,31 @@ class _TelemetryContext(object):
_CURRENT_HEADER_SIZE_LIMIT = 100
_LAST_HEADER_SIZE_LIMIT = 350

def __init__(self, buffer, lock, api_id, correlation_id=None, refresh_reason=None):
def __init__(self, buffer, lock, api_id, correlation_id=None,
refresh_reason=None, token_type=None):
self._buffer = buffer
self._lock = lock
self._api_id = api_id
self._correlation_id = correlation_id or _get_new_correlation_id()
self._refresh_reason = refresh_reason or NON_SILENT_CALL
self._token_type = token_type
logger.debug("Generate or reuse correlation_id: %s", self._correlation_id)

def generate_headers(self):
with self._lock:
current = "4|{api_id},{cache_refresh}|".format(
api_id=self._api_id, cache_refresh=self._refresh_reason)
# MSAL Python's current schema (4) carries an EMPTY platform-config
# section after the second "|". To stay byte-for-byte unchanged for
# every existing flow, we only populate that section for token types
# that need it (currently mtls_pop -> value 6, placed in the 3rd
# platform field). This is a documented Python-schema divergence:
# other MSALs use schema 5, but the token-type *value* matches.
token_type_value = _TELEMETRY_TOKEN_TYPES.get(self._token_type)
platform_config = (
",,{}".format(token_type_value)
if token_type_value is not None else "")
current = "4|{api_id},{cache_refresh}|{platform_config}".format(
api_id=self._api_id, cache_refresh=self._refresh_reason,
platform_config=platform_config)
if len(current) > self._CURRENT_HEADER_SIZE_LIMIT:
logger.warning(
"Telemetry header greater than {} will be truncated by AAD".format(
Expand Down
20 changes: 17 additions & 3 deletions msal/token_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ def __init__(self):
realm=None, target=None,
ext_cache_key=None,
# Note: New field(s) can be added here
#key_id=None,
key_id=None, # So ATs bound to different keys/certs can
# coexist (e.g. an mtls_pop AT vs a Bearer AT for the
# same app+scope). key_id is absent for Bearer, which
# keeps the Bearer cache key byte-for-byte unchanged.
**ignored_payload_from_a_real_token:
"-".join([ # Note: Could use a hash here to shorten key length
home_account_id or "",
Expand All @@ -163,8 +166,8 @@ def __init__(self):
client_id or "",
realm or "",
target or "",
#key_id or "", # So ATs of different key_id can coexist
] + ([ext_cache_key] if ext_cache_key else [])
+ ([key_id] if key_id else []) # ATs of different key_id coexist
).lower(),
Comment on lines 169 to 171
self.CredentialType.ID_TOKEN:
lambda home_account_id=None, environment=None, client_id=None,
Expand Down Expand Up @@ -194,6 +197,7 @@ def _get_access_token(
self,
home_account_id, environment, client_id, realm, target, # Together they form a compound key
ext_cache_key=None,
key_id=None,
default=None,
): # O(1)
return self._get(
Expand All @@ -205,6 +209,7 @@ def _get_access_token(
realm=realm,
target=" ".join(target),
ext_cache_key=ext_cache_key,
key_id=key_id,
),
default=default)

Expand Down Expand Up @@ -251,7 +256,8 @@ def search(self, credential_type, target=None, query=None, *, now=None): # O(n)
preferred_result = self._get_access_token(
query["home_account_id"], query["environment"],
query["client_id"], query["realm"], target,
ext_cache_key=query.get("ext_cache_key"))
ext_cache_key=query.get("ext_cache_key"),
key_id=query.get("key_id"))
if preferred_result and self._is_matching(
preferred_result, query,
# Needs no target_set here because it is satisfied by dict key
Expand Down Expand Up @@ -284,6 +290,14 @@ def search(self, credential_type, target=None, query=None, *, now=None): # O(n)
and "ext_cache_key" not in (query or {})
):
continue
# Cache isolation for key-bound tokens (e.g. mtls_pop, SSH-cert).
# An entry bound to a key_id must not satisfy a query without
# one, so a Bearer lookup never returns a PoP/mtls_pop token.
if (credential_type == self.CredentialType.ACCESS_TOKEN
and "key_id" in entry
and "key_id" not in (query or {})
):
continue
yield entry
for at in expired_access_tokens:
self.remove_at(at)
Expand Down
Loading
Loading