"""
Wherobots Job Status Module

This module provides functionality for submitting, listing, and cancelling
Wherobots Cloud job runs, checking their status, and viewing their logs via
the REST API.
"""
from typing import Dict, List, Optional
from urllib.parse import quote

import requests

from .wherobots_config import WHEROBOTS_API_BASE
def get_job_status(
    api_key: str,
    run_id: str,
    *,
    request_timeout: Optional[float] = 30.0,
) -> dict:
    """
    Get the status of a Wherobots job run.

    Issues ``GET {WHEROBOTS_API_BASE}/runs/{run_id}`` authenticated with the
    ``X-API-Key`` header and returns the decoded JSON body.

    Args:
        api_key: Wherobots API key
        run_id: Job run ID
        request_timeout: Seconds to wait for the server before giving up
            (keyword-only, defaults to 30). Pass ``None`` to wait
            indefinitely.

    Returns:
        Job run details dictionary (the decoded JSON response)

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status
        requests.Timeout: If the server does not answer within
            ``request_timeout`` seconds
        requests.RequestException: For other transport-level failures
        ValueError: If the response body is not valid JSON
    """
    # Percent-encode the ID so reserved characters ("/", "?", "#", ...)
    # cannot change which endpoint is addressed.
    encoded_run_id = quote(str(run_id), safe="")
    url = f"{WHEROBOTS_API_BASE}/runs/{encoded_run_id}"
    headers = {
        "accept": "application/json",
        "X-API-Key": api_key,
    }
    # Without an explicit timeout, requests blocks forever on a stalled server.
    response = requests.get(url, headers=headers, timeout=request_timeout)
    response.raise_for_status()
    return response.json()
def get_job_logs(
    api_key: str,
    run_id: str,
    cursor: int = 0,
    size: int = 100,
    *,
    request_timeout: Optional[float] = 30.0,
) -> dict:
    """
    Get logs for a Wherobots job run.

    Issues ``GET {WHEROBOTS_API_BASE}/runs/{run_id}/logs`` with ``cursor`` and
    ``size`` as query parameters and returns the decoded JSON body.

    Args:
        api_key: Wherobots API key
        run_id: Job run ID
        cursor: Pagination cursor
        size: Number of log entries to fetch
        request_timeout: Seconds to wait for the server before giving up
            (keyword-only, defaults to 30). Pass ``None`` to wait
            indefinitely.

    Returns:
        Logs dictionary with items, current_page, and next_page

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status
        requests.Timeout: If the server does not answer within
            ``request_timeout`` seconds
        requests.RequestException: For other transport-level failures
        ValueError: If the response body is not valid JSON
    """
    # Percent-encode the ID so reserved characters ("/", "?", "#", ...)
    # cannot change which endpoint is addressed.
    encoded_run_id = quote(str(run_id), safe="")
    url = f"{WHEROBOTS_API_BASE}/runs/{encoded_run_id}/logs"
    headers = {
        "accept": "application/json",
        "X-API-Key": api_key,
    }
    params = {"cursor": cursor, "size": size}
    # Without an explicit timeout, requests blocks forever on a stalled server.
    response = requests.get(
        url,
        headers=headers,
        params=params,
        timeout=request_timeout,
    )
    response.raise_for_status()
    return response.json()
def cancel_job(
    api_key: str,
    run_id: str,
    *,
    request_timeout: Optional[float] = 30.0,
) -> None:
    """
    Cancel a Wherobots job run.

    Issues ``POST {WHEROBOTS_API_BASE}/runs/{run_id}/cancel`` authenticated
    with the ``X-API-Key`` header. The response body is not returned.

    Args:
        api_key: Wherobots API key
        run_id: Job run ID
        request_timeout: Seconds to wait for the server before giving up
            (keyword-only, defaults to 30). Pass ``None`` to wait
            indefinitely.

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status
        requests.Timeout: If the server does not answer within
            ``request_timeout`` seconds
        requests.RequestException: For other transport-level failures
    """
    # Percent-encode the ID so reserved characters ("/", "?", "#", ...)
    # cannot change which endpoint is addressed.
    encoded_run_id = quote(str(run_id), safe="")
    url = f"{WHEROBOTS_API_BASE}/runs/{encoded_run_id}/cancel"
    headers = {
        "accept": "application/json",
        "X-API-Key": api_key,
    }
    # Without an explicit timeout, requests blocks forever on a stalled server.
    response = requests.post(url, headers=headers, timeout=request_timeout)
    response.raise_for_status()
