API: Listener

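This page documents the available Listener implementations. Listeners let you react to circuit breaker state changes by integrating with logging, monitoring, and alerting systems.

For an overview of using and customizing Listeners, see the Listener component guide.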


Logging Listener

fluxgate.listeners.log.LogListener

LogListener(
    logger: Logger | None = None,
    level_map: dict[StateEnum, int] | None = None,
)

Bases: IListener

Listener that logs circuit breaker state transitions.

Logs state changes using Python's standard logging module. Works with both CircuitBreaker and AsyncCircuitBreaker.

Parameters:

Name Type Description Default
logger Logger | None

Custom logger instance. If None, uses the root logger.

None
level_map dict[StateEnum, int] | None

Mapping from new_state to log level (logging.INFO, etc.). Default levels: OPEN/FORCED_OPEN -> WARNING, others -> INFO.

None
Note

logging methods are thread-safe and can be safely used in async contexts.

Examples:

Basic usage with default logger:

>>> import logging
>>> from fluxgate import CircuitBreaker
>>> from fluxgate.listeners.log import LogListener
>>>
>>> logging.basicConfig(level=logging.INFO)
>>> cb = CircuitBreaker(..., listeners=[LogListener()])

With custom logger:

>>> logger = logging.getLogger("my_app.circuit_breaker")
>>> cb = CircuitBreaker(..., listeners=[LogListener(logger=logger)])

With custom level_map:

>>> from fluxgate.state import StateEnum
>>> level_map = {
...     StateEnum.OPEN: logging.ERROR,
...     StateEnum.HALF_OPEN: logging.WARNING,
...     StateEnum.CLOSED: logging.DEBUG,
... }
>>> cb = CircuitBreaker(..., listeners=[LogListener(level_map=level_map)])
Source code in fluxgate/listeners/log.py
def __init__(
    self,
    logger: logging.Logger | None = None,
    level_map: dict[StateEnum, int] | None = None,
) -> None:
    self._logger = logger or logging.getLogger()
    self._level_map = {**self.DEFAULT_LEVEL_MAP, **(level_map or {})}

DEFAULT_LEVEL_MAP class-attribute instance-attribute

DEFAULT_LEVEL_MAP: dict[StateEnum, int] = {
    CLOSED: INFO,
    OPEN: WARNING,
    HALF_OPEN: INFO,
    METRICS_ONLY: INFO,
    DISABLED: INFO,
    FORCED_OPEN: WARNING,
}
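
The constructor shown above merges any level_map you pass over DEFAULT_LEVEL_MAP, so a partial mapping overrides only the states it names. A minimal sketch of that merge follows. It uses a stand-in State enum (hypothetical, with assumed member values) in place of StateEnum so that it runs without fluxgate installed:

```python
import logging
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for fluxgate's StateEnum (values are assumed)."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"
    METRICS_ONLY = "metrics_only"
    DISABLED = "disabled"
    FORCED_OPEN = "forced_open"


# Mirrors the defaults documented above.
DEFAULT_LEVEL_MAP = {
    State.CLOSED: logging.INFO,
    State.OPEN: logging.WARNING,
    State.HALF_OPEN: logging.INFO,
    State.METRICS_ONLY: logging.INFO,
    State.DISABLED: logging.INFO,
    State.FORCED_OPEN: logging.WARNING,
}


def merge_level_map(level_map=None):
    # Same dict-unpacking expression as in __init__: later keys win.
    return {**DEFAULT_LEVEL_MAP, **(level_map or {})}


# Override a single state; every other state keeps its default level.
merged = merge_level_map({State.OPEN: logging.ERROR})
print({state.name: logging.getLevelName(level) for state, level in merged.items()})
```

Passing only the states you care about is therefore enough; there is no need to repeat the defaults.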

__call__

__call__(signal: Signal) -> None
Source code in fluxgate/listeners/log.py
def __call__(self, signal: Signal) -> None:
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(signal.timestamp))
    level = self._level_map.get(signal.new_state, logging.INFO)
    self._logger.log(
        level,
        "[%s] Circuit Breaker '%s' transitioned from %s to %s",
        timestamp,
        signal.circuit_name,
        signal.old_state.value,
        signal.new_state.value,
    )
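
To see what a record produced by this method looks like, the sketch below replays the same formatting logic against an in-memory handler. FakeSignal, State, ListHandler, and log_transition are hypothetical helpers written for this illustration. Only the field names (circuit_name, old_state, new_state, timestamp) and the format string come from the source above.

```python
import logging
import time
from dataclasses import dataclass
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for StateEnum (values are assumed)."""
    CLOSED = "closed"
    OPEN = "open"


@dataclass
class FakeSignal:
    """Hypothetical stand-in carrying the fields that __call__ reads."""
    circuit_name: str
    old_state: State
    new_state: State
    timestamp: float


class ListHandler(logging.Handler):
    """Collects log records in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


def log_transition(logger, level_map, signal):
    # Same steps as __call__: format the time, pick a level, log lazily.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(signal.timestamp))
    level = level_map.get(signal.new_state, logging.INFO)
    logger.log(
        level,
        "[%s] Circuit Breaker '%s' transitioned from %s to %s",
        timestamp,
        signal.circuit_name,
        signal.old_state.value,
        signal.new_state.value,
    )


logger = logging.getLogger("sketch.circuit_breaker")
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the sketch's output out of the root logger
handler = ListHandler()
logger.addHandler(handler)

signal = FakeSignal("payments", State.CLOSED, State.OPEN, time.time())
log_transition(logger, {State.OPEN: logging.WARNING}, signal)

record = handler.records[0]
print(record.levelname, record.getMessage())
```

States missing from the level map fall back to logging.INFO, as in the original method.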

Prometheus Listener

fluxgate.listeners.prometheus.PrometheusListener

Bases: IListener

Listener that exports circuit breaker metrics to Prometheus.

Exports two metrics:

  • circuit_breaker_state: Gauge labeled by circuit_name and state; set to 1 for the current state and 0 for every other state
  • circuit_breaker_state_transition: Counter of state transitions, labeled by circuit_name, old_state, and new_state

Works with both CircuitBreaker and AsyncCircuitBreaker.

Note

prometheus_client is thread-safe and can be safely used in async contexts.

Examples:

>>> from prometheus_client import start_http_server
>>> from fluxgate import CircuitBreaker, AsyncCircuitBreaker
>>> from fluxgate.listeners.prometheus import PrometheusListener
>>>
>>> start_http_server(8000)
>>>
>>> cb = CircuitBreaker(..., listeners=[PrometheusListener()])
>>> async_cb = AsyncCircuitBreaker(..., listeners=[PrometheusListener()])
>>>
>>> # Metrics available at http://localhost:8000/metrics

__call__

__call__(signal: Signal) -> None
Source code in fluxgate/listeners/prometheus.py
def __call__(self, signal: Signal) -> None:
    for state in StateEnum:
        _STATE_GAUGE.labels(
            circuit_name=signal.circuit_name,
            state=state.value,
        ).set(1 if state == signal.new_state else 0)
    _STATE_TRANSITION.labels(
        circuit_name=signal.circuit_name,
        old_state=signal.old_state.value,
        new_state=signal.new_state.value,
    ).inc()
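
The loop above writes a one-hot encoding: on every transition, each state's series is rewritten so that exactly one of them holds 1. The stdlib-only sketch below reproduces that bookkeeping with plain dicts standing in for the labeled Gauge and Counter. The State enum, the dicts, and record_transition are hypothetical names used only for illustration, not part of fluxgate or prometheus_client.

```python
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for StateEnum (values are assumed)."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


gauge = {}        # (circuit_name, state) -> 0 or 1
transitions = {}  # (circuit_name, old_state, new_state) -> count


def record_transition(circuit_name, old_state, new_state):
    # Rewrite every state's series so stale 1s are cleared.
    for state in State:
        gauge[(circuit_name, state.value)] = 1 if state == new_state else 0
    key = (circuit_name, old_state.value, new_state.value)
    transitions[key] = transitions.get(key, 0) + 1


record_transition("payments", State.CLOSED, State.OPEN)
record_transition("payments", State.OPEN, State.HALF_OPEN)

# Exactly one state per circuit is set to 1 at any time.
active = [state for (name, state), value in gauge.items() if value == 1]
print(active)
```

Because only one series per circuit is 1, a PromQL selector such as circuit_breaker_state{state="open"} == 1 would pick out the circuits that are currently open.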

Slack Listener

fluxgate.listeners.slack.SlackListener

SlackListener(channel: str, token: str)

Bases: IListener

Listener that sends circuit breaker state transitions to Slack.

Posts formatted messages to a Slack channel when state transitions occur. Groups related transitions into threads based on failure cycles:

  • Thread starts on → OPEN (new or continued failure cycle)
  • Thread ends on → CLOSED, DISABLED, or METRICS_ONLY

Parameters:

Name Type Description Default
channel str

Slack channel ID (e.g., "C1234567890") or name (e.g., "#alerts")

required
token str

Slack bot token with chat:write permissions

required
Class Attributes

  • TRANSITION_TEMPLATES: Override to customize messages for specific transitions.
  • FALLBACK_TEMPLATE: Override to customize the default message for other transitions.

Examples:

>>> from fluxgate import CircuitBreaker
>>> from fluxgate.listeners.slack import SlackListener
>>>
>>> listener = SlackListener(
...     channel="C1234567890",
...     token="xoxb-your-slack-bot-token"
... )
>>>
>>> cb = CircuitBreaker(..., listeners=[listener])

To customize messages (e.g., for Korean):

>>> from fluxgate.state import StateEnum
>>>
>>> class KoreanSlackListener(SlackListener):
...     TRANSITION_TEMPLATES = {
...         (StateEnum.CLOSED, StateEnum.OPEN): {
...             "title": "🚨 서킷 브레이커 작동",
...             "color": "#FF4C4C",
...             "description": "요청 실패율이 임계값을 초과했습니다.",
...         },
...         # ... other transitions
...     }
...     FALLBACK_TEMPLATE = {
...         "title": "ℹ️ 서킷 브레이커 상태 변경",
...         "color": "#808080",
...         "description": "서킷 브레이커 상태가 변경되었습니다.",
...     }
Source code in fluxgate/listeners/slack.py
def __init__(self, channel: str, token: str) -> None:
    self._channel = channel
    self._token = token
    self._client = httpx.Client(
        headers={"Authorization": f"Bearer {token}"},
        timeout=5.0,
    )
    self._open_threads: dict[str, str] = {}

TRANSITION_TEMPLATES class-attribute

TRANSITION_TEMPLATES: dict[
    tuple[StateEnum, StateEnum], Template
] = _DEFAULT_TRANSITION_TEMPLATES

FALLBACK_TEMPLATE class-attribute

FALLBACK_TEMPLATE: Template = _DEFAULT_FALLBACK_TEMPLATE
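
The body of _get_template is not shown on this page. Given the attribute descriptions above, a plausible reading is that it looks up the (old_state, new_state) pair in TRANSITION_TEMPLATES and uses FALLBACK_TEMPLATE for any pair that is not listed. The sketch below illustrates that assumed behavior with a hypothetical State enum and made-up template text. It is not the library's implementation.

```python
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for StateEnum (values are assumed)."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


# Template keys (title, color, description) follow the example above;
# the text itself is invented for this sketch.
TRANSITION_TEMPLATES = {
    (State.CLOSED, State.OPEN): {
        "title": "Circuit breaker tripped",
        "color": "#FF4C4C",
        "description": "The failure rate exceeded the threshold.",
    },
}
FALLBACK_TEMPLATE = {
    "title": "Circuit breaker state changed",
    "color": "#808080",
    "description": "The circuit breaker changed state.",
}


def get_template(old_state, new_state):
    # Assumed lookup: a specific pair wins, anything else uses the fallback.
    return TRANSITION_TEMPLATES.get((old_state, new_state), FALLBACK_TEMPLATE)


print(get_template(State.CLOSED, State.OPEN)["title"])
print(get_template(State.OPEN, State.HALF_OPEN)["title"])
```

Under this reading, a subclass only needs to list the transitions whose wording it wants to change.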

__call__

__call__(signal: Signal) -> None
Source code in fluxgate/listeners/slack.py
def __call__(self, signal: Signal) -> None:
    template = self._get_template(signal.old_state, signal.new_state)
    message = _build_message(
        channel=self._channel,
        signal=signal,
        template=template,
        thread=self._open_threads.get(signal.circuit_name),
    )
    response = self._client.post(
        "https://slack.com/api/chat.postMessage", json=message
    )
    response.raise_for_status()
    data = response.json()
    ts = data.get("ts")
    if not data.get("ok") or not ts:
        raise RuntimeError(f"Failed to send message: {data.get('error')}")
    if signal.new_state == StateEnum.OPEN:
        self._open_threads.setdefault(signal.circuit_name, ts)
    elif signal.new_state in (
        StateEnum.CLOSED,
        StateEnum.DISABLED,
        StateEnum.METRICS_ONLY,
    ):
        self._open_threads.pop(signal.circuit_name, None)
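
The thread bookkeeping at the end of this method can be followed in isolation. The sketch below replays a failure cycle through the same setdefault/pop logic and records which thread timestamp each message would be posted under. Plain strings stand in for the StateEnum members and for Slack's ts values, and track_thread is a hypothetical helper. How _build_message consumes the thread value is not shown on this page.

```python
open_threads = {}  # circuit_name -> ts of the message that opened the thread


def track_thread(circuit_name, new_state, ts):
    """Return the thread the message is posted under (None means top level)."""
    thread = open_threads.get(circuit_name)
    if new_state == "open":
        # setdefault keeps the first ts, so a re-opened circuit
        # continues the existing thread instead of starting a new one.
        open_threads.setdefault(circuit_name, ts)
    elif new_state in ("closed", "disabled", "metrics_only"):
        # The failure cycle is over; the next OPEN starts a fresh thread.
        open_threads.pop(circuit_name, None)
    return thread


events = [
    ("open", "1.0"),       # cycle starts: top-level message
    ("half_open", "2.0"),  # reply in thread 1.0
    ("open", "3.0"),       # probe failed: still thread 1.0
    ("half_open", "4.0"),
    ("closed", "5.0"),     # recovery is posted in the thread, then it ends
    ("open", "6.0"),       # new cycle: top-level message again
]
threads = [track_thread("payments", state, ts) for state, ts in events]
print(threads)
```

Note that the closing message still lands in the thread, because the lookup happens before the entry is removed.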

fluxgate.listeners.slack.AsyncSlackListener

AsyncSlackListener(channel: str, token: str)

Bases: IAsyncListener

Async listener that sends circuit breaker state transitions to Slack.

Posts formatted messages to a Slack channel when state transitions occur. Groups related transitions into threads based on failure cycles:

  • Thread starts on → OPEN (new or continued failure cycle)
  • Thread ends on → CLOSED, DISABLED, or METRICS_ONLY

Parameters:

Name Type Description Default
channel str

Slack channel ID (e.g., "C1234567890") or name (e.g., "#alerts")

required
token str

Slack bot token with chat:write permissions

required
Class Attributes

  • TRANSITION_TEMPLATES: Override to customize messages for specific transitions.
  • FALLBACK_TEMPLATE: Override to customize the default message for other transitions.

Note

Uses httpx for async HTTP requests.

Examples:

>>> from fluxgate import AsyncCircuitBreaker
>>> from fluxgate.listeners.slack import AsyncSlackListener
>>>
>>> listener = AsyncSlackListener(
...     channel="C1234567890",
...     token="xoxb-your-slack-bot-token"
... )
>>>
>>> cb = AsyncCircuitBreaker(..., listeners=[listener])
Source code in fluxgate/listeners/slack.py
def __init__(self, channel: str, token: str) -> None:
    self._channel = channel
    self._token = token
    self._client = httpx.AsyncClient(
        headers={"Authorization": f"Bearer {token}"},
        timeout=5.0,
    )
    self._open_threads: dict[str, str] = {}

TRANSITION_TEMPLATES class-attribute

TRANSITION_TEMPLATES: dict[
    tuple[StateEnum, StateEnum], Template
] = TRANSITION_TEMPLATES

FALLBACK_TEMPLATE class-attribute

FALLBACK_TEMPLATE: Template = FALLBACK_TEMPLATE

__call__ async

__call__(signal: Signal) -> None
Source code in fluxgate/listeners/slack.py
async def __call__(self, signal: Signal) -> None:
    template = self._get_template(signal.old_state, signal.new_state)
    message = _build_message(
        channel=self._channel,
        signal=signal,
        template=template,
        thread=self._open_threads.get(signal.circuit_name),
    )
    response = await self._client.post(
        "https://slack.com/api/chat.postMessage", json=message
    )
    response.raise_for_status()
    data = response.json()
    ts = data.get("ts")
    if not data.get("ok") or not ts:
        raise RuntimeError(f"Failed to send message: {data.get('error')}")
    if signal.new_state == StateEnum.OPEN:
        self._open_threads.setdefault(signal.circuit_name, ts)
    elif signal.new_state in (
        StateEnum.CLOSED,
        StateEnum.DISABLED,
        StateEnum.METRICS_ONLY,
    ):
        self._open_threads.pop(signal.circuit_name, None)