Source code for satorbis_kit.auth.auth_service

import time
from typing import Optional, Union

import requests

from .auth_base import BaseAuth
from .environments import Environment, get_env_config


class ServiceAuth(BaseAuth):
    """
    Manages the service-account (client-credentials) login flow against an
    OIDC provider, designed for use inside a Jupyter notebook.

    Progress and failure messages are written to standard output with
    ``print`` so they show up directly in the notebook cell.

    Parameters
    ----------
    client_id:
        The client ID of the OIDC application.
    client_secret:
        The client secret of the OIDC application.
    environment:
        Target deployment environment. Accepts an :class:`Environment`
        member, a plain string (``"prod"`` or ``"dev"``), or ``None``.
        When ``None`` the value of the ``SATORBIS_ENV`` environment variable
        is used; if that is also absent, ``prod`` is the default.
        Example: ``ServiceAuth(..., environment="dev")``.
    scopes:
        Space-separated OIDC scopes.
    token_url:
        Optional. The token endpoint URL. If not provided, it will be
        discovered from the OIDC discovery document.
    """

    # Timeout (in seconds) handed to every outbound ``requests`` call so that
    # an unresponsive identity provider cannot block the caller forever.
    _REQUEST_TIMEOUT = 10

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        environment: Union[Environment, str, None] = None,
        scopes: str = "openid profile email",
        token_url: Optional[str] = None,
    ):
        super().__init__()

        # Plain strings are converted to the enum first; the ``Environment``
        # constructor is what rejects values it does not recognise.
        if isinstance(environment, str):
            environment = Environment(environment)
        resolved_env, config = get_env_config(environment)

        self.environment = resolved_env
        # Strip trailing slashes so the domain can be joined with
        # "/.well-known/..." without producing a double slash.
        self.domain = config.domain.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes

        # An explicit (non-empty) token URL skips the discovery round-trip.
        if token_url:
            self.token_url = token_url
        else:
            self._discover_endpoints()

    def _discover_endpoints(self) -> None:
        """
        Fetch the OIDC discovery document and extract the token endpoint URL.

        The result is stored in ``self.token_url``.

        Raises
        ------
        requests.RequestException
            If the discovery request fails, times out, or returns an HTTP
            error status.
        KeyError
            If the discovery document has no ``token_endpoint`` entry.
        """
        discovery_url = f"{self.domain}/.well-known/openid-configuration"
        resp = requests.get(discovery_url, timeout=self._REQUEST_TIMEOUT)
        resp.raise_for_status()
        discovery_doc = resp.json()
        self.token_url = discovery_doc["token_endpoint"]

    def authenticate(self):
        """
        Authenticate using service account (client credentials flow).

        Posts the client credentials to ``self.token_url``, loads the JSON
        response into ``self.tokens`` and prints a short status report.

        Returns
        -------
        TokenStore
            Token store containing the access token.

        Raises
        ------
        Exception
            Any error raised while requesting or storing the token is
            reported on standard output and then re-raised unchanged.
        """
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": self.scopes,
        }

        try:
            # The request itself lives inside the ``try`` so that connection
            # errors and timeouts are reported the same way as HTTP errors.
            response = requests.post(
                self.token_url,
                data=payload,
                timeout=self._REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            self.tokens.update_from_raw(response.json())
        except Exception as e:
            print("Service authentication failed")
            print(f"Error: {e}")
            # Bare ``raise`` re-raises the active exception as-is.
            raise
        else:
            # Reporting happens only after the tokens are stored, so a
            # display problem is never mislabelled as a failed login.
            exp = time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(self.tokens.expires_at)
            )
            print("Service authentication successful")
            # Only a 30-character prefix of the token is echoed.
            print(f"Access token: {self.tokens.access_token[:30]}...")
            print(f"Expires at: {exp}")

        return self.tokens

    def refresh_token(self):
        """
        Refresh the access token by re-authenticating.

        This simply calls :meth:`authenticate` again to obtain a new token.

        Returns
        -------
        TokenStore
            Token store containing the new access token.
        """
        return self.authenticate()