Core

This page documents the main user-facing classes and exceptions. They are the primary entry points for using Fluxgate.


Circuit Breaker classes

These are the main classes you instantiate to create a circuit breaker.

fluxgate.circuitbreaker.CircuitBreaker

CircuitBreaker(
    window: Window | None = None,
    tracker: Tracker | None = None,
    tripper: Tripper | None = None,
    retry: Retry | None = None,
    permit: Permit | None = None,
    listeners: Iterable[Listener] = (),
)

Synchronous circuit breaker implementation.

Protects your service from cascading failures by monitoring call failures and temporarily blocking calls when a failure threshold is reached.

The circuit breaker operates in three main states:

  • CLOSED: Normal operation, calls pass through
  • OPEN: Failure threshold exceeded, calls are blocked
  • HALF_OPEN: Testing if the service recovered, limited calls allowed
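
The three states above can be sketched as a small state machine. This is a toy model under stated assumptions, not Fluxgate's implementation: `ToyBreaker`, `Blocked`, `failure_limit`, `cooldown`, and `clock` are hypothetical names, and the sketch trips on a simple failure count rather than on a sliding-window tripper.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class Blocked(Exception):
    """Hypothetical stand-in for a 'call not permitted' error."""


class ToyBreaker:
    """Toy CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine."""

    def __init__(
        self,
        failure_limit: int = 3,
        cooldown: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.state = "closed"
        self._failure_limit = failure_limit
        self._cooldown = cooldown
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0

    def call(self, func: Callable[[], T]) -> T:
        if self.state == "open":
            if self._clock() - self._opened_at < self._cooldown:
                raise Blocked("circuit is open")
            self.state = "half_open"  # cooldown elapsed: allow a trial call
        try:
            result = func()
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self.state == "half_open" or self._failures >= self._failure_limit:
                self.state = "open"
                self._opened_at = self._clock()
            raise
        # A success closes the circuit and clears the failure count.
        self.state = "closed"
        self._failures = 0
        return result
```

Injecting `clock` keeps the sketch testable without sleeping through the cooldown.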

Parameters:

  • window (Window | None, default: None): Sliding window for metrics collection. When None, CountWindow(100) is used.
  • tracker (Tracker | None, default: None): Determines which exceptions to track as failures. When None, All() is used.
  • tripper (Tripper | None, default: None): Condition to open/close the circuit based on metrics. When None, MinRequests(100) & FailureRate(0.5) is used.
  • retry (Retry | None, default: None): Strategy for transitioning from OPEN to HALF_OPEN. When None, Cooldown(60.0) is used.
  • permit (Permit | None, default: None): Strategy for allowing calls in HALF_OPEN state. When None, RampUp(0.1, 1.0, 60.0) is used.
  • listeners (Iterable[Listener], default: ()): Event listeners for state transitions.

Examples:

Basic usage with defaults:

>>> cb = CircuitBreaker()
>>> @cb
... def call_api():
...     return requests.get("https://api.example.com")

Custom configuration with slow-call detection:

>>> cb = CircuitBreaker(
...     tracker=TypeOf(ConnectionError),
...     tripper=MinRequests(10) & (FailureRate(0.5) | SlowRate(0.3, threshold=1.0)),
... )
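
Expressions such as `MinRequests(10) & (FailureRate(0.5) | SlowRate(0.3, threshold=1.0))` read naturally when conditions overload `&` and `|`. The following is a sketch of how such composition can work in principle. `Metric`, `Condition`, `min_requests`, and `failure_rate` are hypothetical stand-ins, not Fluxgate's classes.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Metric:
    """Hypothetical snapshot of a metrics window."""

    total: int
    failures: int


@dataclass(frozen=True)
class Condition:
    """Predicate over a Metric that composes with & and |."""

    check: Callable[[Metric], bool]

    def __call__(self, metric: Metric) -> bool:
        return self.check(metric)

    def __and__(self, other: "Condition") -> "Condition":
        return Condition(lambda m: self(m) and other(m))

    def __or__(self, other: "Condition") -> "Condition":
        return Condition(lambda m: self(m) or other(m))


def min_requests(n: int) -> Condition:
    # True once the window holds at least n calls.
    return Condition(lambda m: m.total >= n)


def failure_rate(ratio: float) -> Condition:
    # True when the share of failed calls reaches the ratio.
    return Condition(lambda m: m.total > 0 and m.failures / m.total >= ratio)


tripper = min_requests(10) & failure_rate(0.5)
```

Pairing a rate condition with a minimum request count keeps a single early failure from tripping the circuit on a nearly empty window.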

Note

This implementation is NOT thread-safe. Each process maintains its own independent circuit breaker state. For asyncio applications, use AsyncCircuitBreaker instead.

Source code in fluxgate/circuitbreaker.py
def __init__(
    self,
    window: Window | None = None,
    tracker: Tracker | None = None,
    tripper: Tripper | None = None,
    retry: Retry | None = None,
    permit: Permit | None = None,
    listeners: Iterable[Listener] = (),
) -> None:
    self._window = window or CountWindow(100)
    self._tracker = tracker or All()
    self._tripper = tripper or MinRequests(100) & FailureRate(0.5)
    self._retry = retry or Cooldown(60.0)
    self._permit = permit or RampUp(0.1, 1.0, 60.0)
    self._listeners = tuple(listeners)
    self._slow_thresholds = _collect_slow_thresholds(self._tripper)
    self._changed_at = time.time()
    self._reopens = 0
    self._consecutive_failures = 0
    self._handlers: dict[State, CircuitBreaker._Handler] = {
        "closed": self._Closed(self),
        "open": self._Open(self),
        "half_open": self._HalfOpen(self),
        "metrics_only": self._MetricsOnly(self),
        "disabled": self._Disabled(self),
        "forced_open": self._ForcedOpen(self),
    }
    self._state: CircuitBreaker._Handler = self._handlers["closed"]

__call__

__call__(func: Callable[P, R]) -> Callable[P, R]
__call__(
    func: None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> Callable[[Callable[P, R]], Callable[P, R]]
__call__(
    func: Callable[P, R] | None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> (
    Callable[P, R]
    | Callable[[Callable[P, R]], Callable[P, R]]
)

Decorate a function with circuit breaker protection.

Examples:

>>> @cb
... def api_call():
...     return requests.get("https://api.example.com")
>>> @cb(fallback=lambda e: cached_value)
... def api_call():
...     return requests.get("https://api.example.com")

Parameters:

  • func (Callable[P, R] | None, default: None): Function to protect.
  • fallback (Callable[[Exception], R] | None, default: None): Optional function to call on exception. Receives the exception as its argument and should return a fallback value or re-raise.

Returns:

  • Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]: Wrapped function with circuit breaker behavior.

Raises:

  • CallNotPermittedError: When the circuit is OPEN or FORCED_OPEN (if no fallback is given).

Source code in fluxgate/circuitbreaker.py
def __call__(
    self,
    func: Callable[P, R] | None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
    """Decorate a function with circuit breaker protection.

    Examples:
        >>> @cb
        ... def api_call():
        ...     return requests.get("https://api.example.com")

        >>> @cb(fallback=lambda e: cached_value)
        ... def api_call():
        ...     return requests.get("https://api.example.com")

    Args:
        func: Function to protect
        fallback: Optional function to call on exception. Receives the exception
            as argument and should return a fallback value or re-raise.

    Returns:
        Wrapped function with circuit breaker behavior

    Raises:
        CallNotPermittedError: When circuit is OPEN or FORCED_OPEN (if no fallback)
    """

    def decorator(f: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(f)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            try:
                return self._state.execute(f, *args, **kwargs)
            except Exception as e:
                if fallback is not None:
                    return fallback(e)
                raise

        return wrapper

    if func is not None:
        return decorator(func)
    return decorator
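
The bare `@cb` form and the `@cb(fallback=...)` form differ only in what happens when the call raises. The sketch below copies the wrapper logic shown above onto a hypothetical `PassThrough` class that performs no circuit breaking; it exists only to show how the two forms are dispatched and what the fallback receives.

```python
import functools
from typing import Any, Callable, Optional


class PassThrough:
    """Hypothetical stand-in: same decorator shape, no circuit breaking."""

    def __call__(
        self,
        func: Optional[Callable[..., Any]] = None,
        *,
        fallback: Optional[Callable[[Exception], Any]] = None,
    ) -> Callable[..., Any]:
        def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
            @functools.wraps(f)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                try:
                    return f(*args, **kwargs)
                except Exception as e:
                    if fallback is not None:
                        return fallback(e)  # the fallback receives the exception
                    raise

            return wrapper

        # Bare form (@cb) passes the function; @cb(...) passes func=None.
        return decorator(func) if func is not None else decorator


cb = PassThrough()


@cb
def strict(x: int) -> float:
    return 1 / x


@cb(fallback=lambda e: float("inf"))
def lenient(x: int) -> float:
    return 1 / x
```

Here `strict(0)` propagates `ZeroDivisionError`, while `lenient(0)` returns the fallback value.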

call

call(
    func: Callable[P, R], /, *args: P.args, **kwargs: P.kwargs
) -> R

Execute a function with circuit breaker protection.

Examples:

>>> cb.call(requests.get, "https://api.example.com")

Parameters:

  • func (Callable[P, R], required): Function to execute.
  • *args (P.args, default: ()): Positional arguments for the function.
  • **kwargs (P.kwargs, default: {}): Keyword arguments for the function.

Returns:

  • R: Function result.

Raises:

  • CallNotPermittedError: When the circuit is OPEN or FORCED_OPEN.

Source code in fluxgate/circuitbreaker.py
def call(
    self,
    func: Callable[P, R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Execute a function with circuit breaker protection.

    Examples:
        >>> cb.call(requests.get, "https://api.example.com")

    Args:
        func: Function to execute
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Function result

    Raises:
        CallNotPermittedError: When circuit is OPEN or FORCED_OPEN
    """
    return self._state.execute(func, *args, **kwargs)

call_with_fallback

call_with_fallback(
    func: Callable[P, R],
    fallback: Callable[[Exception], R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R

Execute a function with circuit breaker protection and fallback.

Examples:

>>> cb.call_with_fallback(fetch_data, lambda e: cached_data)

Parameters:

  • func (Callable[P, R], required): Function to execute.
  • fallback (Callable[[Exception], R], required): Function to call on exception. Receives the exception as its argument and should return a fallback value or re-raise.
  • *args (P.args, default: ()): Positional arguments for the function.
  • **kwargs (P.kwargs, default: {}): Keyword arguments for the function.

Returns:

  • R: Function result or fallback result.

Source code in fluxgate/circuitbreaker.py
def call_with_fallback(
    self,
    func: Callable[P, R],
    fallback: Callable[[Exception], R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Execute a function with circuit breaker protection and fallback.

    Examples:
        >>> cb.call_with_fallback(fetch_data, lambda e: cached_data)

    Args:
        func: Function to execute
        fallback: Function to call on exception. Receives the exception
            as argument and should return a fallback value or re-raise.
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Function result or fallback result
    """
    try:
        return self._state.execute(func, *args, **kwargs)
    except Exception as e:
        return fallback(e)
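
Because the fallback receives the exception, it can serve a substitute for some failures and re-raise the rest. A minimal sketch, assuming a local `CallNotPermittedError` stand-in and a hypothetical free function `call_with_fallback` that copies only the try/except shape shown above (no breaker state involved):

```python
from typing import Callable, TypeVar

R = TypeVar("R")


class CallNotPermittedError(Exception):
    """Local stand-in for fluxgate.errors.CallNotPermittedError."""


def call_with_fallback(
    func: Callable[[], R], fallback: Callable[[Exception], R]
) -> R:
    # Same try/except shape as the method above, minus the breaker itself.
    try:
        return func()
    except Exception as e:
        return fallback(e)


def cached_or_reraise(exc: Exception) -> str:
    # Serve a cached value only when the circuit rejected the call;
    # any other error propagates to the caller.
    if isinstance(exc, CallNotPermittedError):
        return "cached"
    raise exc


def rejected() -> str:
    raise CallNotPermittedError()


def broken() -> str:
    raise TimeoutError("upstream timed out")
```

With these helpers, `call_with_fallback(rejected, cached_or_reraise)` yields the cached value, whereas passing `broken` lets the `TimeoutError` surface.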

info

info() -> CircuitBreakerInfo

Get current circuit breaker state and metrics.

Returns:

  • CircuitBreakerInfo: CircuitBreakerInfo with state, changed_at, reopens, and metrics.

Source code in fluxgate/circuitbreaker.py
def info(self) -> CircuitBreakerInfo:
    """Get current circuit breaker state and metrics.

    Returns:
        CircuitBreakerInfo with state, changed_at, reopens, and metrics.
    """
    return CircuitBreakerInfo(
        state=self._state.state,
        changed_at=self._changed_at,
        reopens=self._reopens,
        metrics=self._window.get_metric(),
    )

reset

reset() -> None

Reset circuit breaker to CLOSED state and clear metrics.

Source code in fluxgate/circuitbreaker.py
def reset(self) -> None:
    """Reset circuit breaker to CLOSED state and clear metrics."""
    self._transition_to("closed")

disable

disable() -> None

Disable circuit breaker (all calls pass through without tracking).

Source code in fluxgate/circuitbreaker.py
def disable(self) -> None:
    """Disable circuit breaker (all calls pass through without tracking)."""
    self._transition_to("disabled")

metrics_only

metrics_only() -> None

Enable metrics-only mode (track metrics but never open circuit).

Source code in fluxgate/circuitbreaker.py
def metrics_only(self) -> None:
    """Enable metrics-only mode (track metrics but never open circuit)."""
    self._transition_to("metrics_only")

force_open

force_open() -> None

Force the circuit breaker into the FORCED_OPEN state (all calls blocked).

Source code in fluxgate/circuitbreaker.py
def force_open(self) -> None:
    """Force circuit breaker to OPEN state (all calls blocked)."""
    self._transition_to("forced_open")

fluxgate.circuitbreaker.AsyncCircuitBreaker

AsyncCircuitBreaker(
    window: Window | None = None,
    tracker: Tracker | None = None,
    tripper: Tripper | None = None,
    retry: Retry | None = None,
    permit: Permit | None = None,
    max_half_open_calls: int = 10,
    listeners: Iterable[Listener | AsyncListener] = (),
)

Asynchronous circuit breaker implementation for asyncio applications.

Coordinates concurrent calls within a single event loop using one asyncio.Lock for all internal state mutations, plus an asyncio.Semaphore to cap concurrent HALF_OPEN trial calls.
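
Capping concurrent trial calls with a semaphore can be sketched as follows. `TrialLimiter` is a hypothetical helper, and the choice to reject excess callers rather than queue them is an assumption of this sketch; the text above does not say which behavior Fluxgate uses.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

R = TypeVar("R")


class TrialLimiter:
    """Hypothetical helper: at most `max_calls` trial calls in flight."""

    def __init__(self, max_calls: int = 10) -> None:
        self._slots = asyncio.Semaphore(max_calls)

    async def run(self, func: Callable[[], Awaitable[R]]) -> R:
        # Assumption: excess callers are rejected instead of queued.
        if self._slots.locked():
            raise RuntimeError("no trial slot available")
        async with self._slots:  # the slot is released even if func raises
            return await func()


async def demo() -> list:
    limiter = TrialLimiter(max_calls=2)

    async def probe() -> str:
        await asyncio.sleep(0.01)
        return "ok"

    # Five probes start together; only two slots exist.
    return await asyncio.gather(
        *(limiter.run(probe) for _ in range(5)), return_exceptions=True
    )
```

Running `asyncio.run(demo())` lets the first two probes through and rejects the other three.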

The circuit breaker operates in three main states:

  • CLOSED: Normal operation, calls pass through
  • OPEN: Failure threshold exceeded, calls are blocked
  • HALF_OPEN: Testing if the service recovered, limited calls allowed

Parameters:

  • window (Window | None, default: None): Sliding window for metrics collection. When None, CountWindow(100) is used.
  • tracker (Tracker | None, default: None): Determines which exceptions to track as failures. When None, All() is used.
  • tripper (Tripper | None, default: None): Condition to open/close the circuit based on metrics. When None, MinRequests(100) & FailureRate(0.5) is used.
  • retry (Retry | None, default: None): Strategy for transitioning from OPEN to HALF_OPEN. When None, Cooldown(60.0) is used.
  • permit (Permit | None, default: None): Strategy for allowing calls in HALF_OPEN state. When None, RampUp(0.1, 1.0, 60.0) is used.
  • max_half_open_calls (int, default: 10): Maximum concurrent calls allowed in HALF_OPEN state.
  • listeners (Iterable[Listener | AsyncListener], default: ()): Event listeners for state transitions.

Examples:

Basic usage with defaults:

>>> cb = AsyncCircuitBreaker()
>>> @cb
... async def call_api():
...     async with httpx.AsyncClient() as client:
...         return await client.get("https://api.example.com")

Custom configuration with slow-call detection:

>>> cb = AsyncCircuitBreaker(
...     tracker=TypeOf(httpx.ConnectError),
...     tripper=MinRequests(10) & (FailureRate(0.5) | SlowRate(0.3, threshold=1.0)),
... )

Concurrency model

All state mutations (window record, counter updates, state transitions) happen under a single asyncio.Lock. The protected callable and listener notifications run outside the lock, so the critical section stays small and the lock is never held while the protected call is awaited.

Each call captures its handler on entry. When its outcome is applied, the handler checks self is cb._state and discards the outcome if the state has changed in the meantime, so a transition cannot pollute the next state's metric window. The only race that slips through is a regression back to the same handler instance (e.g. closed → open → half_open → closed); since the window is reset on every transition, at most one stale sample may then land in the new window. Pair small windows with MinRequests if a single stale sample could flip your tripper.
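
The capture-and-compare rule described above can be sketched as follows. `Breaker` is a hypothetical minimal class, plain `object()` instances stand in for the state handlers, and the list `window` stands in for the metric window; only the identity check under the lock is the point.

```python
import asyncio


class Breaker:
    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._handlers = {"closed": object(), "open": object()}
        self._state = self._handlers["closed"]
        self.window: list[bool] = []  # stand-in for the metric window

    async def transition(self, name: str) -> None:
        async with self._lock:
            self._state = self._handlers[name]
            self.window.clear()  # the window resets on every transition

    async def call(self, func):
        handler = self._state  # capture the handler on entry
        ok = True
        try:
            return await func()  # runs outside the lock
        except Exception:
            ok = False
            raise
        finally:
            async with self._lock:
                # Record only if no transition happened while the call ran.
                if handler is self._state:
                    self.window.append(ok)


async def demo() -> tuple[list[bool], list[bool]]:
    cb = Breaker()
    gate = asyncio.Event()

    async def fast() -> str:
        return "done"

    async def slow() -> str:
        await gate.wait()
        return "done"

    await cb.call(fast)  # recorded: the state did not change
    before = list(cb.window)
    task = asyncio.create_task(cb.call(slow))
    await asyncio.sleep(0)  # let the task capture its handler
    await cb.transition("open")  # the state changes mid-call
    gate.set()
    await task  # its outcome is discarded as stale
    return before, list(cb.window)
```

In `demo`, the fast call lands in the window, while the slow call that straddles the transition leaves the fresh window empty.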

Note

Each process maintains its own independent circuit breaker state.

Source code in fluxgate/circuitbreaker.py
def __init__(
    self,
    window: Window | None = None,
    tracker: Tracker | None = None,
    tripper: Tripper | None = None,
    retry: Retry | None = None,
    permit: Permit | None = None,
    max_half_open_calls: int = 10,
    listeners: Iterable[Listener | AsyncListener] = (),
) -> None:
    self._window = window or CountWindow(100)
    self._tracker = tracker or All()
    self._tripper = tripper or MinRequests(100) & FailureRate(0.5)
    self._retry = retry or Cooldown(60.0)
    self._permit = permit or RampUp(0.1, 1.0, 60.0)
    self._listeners = tuple(listeners)
    self._slow_thresholds = _collect_slow_thresholds(self._tripper)
    self._changed_at = time.time()
    self._reopens = 0
    self._consecutive_failures = 0
    self._handlers: dict[State, AsyncCircuitBreaker._Handler] = {
        "closed": self._Closed(self),
        "open": self._Open(self),
        "half_open": self._HalfOpen(self),
        "metrics_only": self._MetricsOnly(self),
        "disabled": self._Disabled(self),
        "forced_open": self._ForcedOpen(self),
    }
    self._state: AsyncCircuitBreaker._Handler = self._handlers["closed"]
    self._lock = asyncio.Lock()
    self._half_open_semaphore = asyncio.Semaphore(max_half_open_calls)

__call__

__call__(
    func: Callable[P, Awaitable[R]],
) -> Callable[P, Awaitable[R]]
__call__(
    func: None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> Callable[
    [Callable[P, Awaitable[R]]],
    Callable[P, Awaitable[R]],
]
__call__(
    func: Callable[P, Awaitable[R]] | None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> (
    Callable[P, Awaitable[R]]
    | Callable[
        [Callable[P, Awaitable[R]]],
        Callable[P, Awaitable[R]],
    ]
)

Decorate an awaitable function with circuit breaker protection.

Examples:

>>> @cb
... async def api_call():
...     async with httpx.AsyncClient() as client:
...         return await client.get("https://api.example.com")
>>> @cb(fallback=lambda e: cached_value)
... async def api_call():
...     return await fetch_data()

Parameters:

  • func (Callable[P, Awaitable[R]] | None, default: None): Awaitable function to protect.
  • fallback (Callable[[Exception], R] | None, default: None): Optional function to call on exception. Receives the exception as its argument and should return a fallback value or re-raise.

Returns:

  • Callable[P, Awaitable[R]] | Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: Wrapped awaitable function with circuit breaker behavior.

Raises:

  • CallNotPermittedError: When the circuit is OPEN or FORCED_OPEN (if no fallback is given).

Source code in fluxgate/circuitbreaker.py
def __call__(
    self,
    func: Callable[P, Awaitable[R]] | None = None,
    *,
    fallback: Callable[[Exception], R] | None = None,
) -> (
    Callable[P, Awaitable[R]]
    | Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]
):
    """Decorate an awaitable function with circuit breaker protection.

    Examples:
        >>> @cb
        ... async def api_call():
        ...     async with httpx.AsyncClient() as client:
        ...         return await client.get("https://api.example.com")

        >>> @cb(fallback=lambda e: cached_value)
        ... async def api_call():
        ...     return await fetch_data()

    Args:
        func: Awaitable function to protect
        fallback: Optional function to call on exception. Receives the exception
            as argument and should return a fallback value or re-raise.

    Returns:
        Wrapped awaitable function with circuit breaker behavior

    Raises:
        CallNotPermittedError: When circuit is OPEN or FORCED_OPEN (if no fallback)
    """

    def decorator(
        f: Callable[P, Awaitable[R]],
    ) -> Callable[P, Awaitable[R]]:
        @functools.wraps(f)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            try:
                return await self._state.execute(f, *args, **kwargs)
            except Exception as e:
                if fallback is not None:
                    return fallback(e)
                raise

        return wrapper

    if func is not None:
        return decorator(func)
    return decorator
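
Note that the wrapper above returns `fallback(e)` without awaiting it, so the fallback is an ordinary synchronous callable. A sketch of that behavior, using a hypothetical `protect` decorator that keeps only the except branch and performs no circuit breaking:

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


def protect(fallback: Callable[[Exception], Any]):
    """Hypothetical decorator with the same except branch as the wrapper above."""

    def decorator(f: Callable[..., Awaitable[Any]]):
        @functools.wraps(f)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return await f(*args, **kwargs)
            except Exception as e:
                return fallback(e)  # called synchronously, not awaited

        return wrapper

    return decorator


@protect(fallback=lambda e: {"source": "cache", "error": type(e).__name__})
async def fetch() -> dict:
    raise ConnectionError("upstream unreachable")


result = asyncio.run(fetch())
```

With this shape, an `async def` fallback would hand the caller an un-awaited coroutine object, so a plain function is the safer choice.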

call (async)

call(
    func: Callable[P, Awaitable[R]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R

Execute an awaitable function with circuit breaker protection.

Examples:

>>> await cb.call(client.get, "https://api.example.com")

Parameters:

  • func (Callable[P, Awaitable[R]], required): Awaitable function to execute.
  • *args (P.args, default: ()): Positional arguments for the function.
  • **kwargs (P.kwargs, default: {}): Keyword arguments for the function.

Returns:

  • R: Function result.

Raises:

  • CallNotPermittedError: When the circuit is OPEN or FORCED_OPEN.

Source code in fluxgate/circuitbreaker.py
async def call(
    self,
    func: Callable[P, Awaitable[R]],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Execute an awaitable function with circuit breaker protection.

    Examples:
        >>> await cb.call(client.get, "https://api.example.com")

    Args:
        func: Awaitable function to execute
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Function result

    Raises:
        CallNotPermittedError: When circuit is OPEN or FORCED_OPEN
    """
    return await self._state.execute(func, *args, **kwargs)

call_with_fallback (async)

call_with_fallback(
    func: Callable[P, Awaitable[R]],
    fallback: Callable[[Exception], R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R

Execute an awaitable function with circuit breaker protection and fallback.

Examples:

>>> await cb.call_with_fallback(fetch_data, lambda e: cached_data)

Parameters:

  • func (Callable[P, Awaitable[R]], required): Awaitable function to execute.
  • fallback (Callable[[Exception], R], required): Function to call on exception. Receives the exception as its argument and should return a fallback value or re-raise.
  • *args (P.args, default: ()): Positional arguments for the function.
  • **kwargs (P.kwargs, default: {}): Keyword arguments for the function.

Returns:

  • R: Function result or fallback result.

Source code in fluxgate/circuitbreaker.py
async def call_with_fallback(
    self,
    func: Callable[P, Awaitable[R]],
    fallback: Callable[[Exception], R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> R:
    """Execute an awaitable function with circuit breaker protection and fallback.

    Examples:
        >>> await cb.call_with_fallback(fetch_data, lambda e: cached_data)

    Args:
        func: Awaitable function to execute
        fallback: Function to call on exception. Receives the exception
            as argument and should return a fallback value or re-raise.
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Function result or fallback result
    """
    try:
        return await self._state.execute(func, *args, **kwargs)
    except Exception as e:
        return fallback(e)

info

info() -> CircuitBreakerInfo

Get current circuit breaker state and metrics.

Returns:

  • CircuitBreakerInfo: CircuitBreakerInfo with state, changed_at, reopens, and metrics.

Source code in fluxgate/circuitbreaker.py
def info(self) -> CircuitBreakerInfo:
    """Get current circuit breaker state and metrics.

    Returns:
        CircuitBreakerInfo with state, changed_at, reopens, and metrics.
    """
    return CircuitBreakerInfo(
        state=self._state.state,
        changed_at=self._changed_at,
        reopens=self._reopens,
        metrics=self._window.get_metric(),
    )

reset (async)

reset() -> None

Reset circuit breaker to CLOSED state and clear metrics.

Source code in fluxgate/circuitbreaker.py
async def reset(self) -> None:
    """Reset circuit breaker to CLOSED state and clear metrics."""
    await self._command("closed")

disable (async)

disable() -> None

Disable circuit breaker (all calls pass through without tracking).

Source code in fluxgate/circuitbreaker.py
async def disable(self) -> None:
    """Disable circuit breaker (all calls pass through without tracking)."""
    await self._command("disabled")

metrics_only (async)

metrics_only() -> None

Enable metrics-only mode (track metrics but never open circuit).

Source code in fluxgate/circuitbreaker.py
async def metrics_only(self) -> None:
    """Enable metrics-only mode (track metrics but never open circuit)."""
    await self._command("metrics_only")

force_open (async)

force_open() -> None

Force the circuit breaker into the FORCED_OPEN state (all calls blocked).

Source code in fluxgate/circuitbreaker.py
async def force_open(self) -> None:
    """Force circuit breaker to OPEN state (all calls blocked)."""
    await self._command("forced_open")

State and exceptions

These are the state type and the exception you will most commonly interact with.

fluxgate.state.State (module attribute)

State = Literal[
    "closed",
    "open",
    "half_open",
    "metrics_only",
    "disabled",
    "forced_open",
]
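
Because `State` is a `Literal` alias rather than an enum class, its members are plain strings. A short sketch of working with it at runtime follows; the alias is redefined locally so the snippet runs without Fluxgate installed, and `ALL_STATES`, `parse_state`, and `blocks_calls` are hypothetical helpers.

```python
from typing import Literal, get_args

# Local copy of the alias shown above, so the snippet is self-contained.
State = Literal[
    "closed",
    "open",
    "half_open",
    "metrics_only",
    "disabled",
    "forced_open",
]

# get_args returns the literal values in declaration order.
ALL_STATES: tuple[str, ...] = get_args(State)


def parse_state(value: str) -> State:
    """Validate an untrusted string (for example from config) as a State."""
    if value not in ALL_STATES:
        raise ValueError(f"unknown state: {value!r}")
    return value  # type: ignore[return-value]


def blocks_calls(state: State) -> bool:
    """True for the states documented as raising CallNotPermittedError."""
    return state in ("open", "forced_open")
```

Comparisons are case-sensitive, so `parse_state("OPEN")` raises `ValueError`.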

fluxgate.errors.CallNotPermittedError

Bases: Exception

Raised when a call is not permitted due to circuit breaker state.