Fluxgate¶
A modern, composable circuit breaker library for Python with full support for both synchronous and asynchronous code.
Why Fluxgate?¶
Circuit breakers prevent cascading failures in distributed systems by monitoring service health and temporarily blocking calls to failing services. Fluxgate makes this easy with:
- Sync & Async: First-class support for both synchronous and asynchronous code
- Composable: Build complex failure detection logic using simple, reusable components
- Zero Dependencies: Core library has no external dependencies
- Fully Typed: Complete type hints for better IDE support
- Monitoring Ready: Built-in monitoring with Prometheus, Slack, and logging
Quick Start¶
Installation¶
pip install fluxgate
Basic Example¶
from fluxgate import CircuitBreaker
cb = CircuitBreaker("payment_api")
@cb
def call_payment_api(amount: float):
return requests.post("https://api.example.com/pay", json={"amount": amount})
That's it! The circuit breaker uses sensible defaults:
- Trips when failure rate exceeds 50% (after 100 calls minimum)
- Waits 60 seconds before attempting recovery
- Gradually increases allowed calls from 0% to 100% over 60 seconds during recovery
How It Works¶
A circuit breaker is a state machine that determines whether to allow or block calls to a service. It operates in three main states:
stateDiagram-v2
[*] --> CLOSED
CLOSED --> OPEN: tripper
OPEN --> HALF_OPEN: retry
HALF_OPEN --> CLOSED: !tripper
HALF_OPEN --> OPEN: tripper
- CLOSED: This is the default state. All calls are permitted and a
trackermonitors their outcomes. If the failure rate exceeds a configured threshold (thetrippercondition), the breaker "trips" and moves to theOPENstate. - OPEN: In this state, the circuit breaker blocks all calls to the service, preventing further failures. After a configured
retrytimeout, it transitions toHALF_OPEN. - HALF_OPEN: The breaker allows a limited number of "probe" calls (controlled by a
permit) to test if the service has recovered. If these calls succeed, the breaker returns toCLOSED. If they fail, it trips again and returns toOPEN.
Core Components¶
Fluxgate's power comes from its composable components. You can mix and match them to create precise and flexible failure detection logic tailored to your needs.
| Component | Purpose | Common Implementations |
|---|---|---|
| Windows | Collects and stores recent call outcomes (success/failure). | CountWindow(100): Stores the last 100 calls.TimeWindow(60): Stores calls from the last 60 seconds. |
| Trackers | Decides whether a call's outcome should be tracked as a failure. | TypeOf(ConnectionError): Tracks specific exception types.Custom(func): Use your own function for complex logic. |
| Trippers | Defines the condition for tripping the circuit from CLOSED to OPEN. |
FailureRate(0.5): Trips if failure rate exceeds 50%.AvgLatency(2.0): Trips if average response time is over 2 seconds. |
| Retries | Determines when the circuit should transition from OPEN to HALF_OPEN. |
Cooldown(60.0): Waits for a fixed 60-second cooldown.Backoff(10.0): Uses an exponential backoff strategy starting at 10 seconds. |
| Permits | Manages how many "probe" calls are allowed in the HALF_OPEN state. |
Random(0.5): Allows 50% of calls to pass through.RampUp(0.1, 0.8, 60): Gradually increases the allowed call ratio over 60 seconds. |
Async Support¶
Full support for asyncio applications:
import asyncio
from fluxgate import AsyncCircuitBreaker
cb = AsyncCircuitBreaker("async_api")
@cb
async def call_async_api():
pass # Your async HTTP call here
async def main():
result = await call_async_api()
asyncio.run(main())
Complete Example¶
A fully configured circuit breaker for an external payment API:
import httpx
from fluxgate import CircuitBreaker
from fluxgate.windows import CountWindow
from fluxgate.trackers import Custom
from fluxgate.trippers import Closed, HalfOpened, MinRequests, FailureRate, FailureStreak
from fluxgate.retries import Backoff
from fluxgate.permits import RampUp
from fluxgate.listeners.log import LogListener
from fluxgate.listeners.prometheus import PrometheusListener
# Track only 5xx errors and network failures
def is_retriable_error(e: Exception) -> bool:
if isinstance(e, httpx.HTTPStatusError):
return e.response.status_code >= 500
return isinstance(e, (httpx.ConnectError, httpx.TimeoutException))
payment_cb = CircuitBreaker(
name="payment_api",
window=CountWindow(size=100),
tracker=Custom(is_retriable_error),
tripper=FailureStreak(5) | (MinRequests(20) & (
(Closed() & FailureRate(0.6)) |
(HalfOpened() & FailureRate(0.5))
)),
retry=Backoff(initial=10.0, multiplier=2.0, max_duration=300.0, jitter_ratio=0.1),
permit=RampUp(initial=0.1, final=0.5, duration=60.0),
listeners=[LogListener(), PrometheusListener()],
)
@payment_cb
def charge_payment(amount: float):
response = httpx.post("https://payment-api.example.com/charge", json={"amount": amount})
response.raise_for_status()
return response.json()
Next Steps¶
- Learn about Components - Deep dive into windows, trackers, trippers, retries, and permits
- See Examples - Real-world usage patterns
- Read API Reference - Complete API documentation