# Source code for satorbis_kit.auth.auth_service
import time
from typing import Optional, Union
import requests
from .auth_base import BaseAuth
from .environments import Environment, get_env_config
# [docs]
class ServiceAuth(BaseAuth):
    """
    Manages the service-account (client-credentials) login flow against
    an OIDC provider, designed for use inside a Jupyter notebook.

    Parameters
    ----------
    client_id:
        The client ID of the OIDC application.
    client_secret:
        The client secret of the OIDC application.
    environment:
        Target deployment environment. Accepts an :class:`Environment`
        member, a plain string (``"prod"`` or ``"dev"``), or ``None``.
        When ``None`` the value of the ``SATORBIS_ENV`` environment
        variable is used; if that is also absent, ``prod`` is the
        default. Example: ``ServiceAuth(..., environment="dev")``.
    scopes:
        Space-separated OIDC scopes.
    token_url:
        Optional. The token endpoint URL. If not provided,
        it will be discovered from the OIDC discovery document.

    Notes
    -----
    When ``token_url`` is omitted the constructor performs a network
    request (the discovery lookup), so instantiation itself may raise
    a ``requests`` exception.
    """

    # Seconds to wait on any HTTP call to the OIDC provider. Shared by the
    # discovery lookup and the token request so that neither can block the
    # caller indefinitely.
    _REQUEST_TIMEOUT = 10

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        environment: Union[Environment, str, None] = None,
        scopes: str = "openid profile email",
        token_url: Optional[str] = None,
    ):
        super().__init__()

        # Plain strings are converted through the Environment constructor
        # before being handed to get_env_config.
        if isinstance(environment, str):
            environment = Environment(environment)
        resolved_env, config = get_env_config(environment)

        self.environment = resolved_env
        # Strip trailing slashes so URL paths can be appended with a
        # single "/" separator.
        self.domain = config.domain.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes

        # An explicit (non-empty) token URL skips the discovery request.
        if token_url:
            self.token_url = token_url
        else:
            self._discover_endpoints()

    def _discover_endpoints(self) -> None:
        """
        Fetch the OIDC discovery document and extract the token endpoint URL.

        The result is stored on ``self.token_url``.

        Raises
        ------
        requests.RequestException
            If the discovery request fails, times out, or returns an
            HTTP error status.
        KeyError
            If the discovery document has no ``token_endpoint`` entry.
        """
        discovery_url = f"{self.domain}/.well-known/openid-configuration"
        resp = requests.get(discovery_url, timeout=self._REQUEST_TIMEOUT)
        resp.raise_for_status()
        config = resp.json()
        self.token_url = config["token_endpoint"]

    def authenticate(self):
        """
        Authenticate using service account (client credentials flow).

        Posts the client credentials to ``self.token_url``, loads the JSON
        response into ``self.tokens`` and prints a short status report.
        On any failure a diagnostic is printed and the original exception
        is re-raised unchanged.

        Returns
        -------
        TokenStore
            Token store containing the access token.

        Raises
        ------
        requests.RequestException
            If the token request fails, times out, or returns an HTTP
            error status.
        """
        # NOTE(review): self.tokens is presumably created by
        # BaseAuth.__init__ -- confirm against the base class.
        try:
            # The request lives inside the try block so that connection
            # errors and timeouts are reported the same way as HTTP errors.
            response = requests.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": self.scopes,
                },
                timeout=self._REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            token = response.json()
            self.tokens.update_from_raw(token)
            exp = time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(self.tokens.expires_at)
            )
            print("Service authentication successful")
            # At most the first 30 characters of the token are echoed.
            print(f"Access token: {self.tokens.access_token[:30]}...")
            print(f"Expires at: {exp}")
        except Exception as e:
            print("Service authentication failed")
            print(f"Error: {e}")
            # Bare raise re-raises the active exception without adding an
            # extra frame to its traceback.
            raise
        return self.tokens

    def refresh_token(self):
        """
        Refresh the access token by re-authenticating.

        Returns
        -------
        TokenStore
            Token store containing the new access token.
        """
        return self.authenticate()