
API: Listener

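This page documents the available Listener implementations. Listeners let you react to Circuit Breaker state changes by integrating with logging, monitoring, and notification systems.

For an overview of using and customizing Listeners, see the Listener component guide.


Logging Listener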

fluxgate.listeners.log.LogListener

LogListener(
    name: str,
    logger: Logger | None = None,
    level_map: dict[State, int] | None = None,
)

Bases: Listener

Listener that logs circuit breaker state transitions.

Logs state changes using Python's standard logging module. Works with both CircuitBreaker and AsyncCircuitBreaker.

Parameters:

  • name (str, required): Identifier for the circuit, included in every log line.
  • logger (Logger | None, default None): Custom logger instance. If None, uses logging.getLogger("fluxgate.listeners.log"), so a level set with logging.getLogger("fluxgate").setLevel(...) also applies to this listener.
  • level_map (dict[State, int] | None, default None): Mapping from new_state to log level (logging.INFO, etc.). Entries override the defaults: OPEN/FORCED_OPEN -> WARNING, others -> INFO.

Note

logging methods are thread-safe and can be safely used in async contexts.
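
The default logger name matters because of how the standard logging hierarchy resolves levels. The sketch below uses only the standard library (no fluxgate import) to show a child logger picking up the level set on its "fluxgate" parent:

```python
import logging

package_logger = logging.getLogger("fluxgate")
listener_logger = logging.getLogger("fluxgate.listeners.log")

# The child logger has no level of its own, so it inherits the package level.
package_logger.setLevel(logging.ERROR)
inherited = listener_logger.getEffectiveLevel()
warning_enabled = listener_logger.isEnabledFor(logging.WARNING)

print(logging.getLevelName(inherited))
print(warning_enabled)
```

With the package level at ERROR, transitions logged at WARNING (the default for OPEN and FORCED_OPEN) would be suppressed unless level_map raises them or a custom logger is passed.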

Examples:

Basic usage with default logger:

>>> import logging
>>> from fluxgate import CircuitBreaker
>>> from fluxgate.listeners.log import LogListener
>>>
>>> logging.basicConfig(level=logging.INFO)
>>> cb = CircuitBreaker(listeners=[LogListener(name="api")])

With custom logger:

>>> logger = logging.getLogger("my_app.circuit_breaker")
>>> cb = CircuitBreaker(
...     listeners=[LogListener(name="api", logger=logger)]
... )

With custom level_map:

>>> level_map = {
...     "open": logging.ERROR,
...     "half_open": logging.WARNING,
...     "closed": logging.DEBUG,
... }
>>> cb = CircuitBreaker(
...     listeners=[LogListener(name="api", level_map=level_map)]
... )
Source code in fluxgate/listeners/log.py
def __init__(
    self,
    name: str,
    logger: logging.Logger | None = None,
    level_map: dict[State, int] | None = None,
) -> None:
    self._name = name
    self._logger = logger or logging.getLogger(__name__)
    self._level_map = {**self.DEFAULT_LEVEL_MAP, **(level_map or {})}

DEFAULT_LEVEL_MAP class-attribute instance-attribute

DEFAULT_LEVEL_MAP: dict[State, int] = {
    "closed": INFO,
    "open": WARNING,
    "half_open": INFO,
    "metrics_only": INFO,
    "disabled": INFO,
    "forced_open": WARNING,
}
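
Because the constructor merges level_map over DEFAULT_LEVEL_MAP, a partial mapping overrides only the states it names. A minimal sketch of that merge with plain dictionaries; resolve_levels is a hypothetical helper for illustration, not part of fluxgate:

```python
import logging

# Copy of the defaults listed above, keyed by state name.
DEFAULT_LEVEL_MAP = {
    "closed": logging.INFO,
    "open": logging.WARNING,
    "half_open": logging.INFO,
    "metrics_only": logging.INFO,
    "disabled": logging.INFO,
    "forced_open": logging.WARNING,
}


def resolve_levels(level_map=None):
    """Hypothetical helper: overlay user overrides on the defaults."""
    return {**DEFAULT_LEVEL_MAP, **(level_map or {})}


levels = resolve_levels({"open": logging.ERROR})
print(logging.getLevelName(levels["open"]))         # overridden state
print(logging.getLevelName(levels["forced_open"]))  # untouched default
```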

__call__

__call__(signal: Signal) -> None
Source code in fluxgate/listeners/log.py
def __call__(self, signal: Signal) -> None:
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(signal.timestamp))
    level = self._level_map.get(signal.new_state, logging.INFO)
    self._logger.log(
        level,
        "[%s] Circuit Breaker '%s' transitioned from %s to %s",
        timestamp,
        self._name,
        signal.old_state,
        signal.new_state,
    )
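
To see the message this produces without installing fluxgate, the sketch below repeats the same logger.log call with a stand-in signal. FakeSignal and ListHandler are hypothetical names for illustration; the real Signal type may carry more fields than the three read here.

```python
import logging
import time
from dataclasses import dataclass


@dataclass
class FakeSignal:
    """Hypothetical stand-in with the three fields __call__ reads."""
    old_state: str
    new_state: str
    timestamp: float


class ListHandler(logging.Handler):
    """Collects formatted messages so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


logger = logging.getLogger("demo.circuit_breaker")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

sig = FakeSignal(old_state="closed", new_state="open", timestamp=time.time())
stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sig.timestamp))
logger.log(
    logging.WARNING,
    "[%s] Circuit Breaker '%s' transitioned from %s to %s",
    stamp, "api", sig.old_state, sig.new_state,
)
print(handler.messages[0])
```

The bracketed timestamp comes from the signal and is rendered in local time, so it appears in addition to any timestamp the logging formatter adds.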

Prometheus Listener

fluxgate.listeners.prometheus.PrometheusListener

PrometheusListener(
    name: str, registry: CollectorRegistry | None = None
)

Bases: Listener

Listener that exports circuit breaker metrics to Prometheus.

Exports two metrics:

  • circuit_breaker_state: Gauge with one series per state, set to 1 for the current state and 0 for every other state
  • circuit_breaker_state_transition: Counter of state transitions, labeled by old_state and new_state

Works with both CircuitBreaker and AsyncCircuitBreaker.

Parameters:

  • name (str, required): Identifier used as the circuit_name label on emitted metrics.
  • registry (CollectorRegistry | None, default None): Optional CollectorRegistry to register metrics into. None (default) writes to the module-level metrics on the default prometheus_client.REGISTRY. Pass a dedicated registry to isolate fluxgate's metrics under importlib.reload / uvicorn --reload, or when another component already exports a metric named circuit_breaker_state.

Note

prometheus_client is thread-safe and can be safely used in async contexts.

Examples:

>>> from prometheus_client import start_http_server
>>> from fluxgate import CircuitBreaker, AsyncCircuitBreaker
>>> from fluxgate.listeners.prometheus import PrometheusListener
>>>
>>> start_http_server(8000)
>>>
>>> cb = CircuitBreaker(listeners=[PrometheusListener(name="api")])
>>> async_cb = AsyncCircuitBreaker(
...     listeners=[PrometheusListener(name="api")]
... )
>>>
>>> # Metrics available at http://localhost:8000/metrics
Source code in fluxgate/listeners/prometheus.py
def __init__(
    self,
    name: str,
    registry: CollectorRegistry | None = None,
) -> None:
    self._name = name
    if registry is None:
        self._gauge = _STATE_GAUGE
        self._counter = _STATE_TRANSITION
    else:
        self._gauge = Gauge(
            name="circuit_breaker_state",
            documentation="Current state of the circuit breaker",
            labelnames=["circuit_name", "state"],
            registry=registry,
        )
        self._counter = Counter(
            name="circuit_breaker_state_transition",
            documentation="Count of state transitions for circuit breakers",
            labelnames=["circuit_name", "old_state", "new_state"],
            registry=registry,
        )

__call__

__call__(signal: Signal) -> None
Source code in fluxgate/listeners/prometheus.py
def __call__(self, signal: Signal) -> None:
    for state in _ALL_STATES:
        self._gauge.labels(
            circuit_name=self._name,
            state=state,
        ).set(1 if state == signal.new_state else 0)
    self._counter.labels(
        circuit_name=self._name,
        old_state=signal.old_state,
        new_state=signal.new_state,
    ).inc()
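
The gauge update is a one-hot encoding: after each transition, exactly one state label holds 1 for a given circuit. A library-free sketch of that bookkeeping follows. Plain dictionaries stand in for the Prometheus metrics, and ALL_STATES is assumed to match the six states in LogListener.DEFAULT_LEVEL_MAP, since the real _ALL_STATES is not shown on this page.

```python
from collections import defaultdict

# Assumption: the same six states that LogListener.DEFAULT_LEVEL_MAP lists.
ALL_STATES = ("closed", "open", "half_open", "metrics_only", "disabled", "forced_open")

gauge = {}                      # (circuit_name, state) -> 0 or 1
transitions = defaultdict(int)  # (circuit_name, old_state, new_state) -> count


def record(name, old_state, new_state):
    """Mirror the listener's update: one-hot gauge plus a transition count."""
    for state in ALL_STATES:
        gauge[(name, state)] = 1 if state == new_state else 0
    transitions[(name, old_state, new_state)] += 1


record("api", "closed", "open")
record("api", "open", "half_open")

# Exactly one state is active for the circuit after any number of updates.
active = [state for (name, state), value in gauge.items() if value == 1]
print(active)
```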

close

close() -> None

Drop every labelset this listener registered. Idempotent.

Source code in fluxgate/listeners/prometheus.py
def close(self) -> None:
    """Drop every labelset this listener registered. Idempotent."""
    for state in _ALL_STATES:
        self._gauge.remove(self._name, state)
    for old_state in _ALL_STATES:
        for new_state in _ALL_STATES:
            self._counter.remove(self._name, old_state, new_state)

Slack Listener

fluxgate.listeners.slack.SlackListener

SlackListener(name: str, channel: str, token: str)

Bases: _SlackBase, Listener

Listener that sends circuit breaker state transitions to Slack.

Posts formatted messages to a Slack channel when state transitions occur. Groups related transitions into threads based on failure cycles:

  • Thread starts on → OPEN (new or continued failure cycle)
  • Thread ends on → CLOSED, DISABLED, or METRICS_ONLY
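
One plausible reading of these rules, sketched as a small state holder. This is an assumption about the grouping logic, not fluxgate's actual _SlackBase code: ThreadTracker and its methods are hypothetical. The value stored is the ts that Slack's chat.postMessage returns for a message, which is also what a reply passes as thread_ts.

```python
# States whose arrival closes the current failure cycle (from the list above).
CYCLE_END_STATES = {"closed", "disabled", "metrics_only"}


class ThreadTracker:
    """Hypothetical helper that decides which Slack thread a message joins."""

    def __init__(self):
        self._thread_ts = None  # ts of the root message of the open cycle

    def current_thread(self):
        """Return the thread_ts to reply under, or None to post a root message."""
        return self._thread_ts

    def after_post(self, new_state, message_ts):
        """Update the tracked thread once Slack has returned the message ts."""
        if new_state == "open" and self._thread_ts is None:
            self._thread_ts = message_ts  # first OPEN of a cycle starts the thread
        elif new_state in CYCLE_END_STATES:
            self._thread_ts = None        # recovery ends the thread


tracker = ThreadTracker()
routed = []
for i, state in enumerate(["open", "half_open", "open", "half_open", "closed", "open"]):
    parent = tracker.current_thread()
    ts = f"1700000000.{i:06d}"  # stand-in for the ts Slack would return
    tracker.after_post(state, ts)
    routed.append((state, parent))

for state, parent in routed:
    print(state, "->", parent or "new root message")
```

In this reading, the second OPEN continues the existing thread, while the OPEN that follows CLOSED starts a new one.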

Parameters:

  • name (str, required): Identifier shown in the Slack message body.
  • channel (str, required): Slack channel ID (e.g., "C1234567890") or name (e.g., "#alerts").
  • token (str, required): Slack bot token with the chat:write scope.

Class Attributes

  • TRANSITION_TEMPLATES: Override to customize messages for specific transitions.
  • FALLBACK_TEMPLATE: Override to customize the default message for other transitions.

Examples:

>>> from fluxgate import CircuitBreaker
>>> from fluxgate.listeners.slack import SlackListener
>>>
>>> listener = SlackListener(
...     name="api",
...     channel="C1234567890",
...     token="xoxb-your-slack-bot-token"
... )
>>>
>>> cb = CircuitBreaker(listeners=[listener])

To customize messages (e.g., for Korean):

>>> class KoreanSlackListener(SlackListener):
...     TRANSITION_TEMPLATES = {
...         ("closed", "open"): {
...             "title": "🚨 서킷 브레이커 작동",
...             "color": "#FF4C4C",
...             "description": "요청 실패율이 임계값을 초과했습니다.",
...         },
...         # ... other transitions
...     }
...     FALLBACK_TEMPLATE = {
...         "title": "ℹ️ 서킷 브레이커 상태 변경",
...         "color": "#808080",
...         "description": "서킷 브레이커 상태가 변경되었습니다.",
...     }
Source code in fluxgate/listeners/slack.py
def __init__(self, name: str, channel: str, token: str) -> None:
    super().__init__(name, channel)
    self._client = httpx.Client(
        headers={"Authorization": f"Bearer {token}"},
        timeout=5.0,
    )

__call__

__call__(signal: Signal) -> None
Source code in fluxgate/listeners/slack.py
def __call__(self, signal: Signal) -> None:
    payload = self._build_payload(signal)
    response = self._client.post(_SLACK_POST_URL, json=payload)
    response.raise_for_status()
    self._consume_response(signal, response.json())
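
raise_for_status() only catches HTTP-level failures. Slack's Web API usually reports application errors such as channel_not_found in a 200 response whose JSON body has "ok": false, so a caller may also want to inspect the body. Whether _consume_response already does this is not shown on this page; check_slack_body and SlackApiError below are hypothetical names for illustration.

```python
class SlackApiError(RuntimeError):
    """Hypothetical error raised when Slack answers with ok == false."""


def check_slack_body(body: dict) -> str:
    """Return the message ts on success, raise if Slack reported an error."""
    if not body.get("ok", False):
        raise SlackApiError(body.get("error", "unknown_error"))
    return body["ts"]


# Example bodies shaped like chat.postMessage responses.
ts = check_slack_body({"ok": True, "channel": "C1234567890", "ts": "1700000000.000100"})

try:
    check_slack_body({"ok": False, "error": "channel_not_found"})
except SlackApiError as exc:
    failure = str(exc)

print(ts)
print(failure)
```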

fluxgate.listeners.slack.AsyncSlackListener

AsyncSlackListener(name: str, channel: str, token: str)

Bases: _SlackBase, AsyncListener

Async listener that sends circuit breaker state transitions to Slack.

Posts formatted messages to a Slack channel when state transitions occur. Groups related transitions into threads based on failure cycles:

  • Thread starts on → OPEN (new or continued failure cycle)
  • Thread ends on → CLOSED, DISABLED, or METRICS_ONLY

Parameters:

  • name (str, required): Identifier shown in the Slack message body.
  • channel (str, required): Slack channel ID (e.g., "C1234567890") or name (e.g., "#alerts").
  • token (str, required): Slack bot token with the chat:write scope.

Class Attributes

  • TRANSITION_TEMPLATES: Override to customize messages for specific transitions.
  • FALLBACK_TEMPLATE: Override to customize the default message for other transitions.

Note

Uses httpx for async HTTP requests.

Examples:

>>> from fluxgate import AsyncCircuitBreaker
>>> from fluxgate.listeners.slack import AsyncSlackListener
>>>
>>> listener = AsyncSlackListener(
...     name="api",
...     channel="C1234567890",
...     token="xoxb-your-slack-bot-token"
... )
>>>
>>> cb = AsyncCircuitBreaker(listeners=[listener])
Source code in fluxgate/listeners/slack.py
def __init__(self, name: str, channel: str, token: str) -> None:
    super().__init__(name, channel)
    self._client = httpx.AsyncClient(
        headers={"Authorization": f"Bearer {token}"},
        timeout=5.0,
    )

__call__ async

__call__(signal: Signal) -> None
Source code in fluxgate/listeners/slack.py
async def __call__(self, signal: Signal) -> None:
    payload = self._build_payload(signal)
    response = await self._client.post(_SLACK_POST_URL, json=payload)
    response.raise_for_status()
    self._consume_response(signal, response.json())

fluxgate.listeners.slack.Template

Bases: TypedDict

title instance-attribute

title: str

color instance-attribute

color: str

description instance-attribute

description: str
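
A sketch of how an (old_state, new_state) key might select a Template, with FALLBACK_TEMPLATE covering pairs that have no entry. The lookup function pick_template is hypothetical and the template values are placeholders; fluxgate's own selection logic is not shown on this page.

```python
from typing import TypedDict


class Template(TypedDict):
    title: str
    color: str
    description: str


# Placeholder templates keyed by (old_state, new_state).
TRANSITION_TEMPLATES: dict[tuple[str, str], Template] = {
    ("closed", "open"): {
        "title": "Circuit opened",
        "color": "#FF4C4C",
        "description": "Failure rate exceeded the threshold.",
    },
}

FALLBACK_TEMPLATE: Template = {
    "title": "Circuit state changed",
    "color": "#808080",
    "description": "The circuit breaker changed state.",
}


def pick_template(old_state: str, new_state: str) -> Template:
    """Hypothetical lookup: exact transition first, fallback otherwise."""
    return TRANSITION_TEMPLATES.get((old_state, new_state), FALLBACK_TEMPLATE)


print(pick_template("closed", "open")["title"])
print(pick_template("open", "half_open")["title"])
```

Keying on the full transition rather than only the new state lets a subclass word "closed to open" differently from "half_open to open" while leaving rarer transitions to the fallback.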