Runpod python
๐ | Python library for Runpod API and serverless worker SDK.
Welcome to the official Python library for Runpod API & SDK. The project is written primarily in Python, distributed under the MIT License license, first published in 2022. Key topics include: api, artificial-intelligence, cloud-gpu, gpu, machine-learning.
Welcome to the official Python library for Runpod API & SDK.
Table of Contents
- Table of Contents
- ๐ป | Installation
- โก | Serverless Worker (SDK)
- ๐ | API Language Library (GraphQL Wrapper)
- ๐ | Directory
- ๐ค | Community and Contributing
๐ป | Installation
Install from PyPI (Stable Release)
bash# Install with pip pip install runpod # Install with uv (faster alternative) uv add runpod
Install from GitHub (Latest Changes)
To get the latest changes that haven't been released to PyPI yet:
bash# Install latest development version from main branch with pip pip install git+https://github.com/runpod/runpod-python.git # Install with uv uv add git+https://github.com/runpod/runpod-python.git # Install a specific branch pip install git+https://github.com/runpod/runpod-python.git@branch-name # Install a specific tag/release pip install git+https://github.com/runpod/runpod-python.git@v1.0.0 # Install in editable mode for development git clone https://github.com/runpod/runpod-python.git cd runpod-python pip install -e .
Python 3.10 or higher is required to use the latest version of this package.
โก | Serverless Worker (SDK)
This python package can also be used to create a serverless worker that can be deployed to Runpod as a custom endpoint API.
Quick Start
Create a python script in your project that contains your model definition and the Runpod worker start code. Run this python code as your default container start command:
python# my_worker.py import runpod def is_even(job): job_input = job["input"] the_number = job_input["number"] if not isinstance(the_number, int): return {"error": "Silly human, you need to pass an integer."} if the_number % 2 == 0: return True return False runpod.serverless.start({"handler": is_even})
Make sure that this file is ran when your container starts. This can be accomplished by calling it in the docker command when you set up a template at console.runpod.io/serverless/user/templates or by setting it as the default command in your Dockerfile.
See our blog post for creating a basic Serverless API, or view the details docs for more information.
Local Test Worker
You can also test your worker locally before deploying it to Runpod. This is useful for debugging and testing.
bashpython my_worker.py --rp_serve_api
Worker Fitness Checks
Fitness checks allow you to validate your worker environment at startup before processing jobs. If any check fails, the worker exits immediately, allowing your orchestrator to restart it.
python# my_worker.py import runpod import torch # Register fitness checks using the decorator @runpod.serverless.register_fitness_check def check_gpu_available(): """Verify GPU is available.""" if not torch.cuda.is_available(): raise RuntimeError("GPU not available") @runpod.serverless.register_fitness_check def check_disk_space(): """Verify sufficient disk space.""" import shutil stat = shutil.disk_usage("/") free_gb = stat.free / (1024**3) if free_gb < 10: raise RuntimeError(f"Insufficient disk space: {free_gb:.2f}GB free") def handler(job): job_input = job["input"] # Your handler code here return {"output": "success"} # Fitness checks run before handler initialization (production only) runpod.serverless.start({"handler": handler})
Key Features:
- Supports both synchronous and asynchronous check functions
- Checks run only once at worker startup (production mode)
- Runs before handler initialization and job processing begins
- Any check failure exits with code 1 (worker marked unhealthy)
See Worker Fitness Checks documentation for more examples and best practices.
๐ | API Language Library (GraphQL Wrapper)
When interacting with the Runpod API you can use this library to make requests to the API.
pythonimport runpod runpod.api_key = "your_runpod_api_key_found_under_settings"
Endpoints
You can interact with Runpod endpoints via a run or run_sync method.
Basic Usage
pythonendpoint = runpod.Endpoint("ENDPOINT_ID") run_request = endpoint.run( {"your_model_input_key": "your_model_input_value"} ) # Check the status of the endpoint run request print(run_request.status()) # Get the output of the endpoint run request, blocking until the endpoint run is complete. print(run_request.output())
pythonendpoint = runpod.Endpoint("ENDPOINT_ID") run_request = endpoint.run_sync( {"your_model_input_key": "your_model_input_value"} ) # Returns the job results if completed within 90 seconds, otherwise, returns the job status. print(run_request )
API Key Management
The SDK supports multiple ways to set API keys:
1. Global API Key (Default)
pythonimport runpod # Set global API key runpod.api_key = "your_runpod_api_key" # All endpoints will use this key by default endpoint = runpod.Endpoint("ENDPOINT_ID") result = endpoint.run_sync({"input": "data"})
2. Endpoint-Specific API Key
python# Create endpoint with its own API key endpoint = runpod.Endpoint("ENDPOINT_ID", api_key="specific_api_key") # This endpoint will always use the provided API key result = endpoint.run_sync({"input": "data"})
API Key Precedence
The SDK uses this precedence order (highest to lowest):
- Endpoint instance API key (if provided to
Endpoint()) - Global API key (set via
runpod.api_key)
pythonimport runpod # Example showing precedence runpod.api_key = "GLOBAL_KEY" # This endpoint uses GLOBAL_KEY endpoint1 = runpod.Endpoint("ENDPOINT_ID") # This endpoint uses ENDPOINT_KEY (overrides global) endpoint2 = runpod.Endpoint("ENDPOINT_ID", api_key="ENDPOINT_KEY") # All requests from endpoint2 will use ENDPOINT_KEY result = endpoint2.run_sync({"input": "data"})
Thread-Safe Operations
Each Endpoint instance maintains its own API key, making concurrent operations safe:
pythonimport threading import runpod def process_request(api_key, endpoint_id, input_data): # Each thread gets its own Endpoint instance endpoint = runpod.Endpoint(endpoint_id, api_key=api_key) return endpoint.run_sync(input_data) # Safe concurrent usage with different API keys threads = [] for customer in customers: t = threading.Thread( target=process_request, args=(customer["api_key"], customer["endpoint_id"], customer["input"]) ) threads.append(t) t.start()
GPU Cloud (Pods)
pythonimport runpod runpod.api_key = "your_runpod_api_key_found_under_settings" # Get all my pods pods = runpod.get_pods() # Get a specific pod pod = runpod.get_pod(pod.id) # Create a pod with GPU pod = runpod.create_pod("test", "runpod/stack", "NVIDIA GeForce RTX 3070") # Create a pod with CPU pod = runpod.create_pod("test", "runpod/stack", instance_id="cpu3c-2-4") # Stop the pod runpod.stop_pod(pod.id) # Resume the pod runpod.resume_pod(pod.id) # Terminate the pod runpod.terminate_pod(pod.id)
๐ | Directory
BASH. โโโ docs # Documentation โโโ examples # Examples โโโ runpod # Package source code โ โโโ api_wrapper # Language library - API (GraphQL) โ โโโ cli # Command Line Interface Functions โ โโโ endpoint # Language library - Endpoints โ โโโ serverless # SDK - Serverless Worker โโโ tests # Package tests
๐ค | Community and Contributing
We welcome both pull requests and issues on GitHub. Bug fixes and new features are encouraged, but please read our contributing guide first.
<div align="center"><a target="_blank" href="https://discord.gg/pJ3P2DbUUq">
</a>
Contributors
Showing top 12 contributors by commit count.