def list_jobs(
    api_key: str,
    region: Optional[str] = None,
    status: Optional[List[str]] = None,
    name: Optional[str] = None,
    size: int = 50,
    *,
    request_timeout: Optional[float] = 30.0,
) -> dict:
    """
    List Wherobots job runs.

    Issues ``GET {WHEROBOTS_API_BASE}/runs``. ``size`` is always sent; the
    ``region``, ``status`` and ``name`` filters are sent only when they are
    truthy (not ``None`` and not empty).

    Args:
        api_key: Wherobots API key
        region: Filter by region (optional)
        status: Filter by status list (optional)
        name: Filter by name pattern (optional)
        size: Number of results per page
        request_timeout: Seconds to wait for the server before giving up
            (keyword-only, defaults to 30). Pass ``None`` to wait
            indefinitely.

    Returns:
        Dictionary with items list and pagination info

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status
        requests.Timeout: If the server does not answer within
            ``request_timeout`` seconds
        requests.RequestException: For other transport-level failures
        ValueError: If the response body is not valid JSON
    """
    url = f"{WHEROBOTS_API_BASE}/runs"
    headers = {
        "accept": "application/json",
        "X-API-Key": api_key,
    }
    params = {"size": size}
    # Only forward filters the caller actually supplied.
    optional_filters = {"region": region, "status": status, "name": name}
    params.update(
        {key: value for key, value in optional_filters.items() if value}
    )
    # Without an explicit timeout, requests blocks forever on a stalled server.
    response = requests.get(
        url,
        headers=headers,
        params=params,
        timeout=request_timeout,
    )
    response.raise_for_status()
    return response.json()
def submit_job(
    api_key: str,
    region: str,
    script_uri: str,
    script_args: Optional[List[str]] = None,
    runtime: str = "tiny",
    name: str = "wherobots-job",
    version: str = "latest",
    timeout_seconds: int = 3600,
    dependencies: Optional[List[dict]] = None,
    spark_configs: Optional[dict] = None,
    *,
    request_timeout: Optional[float] = 30.0,
) -> dict:
    """
    Submit a Python job to Wherobots Cloud.

    Issues ``POST {WHEROBOTS_API_BASE}/runs`` with ``region`` as a query
    parameter and the job description as the JSON body.

    Args:
        api_key: Wherobots API key
        region: Compute region (e.g., 'aws-ap-south-1', 'aws-us-west-2')
        script_uri: S3 URI to the Python script
        script_args: List of command-line arguments for the script
        runtime: Runtime size (tiny, small, medium, large, etc.)
        name: Job run name
        version: Wherobots version ('latest' or 'preview')
        timeout_seconds: Job timeout in seconds (sent to the API as
            ``timeoutSeconds``; unrelated to ``request_timeout``)
        dependencies: List of dependency objects (PyPI or FILE)
        spark_configs: Dictionary of Spark configuration key-value pairs
        request_timeout: Seconds to wait for the HTTP response before giving
            up (keyword-only, defaults to 30). Pass ``None`` to wait
            indefinitely.

    Returns:
        Response dictionary from Wherobots API

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status
        requests.Timeout: If the server does not answer within
            ``request_timeout`` seconds
        requests.RequestException: For other transport-level failures
        ValueError: If the response body is not valid JSON
    """
    url = f"{WHEROBOTS_API_BASE}/runs"
    headers = {
        "accept": "application/json",
        "X-API-Key": api_key,
        "Content-Type": "application/json",
    }

    # Build request payload
    payload = {
        "runtime": runtime,
        "name": name,
        "version": version,
        "runPython": {
            "uri": script_uri,
            "args": script_args or [],
        },
        "timeoutSeconds": timeout_seconds,
    }

    # Attach an environment section only when there is something to put in it.
    environment = {}
    if dependencies:
        environment["dependencies"] = dependencies
    if spark_configs:
        environment["sparkConfigs"] = spark_configs
    if environment:
        payload["environment"] = environment

    # Pass the region through ``params`` so requests URL-encodes it instead of
    # splicing the raw value into the query string by hand.
    response = requests.post(
        url,
        params={"region": region},
        headers=headers,
        json=payload,
        timeout=request_timeout,
    )

    # Check for errors
    response.raise_for_status()
    return response.json()