Core API

This page documents Fluxgate's main user-facing classes and exceptions, the primary entry points to the library.


Circuit Breaker Classes

These are the main classes you will instantiate to create a circuit breaker.

fluxgate.circuitbreaker.CircuitBreaker

CircuitBreaker(
    name: str,
    window: IWindow | None = None,
    tracker: ITracker | None = None,
    tripper: ITripper | None = None,
    retry: IRetry | None = None,
    permit: IPermit | None = None,
    slow_threshold: float = 60.0,
    listeners: Iterable[IListener] = (),
)

Synchronous circuit breaker implementation.

Protects your service from cascading failures by monitoring call failures and temporarily blocking calls when a failure threshold is reached.

The circuit breaker operates in three main states:

  • CLOSED: Normal operation, calls pass through
  • OPEN: Failure threshold exceeded, calls are blocked
  • HALF_OPEN: Testing if the service recovered, limited calls allowed
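
The three states above form a small state machine. The following is a toy sketch of those transitions, not Fluxgate's implementation: the class name `ToyBreaker`, the consecutive-failure threshold, and the injectable clock are assumptions made for illustration. Fluxgate itself decides transitions through its `tripper`, `retry`, and `permit` components.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class ToyBreaker:
    """Hypothetical, simplified breaker: opens after N consecutive failures."""

    def __init__(self, max_failures=3, cooldown=60.0, clock=time.monotonic):
        self.state = State.CLOSED
        self._max_failures = max_failures
        self._cooldown = cooldown
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0

    def call(self, func, *args, **kwargs):
        if self.state is State.OPEN:
            if self._clock() - self._opened_at < self._cooldown:
                raise RuntimeError("call not permitted: circuit is open")
            self.state = State.HALF_OPEN  # cooldown elapsed, allow a trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self.state is State.HALF_OPEN or self._failures >= self._max_failures:
                self.state = State.OPEN
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and clears the failure count.
        self.state = State.CLOSED
        self._failures = 0
        return result
```

Passing a fake `clock` makes the cooldown testable without sleeping.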

Parameters:

  • name (str, required): Circuit breaker identifier.
  • window (IWindow | None, default None): Sliding window for metrics collection. When None, CountWindow(100) is used.
  • tracker (ITracker | None, default None): Determines which exceptions to track as failures. When None, All() is used.
  • tripper (ITripper | None, default None): Condition to open/close the circuit based on metrics. When None, MinRequests(100) & (FailureRate(0.5) | SlowRate(1.0)) is used.
  • retry (IRetry | None, default None): Strategy for transitioning from OPEN to HALF_OPEN. When None, Cooldown(60.0) is used.
  • permit (IPermit | None, default None): Strategy for allowing calls in HALF_OPEN state. When None, RampUp(0.0, 1.0, 60.0) is used.
  • slow_threshold (float, default 60.0): Duration threshold in seconds for marking a call as slow.
  • listeners (Iterable[IListener], default ()): Event listeners for state transitions.
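
The default tripper, `MinRequests(100) & (FailureRate(0.5) | SlowRate(1.0))`, combines conditions with `&` and `|`. One way such composition can be built is with operator overloading, sketched below. Everything here is a stand-in written from scratch: the classes reuse Fluxgate's names only so the final expression reads the same, and the `Metric` fields and the `>=` comparisons are assumptions, not Fluxgate's actual definitions.

```python
from dataclasses import dataclass


@dataclass
class Metric:
    """Hypothetical window snapshot; Fluxgate's real metric type may differ."""
    total: int
    failures: int
    slow: int


class Condition:
    """Stand-in base class giving every condition `&` and `|` combinators."""

    def check(self, m: Metric) -> bool:
        raise NotImplementedError

    def __and__(self, other: "Condition") -> "Condition":
        return _Both(self, other)

    def __or__(self, other: "Condition") -> "Condition":
        return _Either(self, other)


class _Both(Condition):
    def __init__(self, left: Condition, right: Condition) -> None:
        self.left, self.right = left, right

    def check(self, m: Metric) -> bool:
        return self.left.check(m) and self.right.check(m)


class _Either(Condition):
    def __init__(self, left: Condition, right: Condition) -> None:
        self.left, self.right = left, right

    def check(self, m: Metric) -> bool:
        return self.left.check(m) or self.right.check(m)


class MinRequests(Condition):  # stand-in, not fluxgate's class
    def __init__(self, count: int) -> None:
        self.count = count

    def check(self, m: Metric) -> bool:
        return m.total >= self.count


class FailureRate(Condition):  # stand-in, not fluxgate's class
    def __init__(self, ratio: float) -> None:
        self.ratio = ratio

    def check(self, m: Metric) -> bool:
        return m.total > 0 and m.failures / m.total >= self.ratio


class SlowRate(Condition):  # stand-in, not fluxgate's class
    def __init__(self, ratio: float) -> None:
        self.ratio = ratio

    def check(self, m: Metric) -> bool:
        return m.total > 0 and m.slow / m.total >= self.ratio


tripper = MinRequests(100) & (FailureRate(0.5) | SlowRate(1.0))
```

Under these assumptions, `tripper.check(Metric(total=100, failures=50, slow=0))` is true, while the same failure ratio over only 99 calls is not, because the minimum-request guard fails first.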

Examples:

Basic usage with defaults:

>>> import requests
>>> from fluxgate.circuitbreaker import CircuitBreaker
>>> cb = CircuitBreaker("my-service")
>>> @cb
... def call_api():
...     return requests.get("https://api.example.com")

Custom configuration:

>>> cb = CircuitBreaker(
...     name="payment_api",
...     tracker=TypeOf(ConnectionError),
...     tripper=MinRequests(10) & FailureRate(0.5),
...     slow_threshold=1.0,
... )

Note

This implementation is NOT thread-safe. Each process maintains its own independent circuit breaker state. For asyncio applications, use AsyncCircuitBreaker instead.

Source code in fluxgate/circuitbreaker.py
def __init__(
    self,
    name: str,
    window: IWindow | None = None,
    tracker: ITracker | None = None,
    tripper: ITripper | None = None,
    retry: IRetry | None = None,
    permit: IPermit | None = None,
    slow_threshold: float = 60.0,
    listeners: Iterable[IListener] = (),
) -> None:
    self._name = name
    self._window = window or CountWindow(100)
    self._tracker = tracker or All()
    self._tripper = tripper or MinRequests(100) & (FailureRate(0.5) | SlowRate(1.0))
    self._retry = retry or Cooldown(60.0)
    self._permit = permit or RampUp(0.0, 1.0, 60.0)
    self._listeners = tuple(listeners)
    self._slow_threshold = slow_threshold
    self._changed_at = time.time()
    self._reopens = 0
    self._consecutive_failures = 0
    self._state: CircuitBreaker._State = self._Closed(self)

__call__

__call__(func: Callable[P, R]) -> Callable[P, R]
__call__(
    func: None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]
__call__(
    func: Callable[P, R] | None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> (
    Callable[P, R]
    | Callable[[Callable[P, R]], Callable[P, R]]
)

Decorate a function with circuit breaker protection.

Examples:

>>> @cb
... def api_call():
...     return requests.get("https://api.example.com")
>>> @cb(fallback=lambda e: cached_value)
... def api_call():
...     return requests.get("https://api.example.com")

Parameters:

  • func (Callable[P, R] | None, default None): Function to protect.
  • fallback (Callable[[Exception], R] | None, default None): Optional function called when the protected call raises, including when the circuit rejects the call with CallNotPermittedError. It receives the exception as its argument and should return a fallback value or re-raise.

Returns:

  • Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]: The wrapped function when used as @cb, or a decorator when called with keyword arguments such as @cb(fallback=...).

Raises:

  • CallNotPermittedError: When the circuit is OPEN or FORCED_OPEN and no fallback is provided. It is raised when the wrapped function is called, not when it is decorated.
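
Because the wrapper passes every caught exception to `fallback`, a fallback can decide per exception type whether to substitute a value or re-raise. The sketch below serves a cached value only when the circuit rejects the call. It runs without Fluxgate: `CallNotPermittedError` is a local stand-in for `fluxgate.errors.CallNotPermittedError`, `guarded` is a hypothetical helper that reproduces the wrapper's try/except, and `CACHED`, `rejected`, and `broken` are illustrative.

```python
class CallNotPermittedError(Exception):
    """Local stand-in for fluxgate.errors.CallNotPermittedError."""


CACHED = {"status": "stale"}  # illustrative cached response


def cached_on_rejection(exc: Exception):
    """Fallback: serve cached data only when the circuit rejects the call."""
    if isinstance(exc, CallNotPermittedError):
        return CACHED
    raise exc  # any other error propagates to the caller unchanged


def guarded(func, fallback=None):
    """Hypothetical helper mirroring the decorator's try/except logic."""
    try:
        return func()
    except Exception as e:
        if fallback is not None:
            return fallback(e)
        raise


def rejected():
    raise CallNotPermittedError("circuit is open")


def broken():
    raise ValueError("bad payload")
```

With this fallback, `guarded(rejected, cached_on_rejection)` returns the cached dictionary, while `guarded(broken, cached_on_rejection)` still raises `ValueError`.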

Source code in fluxgate/circuitbreaker.py
def __call__(
    self,
    func: Callable[P, R] | None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
    """Decorate a function with circuit breaker protection.

    Examples:
        >>> @cb
        ... def api_call():
        ...     return requests.get("https://api.example.com")

        >>> @cb(fallback=lambda e: cached_value)
        ... def api_call():
        ...     return requests.get("https://api.example.com")

    Args:
        func: Function to protect
        fallback: Optional function to call on exception. Receives the exception
            as argument and should return a fallback value or re-raise.

    Returns:
        Wrapped function with circuit breaker behavior

    Raises:
        CallNotPermittedError: When circuit is OPEN or FORCED_OPEN (if no fallback)
    """

    def decorator(f: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(f)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            try:
                return self._state.execute(f, *args, **kwargs)
            except Exception as e:
                if fallback is not None:
                    return fallback(e)
                raise

        return wrapper

    if func is not None:
        return decorator(func)
    return decorator

call

call(
    func: Callable[P, R], /, *args: P.args, **kwargs: P.kwargs
) -> R

Execute a function with circuit breaker protection.

Examples:

>>> cb.call(requests.get, "https://api.example.com")

Parameters:

  • func (Callable[P, R], required): Function to execute.
  • *args (P.args, default ()): Positional arguments for the function.
  • **kwargs (P.kwargs, default {}): Keyword arguments for the function.

Returns:

  • R: Function result.

Raises:

  • CallNotPermittedError: When the circuit is OPEN or FORCED_OPEN.
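
The `/` in the signature makes `func` positional-only, so keyword arguments are forwarded to the target untouched, even one that happens to be named `func`. A minimal sketch of that forwarding follows; `call` here is a standalone stand-in with the same parameter shape, not the Fluxgate method, and `render` is a hypothetical target.

```python
def call(func, /, *args, **kwargs):
    """Stand-in with the same parameter shape as CircuitBreaker.call."""
    return func(*args, **kwargs)


def render(template, *, func="upper"):
    """Hypothetical target that itself accepts a keyword named `func`."""
    return getattr(template, func)()


# `func="title"` reaches render() instead of colliding with call()'s own
# first parameter, because that parameter cannot be passed by keyword.
result = call(render, "hello", func="title")
```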

Source code in fluxgate/circuitbreaker.py
def call(
    self,
    func: Callable[P, R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Execute a function with circuit breaker protection.

    Examples:
        >>> cb.call(requests.get, "https://api.example.com")

    Args:
        func: Function to execute
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Function result

    Raises:
        CallNotPermittedError: When circuit is OPEN or FORCED_OPEN
    """
    return self._state.execute(func, *args, **kwargs)

call_with_fallback

call_with_fallback(
    func: Callable[P, R],
    fallback: Callable[[Exception], R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R

Execute a function with circuit breaker protection and fallback.

Examples:

>>> cb.call_with_fallback(fetch_data, lambda e: cached_data)

Parameters:

  • func (Callable[P, R], required): Function to execute.
  • fallback (Callable[[Exception], R], required): Function called when the protected call raises, including when the circuit rejects the call. It receives the exception as its argument and should return a fallback value or re-raise.
  • *args (P.args, default ()): Positional arguments for the function.
  • **kwargs (P.kwargs, default {}): Keyword arguments for the function.

Returns:

  • R: Function result or fallback result.

Source code in fluxgate/circuitbreaker.py
def call_with_fallback(
    self,
    func: Callable[P, R],
    fallback: Callable[[Exception], R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Execute a function with circuit breaker protection and fallback.

    Examples:
        >>> cb.call_with_fallback(fetch_data, lambda e: cached_data)

    Args:
        func: Function to execute
        fallback: Function to call on exception. Receives the exception
            as argument and should return a fallback value or re-raise.
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Function result or fallback result
    """
    try:
        return self._state.execute(func, *args, **kwargs)
    except Exception as e:
        return fallback(e)

info

info() -> CircuitBreakerInfo

Get current circuit breaker state and metrics.

Returns:

  • CircuitBreakerInfo: Dictionary with circuit breaker state information: name, state, changed_at, reopens, and metrics.

Source code in fluxgate/circuitbreaker.py
def info(self) -> CircuitBreakerInfo:
    """Get current circuit breaker state and metrics.

    Returns:
        Dictionary with circuit breaker state information
    """
    return CircuitBreakerInfo(
        name=self._name,
        state=self._state.get_state_enum().value,
        changed_at=self._changed_at,
        reopens=self._reopens,
        metrics=self._window.get_metric(),
    )

reset

reset() -> None

Reset circuit breaker to CLOSED state and clear metrics.

Source code in fluxgate/circuitbreaker.py
def reset(self) -> None:
    """Reset circuit breaker to CLOSED state and clear metrics."""
    current_state = self._state.get_state_enum()
    self._state = self._Closed(self)
    self._changed_at = time.time()
    self._reopens = 0
    self._consecutive_failures = 0
    self._window.reset()

    signal = Signal(
        circuit_name=self._name,
        old_state=current_state,
        new_state=StateEnum.CLOSED,
        timestamp=self._changed_at,
    )
    self._notify(signal)

disable

disable() -> None

Disable circuit breaker (all calls pass through without tracking).

Source code in fluxgate/circuitbreaker.py
def disable(self) -> None:
    """Disable circuit breaker (all calls pass through without tracking)."""
    self._transition_to(StateEnum.DISABLED)

metrics_only

metrics_only() -> None

Enable metrics-only mode (track metrics but never open circuit).

Source code in fluxgate/circuitbreaker.py
def metrics_only(self) -> None:
    """Enable metrics-only mode (track metrics but never open circuit)."""
    self._transition_to(StateEnum.METRICS_ONLY)

force_open

force_open() -> None

Force the circuit breaker into the FORCED_OPEN state (all calls blocked).

Source code in fluxgate/circuitbreaker.py
def force_open(self) -> None:
    """Force circuit breaker to OPEN state (all calls blocked)."""
    self._transition_to(StateEnum.FORCED_OPEN)

fluxgate.circuitbreaker.AsyncCircuitBreaker

AsyncCircuitBreaker(
    name: str,
    window: IWindow | None = None,
    tracker: ITracker | None = None,
    tripper: ITripper | None = None,
    retry: IRetry | None = None,
    permit: IPermit | None = None,
    slow_threshold: float = 60.0,
    max_half_open_calls: int = 10,
    listeners: Iterable[IListener | IAsyncListener] = (),
)

Asynchronous circuit breaker implementation for asyncio applications.

Circuit breaker with async/await support. Uses asyncio locks to coordinate state transitions and metric updates across concurrent tasks in a single event loop. asyncio locks are not thread-safe, so this guarantee does not extend across OS threads.

The circuit breaker operates in three main states:

  • CLOSED: Normal operation, calls pass through
  • OPEN: Failure threshold exceeded, calls are blocked
  • HALF_OPEN: Testing if the service recovered, limited calls allowed

Parameters:

  • name (str, required): Circuit breaker identifier.
  • window (IWindow | None, default None): Sliding window for metrics collection. When None, CountWindow(100) is used.
  • tracker (ITracker | None, default None): Determines which exceptions to track as failures. When None, All() is used.
  • tripper (ITripper | None, default None): Condition to open/close the circuit based on metrics. When None, MinRequests(100) & (FailureRate(0.5) | SlowRate(1.0)) is used.
  • retry (IRetry | None, default None): Strategy for transitioning from OPEN to HALF_OPEN. When None, Cooldown(60.0) is used.
  • permit (IPermit | None, default None): Strategy for allowing calls in HALF_OPEN state. When None, RampUp(0.0, 1.0, 60.0) is used.
  • slow_threshold (float, default 60.0): Duration threshold in seconds for marking a call as slow.
  • max_half_open_calls (int, default 10): Maximum concurrent calls allowed in HALF_OPEN state.
  • listeners (Iterable[IListener | IAsyncListener], default ()): Event listeners for state transitions.

Examples:

Basic usage with defaults:

>>> import httpx
>>> from fluxgate.circuitbreaker import AsyncCircuitBreaker
>>> cb = AsyncCircuitBreaker("my-service")
>>> @cb
... async def call_api():
...     async with httpx.AsyncClient() as client:
...         return await client.get("https://api.example.com")

Custom configuration:

>>> cb = AsyncCircuitBreaker(
...     name="payment_api",
...     tracker=TypeOf(httpx.ConnectError),
...     tripper=MinRequests(10) & FailureRate(0.5),
...     slow_threshold=1.0,
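... )

Note

Uses asyncio locks to keep concurrent tasks consistent within a single event loop. asyncio locks are not thread-safe, so an instance should not be shared across threads. Each process maintains its own independent circuit breaker state.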
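
The constructor source below passes `max_half_open_calls` to an `asyncio.Semaphore`. How Fluxgate acquires that semaphore is not shown on this page, so the following sketch only illustrates the general technique of capping concurrent coroutines with a semaphore. The names `run_probes` and `probe` are hypothetical, and the short sleep stands in for a real trial call.

```python
import asyncio


async def run_probes(limit: int, attempts: int) -> int:
    """Run `attempts` probes, at most `limit` at a time; return peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def probe() -> None:
        nonlocal active, peak
        async with semaphore:  # waits once `limit` probes are in flight
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for the real trial call
            active -= 1

    await asyncio.gather(*(probe() for _ in range(attempts)))
    return peak


peak = asyncio.run(run_probes(limit=10, attempts=25))
```

In this sketch excess probes wait for a free slot. A circuit breaker might instead reject them immediately; which behavior Fluxgate uses is not stated here.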

Source code in fluxgate/circuitbreaker.py
def __init__(
    self,
    name: str,
    window: IWindow | None = None,
    tracker: ITracker | None = None,
    tripper: ITripper | None = None,
    retry: IRetry | None = None,
    permit: IPermit | None = None,
    slow_threshold: float = 60.0,
    max_half_open_calls: int = 10,
    listeners: Iterable[IListener | IAsyncListener] = (),
) -> None:
    self._name = name
    self._window = window or CountWindow(100)
    self._tracker = tracker or All()
    self._tripper = tripper or MinRequests(100) & (FailureRate(0.5) | SlowRate(1.0))
    self._retry = retry or Cooldown(60.0)
    self._permit = permit or RampUp(0.0, 1.0, 60.0)
    self._listeners = tuple(listeners)
    self._slow_threshold = slow_threshold
    self._changed_at = time.time()
    self._reopens = 0
    self._consecutive_failures = 0
    self._state: AsyncCircuitBreaker._State = self._Closed(self)
    self._state_lock = asyncio.Lock()
    self._half_open_semaphore = asyncio.Semaphore(max_half_open_calls)
    self._window_lock = asyncio.Lock()

__call__

__call__(
    func: Callable[P, Awaitable[R]],
) -> Callable[P, Awaitable[R]]
__call__(
    func: None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> Callable[
    [Callable[P, Awaitable[R]]],
    Callable[P, Awaitable[R]],
]
__call__(
    func: Callable[P, Awaitable[R]] | None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> (
    Callable[P, Awaitable[R]]
    | Callable[
        [Callable[P, Awaitable[R]]],
        Callable[P, Awaitable[R]],
    ]
)

Decorate an awaitable function with circuit breaker protection.

Examples:

>>> @cb
... async def api_call():
...     async with httpx.AsyncClient() as client:
...         return await client.get("https://api.example.com")
>>> @cb(fallback=lambda e: cached_value)
... async def api_call():
...     return await fetch_data()

Parameters:

  • func (Callable[P, Awaitable[R]] | None, default None): Awaitable function to protect.
  • fallback (Callable[[Exception], R] | None, default None): Optional function called when the protected call raises, including when the circuit rejects the call with CallNotPermittedError. It receives the exception as its argument and should return a fallback value or re-raise. It is called synchronously and its result is not awaited.

Returns:

  • Callable[P, Awaitable[R]] | Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: The wrapped awaitable function when used as @cb, or a decorator when called with keyword arguments such as @cb(fallback=...).

Raises:

  • CallNotPermittedError: When the circuit is OPEN or FORCED_OPEN and no fallback is provided.
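
In the source listing for this method, the async wrapper calls `fallback(e)` without awaiting it, so the fallback should be a plain function that returns a ready value. A minimal sketch of that behavior follows. It does not import Fluxgate: `guarded` is a hypothetical helper mirroring the wrapper's try/except, and `fetch_data` and `use_cache` are illustrative.

```python
import asyncio


async def guarded(func, fallback=None):
    """Hypothetical helper mirroring the async wrapper's try/except."""
    try:
        return await func()
    except Exception as e:
        if fallback is not None:
            return fallback(e)  # called synchronously, never awaited
        raise


async def fetch_data():
    raise ConnectionError("upstream unavailable")


def use_cache(exc: Exception) -> str:
    """Plain (non-async) fallback that returns a ready value."""
    return f"cached (after {type(exc).__name__})"


result = asyncio.run(guarded(fetch_data, use_cache))
```

An `async def` fallback used this way would hand the caller an un-awaited coroutine object rather than a value.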

Source code in fluxgate/circuitbreaker.py
def __call__(
    self,
    func: Callable[P, Awaitable[R]] | None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> (
    Callable[P, Awaitable[R]]
    | Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]
):
    """Decorate an awaitable function with circuit breaker protection.

    Examples:
        >>> @cb
        ... async def api_call():
        ...     async with httpx.AsyncClient() as client:
        ...         return await client.get("https://api.example.com")

        >>> @cb(fallback=lambda e: cached_value)
        ... async def api_call():
        ...     return await fetch_data()

    Args:
        func: Awaitable function to protect
        fallback: Optional function to call on exception. Receives the exception
            as argument and should return a fallback value or re-raise.

    Returns:
        Wrapped awaitable function with circuit breaker behavior

    Raises:
        CallNotPermittedError: When circuit is OPEN or FORCED_OPEN (if no fallback)
    """

    def decorator(
        f: Callable[P, Awaitable[R]],
    ) -> Callable[P, Awaitable[R]]:
        @functools.wraps(f)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            try:
                return await self._state.execute(f, *args, **kwargs)
            except Exception as e:
                if fallback is not None:
                    return fallback(e)
                raise

        return wrapper

    if func is not None:
        return decorator(func)
    return decorator

call async

call(
    func: Callable[P, Awaitable[R]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R

Execute an awaitable function with circuit breaker protection.

Examples:

>>> await cb.call(client.get, "https://api.example.com")

Parameters:

  • func (Callable[P, Awaitable[R]], required): Awaitable function to execute.
  • *args (P.args, default ()): Positional arguments for the function.
  • **kwargs (P.kwargs, default {}): Keyword arguments for the function.

Returns:

  • R: Function result.

Raises:

  • CallNotPermittedError: When the circuit is OPEN or FORCED_OPEN.

Source code in fluxgate/circuitbreaker.py
async def call(
    self,
    func: Callable[P, Awaitable[R]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Execute an awaitable function with circuit breaker protection.

    Examples:
        >>> await cb.call(client.get, "https://api.example.com")

    Args:
        func: Awaitable function to execute
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Function result

    Raises:
        CallNotPermittedError: When circuit is OPEN or FORCED_OPEN
    """
    return await self._state.execute(func, *args, **kwargs)

call_with_fallback async

call_with_fallback(
    func: Callable[P, Awaitable[R]],
    fallback: Callable[[Exception], R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R

Execute an awaitable function with circuit breaker protection and fallback.

Examples:

>>> await cb.call_with_fallback(fetch_data, lambda e: cached_data)

Parameters:

  • func (Callable[P, Awaitable[R]], required): Awaitable function to execute.
  • fallback (Callable[[Exception], R], required): Function called when the protected call raises, including when the circuit rejects the call. It receives the exception as its argument and should return a fallback value or re-raise. It is called synchronously and its result is not awaited.
  • *args (P.args, default ()): Positional arguments for the function.
  • **kwargs (P.kwargs, default {}): Keyword arguments for the function.

Returns:

  • R: Function result or fallback result.

Source code in fluxgate/circuitbreaker.py
async def call_with_fallback(
    self,
    func: Callable[P, Awaitable[R]],
    fallback: Callable[[Exception], R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Execute an awaitable function with circuit breaker protection and fallback.

    Examples:
        >>> await cb.call_with_fallback(fetch_data, lambda e: cached_data)

    Args:
        func: Awaitable function to execute
        fallback: Function to call on exception. Receives the exception
            as argument and should return a fallback value or re-raise.
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Function result or fallback result
    """
    try:
        return await self._state.execute(func, *args, **kwargs)
    except Exception as e:
        return fallback(e)

info

info() -> CircuitBreakerInfo

Get current circuit breaker state and metrics.

Returns:

  • CircuitBreakerInfo: Dictionary with circuit breaker state information: name, state, changed_at, reopens, and metrics.

Source code in fluxgate/circuitbreaker.py
def info(self) -> CircuitBreakerInfo:
    """Get current circuit breaker state and metrics.

    Returns:
        Dictionary with circuit breaker state information
    """
    return CircuitBreakerInfo(
        name=self._name,
        state=self._state.get_state_enum().value,
        changed_at=self._changed_at,
        reopens=self._reopens,
        metrics=self._window.get_metric(),
    )

reset async

reset() -> None

Reset circuit breaker to CLOSED state and clear metrics.

Source code in fluxgate/circuitbreaker.py
async def reset(self) -> None:
    """Reset circuit breaker to CLOSED state and clear metrics."""
    async with self._state_lock:
        await self._transition_to(StateEnum.CLOSED)

disable async

disable() -> None

Disable circuit breaker (all calls pass through without tracking).

Source code in fluxgate/circuitbreaker.py
async def disable(self) -> None:
    """Disable circuit breaker (all calls pass through without tracking)."""
    async with self._state_lock:
        await self._transition_to(StateEnum.DISABLED)

metrics_only async

metrics_only() -> None

Enable metrics-only mode (track metrics but never open circuit).

Source code in fluxgate/circuitbreaker.py
async def metrics_only(self) -> None:
    """Enable metrics-only mode (track metrics but never open circuit)."""
    async with self._state_lock:
        await self._transition_to(StateEnum.METRICS_ONLY)

force_open async

force_open() -> None

Force the circuit breaker into the FORCED_OPEN state (all calls blocked).

Source code in fluxgate/circuitbreaker.py
async def force_open(self) -> None:
    """Force circuit breaker to OPEN state (all calls blocked)."""
    async with self._state_lock:
        await self._transition_to(StateEnum.FORCED_OPEN)

States and Exceptions

These are the enums and exceptions you will most commonly interact with.

fluxgate.state.StateEnum

Bases: Enum

CLOSED class-attribute instance-attribute

CLOSED = 'closed'

OPEN class-attribute instance-attribute

OPEN = 'open'

HALF_OPEN class-attribute instance-attribute

HALF_OPEN = 'half_open'

METRICS_ONLY class-attribute instance-attribute

METRICS_ONLY = 'metrics_only'

DISABLED class-attribute instance-attribute

DISABLED = 'disabled'

FORCED_OPEN class-attribute instance-attribute

FORCED_OPEN = 'forced_open'
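
Since `info()` reports `state` as the enum's `.value` string, converting it back to a member is a by-value lookup. The enum below is a local copy of the members listed above so that the snippet runs without Fluxgate. `is_blocking` is a hypothetical helper, and treating OPEN and FORCED_OPEN as the blocking states follows the descriptions on this page.

```python
from enum import Enum


class StateEnum(Enum):
    """Local mirror of fluxgate.state.StateEnum for illustration."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"
    METRICS_ONLY = "metrics_only"
    DISABLED = "disabled"
    FORCED_OPEN = "forced_open"


def is_blocking(state_value: str) -> bool:
    """Hypothetical helper: True when the named state blocks calls."""
    state = StateEnum(state_value)  # raises ValueError for unknown strings
    return state in (StateEnum.OPEN, StateEnum.FORCED_OPEN)
```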

fluxgate.errors.CallNotPermittedError

CallNotPermittedError(message: str)

Bases: Exception

Raised when a call is not permitted due to circuit breaker state.

Source code in fluxgate/errors.py
def __init__(self, message: str) -> None:
    super().__init__(message)
    self.message = message

message instance-attribute

message = message
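
A caller can tell a circuit breaker rejection apart from an ordinary failure by catching this exception and reading its `message` attribute. The class below is a local copy of the definition shown above so that the snippet runs without Fluxgate; `describe` and the example message text are hypothetical.

```python
class CallNotPermittedError(Exception):
    """Local mirror of fluxgate.errors.CallNotPermittedError."""

    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.message = message


def describe(exc: Exception) -> str:
    """Hypothetical helper: build a log line that separates rejections from failures."""
    if isinstance(exc, CallNotPermittedError):
        return f"rejected by circuit breaker: {exc.message}"
    return f"call failed: {exc}"
```

Because the message is also passed to `Exception.__init__`, `str(exc)` yields the same text as `exc.message`.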