Source code for satorbis_kit.vector_operation.wherobots_status

"""
Wherobots Job Status Module

This module provides functionality for checking status, viewing logs, and managing
Wherobots Cloud job runs via the REST API.
"""

from typing import Dict, List, Optional

import requests

from .wherobots_config import WHEROBOTS_API_BASE


def get_job_status(api_key: str, run_id: str) -> dict:
    """
    Fetch the details of a single Wherobots job run.

    Sends a GET request to ``{WHEROBOTS_API_BASE}/runs/{run_id}`` and returns
    the decoded JSON body.

    Args:
        api_key: Wherobots API key, sent in the ``X-API-Key`` header
        run_id: Job run ID, interpolated into the URL path as-is

    Returns:
        Job run details as decoded from the JSON response body

    Raises:
        requests.HTTPError: If the API responds with an error status
    """
    auth_headers = {"accept": "application/json", "X-API-Key": api_key}

    # NOTE(review): no timeout is passed, so the call relies on the
    # requests default behaviour for how long it may block.
    run_response = requests.get(
        f"{WHEROBOTS_API_BASE}/runs/{run_id}",
        headers=auth_headers,
    )

    # Turn 4xx/5xx responses into exceptions before decoding the body.
    run_response.raise_for_status()
    return run_response.json()
def get_job_logs(api_key: str, run_id: str, cursor: int = 0, size: int = 100) -> dict:
    """
    Fetch one page of logs for a Wherobots job run.

    Sends a GET request to ``{WHEROBOTS_API_BASE}/runs/{run_id}/logs`` with
    ``cursor`` and ``size`` as query parameters.

    Args:
        api_key: Wherobots API key, sent in the ``X-API-Key`` header
        run_id: Job run ID, interpolated into the URL path as-is
        cursor: Pagination cursor to start reading from
        size: Number of log entries to fetch

    Returns:
        Decoded JSON response body; expected to carry ``items``,
        ``current_page`` and ``next_page`` (API contract, not enforced here)

    Raises:
        requests.HTTPError: If the API responds with an error status
    """
    auth_headers = {"accept": "application/json", "X-API-Key": api_key}
    paging = {"cursor": cursor, "size": size}

    logs_response = requests.get(
        f"{WHEROBOTS_API_BASE}/runs/{run_id}/logs",
        headers=auth_headers,
        params=paging,
    )

    # Turn 4xx/5xx responses into exceptions before decoding the body.
    logs_response.raise_for_status()
    return logs_response.json()
def cancel_job(api_key: str, run_id: str) -> None:
    """
    Request cancellation of a Wherobots job run.

    Sends a body-less POST request to
    ``{WHEROBOTS_API_BASE}/runs/{run_id}/cancel``. The response body is
    ignored; only the HTTP status is checked.

    Args:
        api_key: Wherobots API key, sent in the ``X-API-Key`` header
        run_id: Job run ID, interpolated into the URL path as-is

    Raises:
        requests.HTTPError: If the API responds with an error status
    """
    auth_headers = {"accept": "application/json", "X-API-Key": api_key}
    cancel_endpoint = f"{WHEROBOTS_API_BASE}/runs/{run_id}/cancel"

    # Nothing is returned on success; a failed request raises instead.
    requests.post(cancel_endpoint, headers=auth_headers).raise_for_status()
def list_jobs(
    api_key: str,
    region: Optional[str] = None,
    status: Optional[List[str]] = None,
    name: Optional[str] = None,
    size: int = 50,
) -> dict:
    """
    List Wherobots job runs, optionally filtered.

    Sends a GET request to ``{WHEROBOTS_API_BASE}/runs``. ``size`` is always
    sent; ``region``, ``status`` and ``name`` are only added to the query
    string when they are truthy (so ``None``, ``""`` and ``[]`` are omitted).

    Args:
        api_key: Wherobots API key, sent in the ``X-API-Key`` header
        region: Filter by region (optional)
        status: Filter by status list (optional)
        name: Filter by name pattern (optional)
        size: Number of results per page

    Returns:
        Decoded JSON response body; expected to hold the items list and
        pagination info (API contract, not enforced here)

    Raises:
        requests.HTTPError: If the API responds with an error status
    """
    auth_headers = {"accept": "application/json", "X-API-Key": api_key}

    # Start from the mandatory page size, then append whichever optional
    # filters were actually supplied, in a fixed region/status/name order.
    optional_filters = {"region": region, "status": status, "name": name}
    query = {"size": size}
    query.update(
        {key: value for key, value in optional_filters.items() if value}
    )

    listing = requests.get(
        f"{WHEROBOTS_API_BASE}/runs",
        headers=auth_headers,
        params=query,
    )

    # Turn 4xx/5xx responses into exceptions before decoding the body.
    listing.raise_for_status()
    return listing.json()
def submit_job(
    api_key: str,
    region: str,
    script_uri: str,
    script_args: Optional[List[str]] = None,
    runtime: str = "tiny",
    name: str = "wherobots-job",
    version: str = "latest",
    timeout_seconds: int = 3600,
    dependencies: Optional[List[dict]] = None,
    spark_configs: Optional[dict] = None,
) -> dict:
    """
    Submit a Python job to Wherobots Cloud.

    Sends a POST request to ``{WHEROBOTS_API_BASE}/runs`` with ``region`` as a
    query parameter and the job description as a JSON body.

    Args:
        api_key: Wherobots API key, sent in the ``X-API-Key`` header
        region: Compute region (e.g., 'aws-ap-south-1', 'aws-us-west-2')
        script_uri: S3 URI to the Python script
        script_args: List of command-line arguments for the script; ``None``
            is sent as an empty list
        runtime: Runtime size (tiny, small, medium, large, etc.)
        name: Job run name
        version: Wherobots version ('latest' or 'preview')
        timeout_seconds: Job timeout in seconds
        dependencies: List of dependency objects (PyPI or FILE)
        spark_configs: Dictionary of Spark configuration key-value pairs

    Returns:
        Response dictionary decoded from the Wherobots API JSON response

    Raises:
        requests.HTTPError: If the API responds with an error status
    """
    url = f"{WHEROBOTS_API_BASE}/runs"
    headers = {
        "accept": "application/json",
        "X-API-Key": api_key,
        "Content-Type": "application/json",
    }

    # Build request payload
    payload: dict = {
        "runtime": runtime,
        "name": name,
        "version": version,
        "runPython": {
            "uri": script_uri,
            "args": script_args or [],
        },
        "timeoutSeconds": timeout_seconds,
    }

    # Only attach an environment section when at least one of its parts was
    # provided, so the payload never carries an empty "environment" object.
    environment = {}
    if dependencies:
        environment["dependencies"] = dependencies
    if spark_configs:
        environment["sparkConfigs"] = spark_configs
    if environment:
        payload["environment"] = environment

    # Pass the region through ``params`` rather than splicing it into the URL
    # string: requests then percent-encodes the value, and the call matches
    # how list_jobs sends its query parameters.
    response = requests.post(
        url,
        headers=headers,
        params={"region": region},
        json=payload,
    )

    # Check for errors
    response.raise_for_status()
    return response.json()