Skip to content

Handlers Module

The logkiss handlers module contains handler classes for sending logs to various output destinations.

logkiss.handlers

Handlers module for logkiss logging.

Copyright (c) 2025 Taka Suzuki SPDX-License-Identifier: MIT See LICENSE for details.

This module contains various handlers used in logkiss. Main classes: - BaseHandler: Base handler class for implementing custom handlers - AWSCloudWatchHandler: Handler for sending logs to AWS CloudWatch Logs

Classes

AWSCloudWatchHandler

Bases: Handler

AWS CloudWatch Logs handler

Source code in logkiss/handlers.py
class AWSCloudWatchHandler(logging.Handler):
    """
    AWS CloudWatch Logs handler
    """

    def __init__(
        self,
        log_group_name: str,
        log_stream_name: Optional[str] = None,
        region_name: Optional[str] = None,
        batch_size: int = 100,
        flush_interval: float = 5.0,
    ) -> None:
        """
        Args:
            log_group_name: Log group name
            log_stream_name: Log stream name. If None, instance ID will be used
            region_name: Region name. If None, it will be automatically detected from environment variables
            batch_size: Batch size
            flush_interval: Flush interval in seconds
        """
        if not AWS_AVAILABLE:
            raise ImportError(
                "boto3 package is required. "
                "Install it with: pip install 'logkiss[cloud]'"
            )

        # 先に基底クラスの初期化
        super().__init__()

        # 属性を初期化して、初期化失敗時のエラーを防ぐ
        self._batch = []
        self._batch_lock = threading.Lock()
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._sequence_token = None
        self._executor = None
        self._running = False
        self._flush_thread = None

        try:
            # AWS CloudWatch Logsクライアントを初期化
            self.client = boto3.client("logs", region_name=region_name)
            self.log_group_name = log_group_name

            if log_stream_name is None:
                # Try to determine an appropriate log stream name
                # First check if we're running on EC2
                log_stream_name = self._get_instance_identifier()

            self.log_stream_name = log_stream_name
            self._ensure_log_group_and_stream()

            # 定期的なフラッシュを開始
            self._start_periodic_flush()
        except Exception as e:
            import sys
            print(f"Error initializing AWSCloudWatchHandler: {e}", file=sys.stderr)
            # 初期化に失敗した場合は、runningフラグをFalseにして、スレッドが開始されないようにする
            self._running = False
            raise

    def _get_instance_identifier(self) -> str:
        """
        Get instance identifier for log stream name
        """
        try:
            # Try to get EC2 instance ID
            import requests
            response = requests.get(
                "http://169.254.169.254/latest/meta-data/instance-id",
                timeout=0.1
            )
            if response.status_code == 200:
                return response.text
        except Exception:
            pass

        # Fallback to hostname
        import socket
        return socket.gethostname()

    def _ensure_log_group_and_stream(self) -> None:
        """
        Ensure log group and stream exist
        """
        # Create log group if it doesn't exist
        try:
            self.client.create_log_group(logGroupName=self.log_group_name)
        except self.client.exceptions.ResourceAlreadyExistsException:
            pass

        # Create log stream if it doesn't exist
        try:
            self.client.create_log_stream(
                logGroupName=self.log_group_name,
                logStreamName=self.log_stream_name
            )
        except self.client.exceptions.ResourceAlreadyExistsException:
            # Get sequence token
            response = self.client.describe_log_streams(
                logGroupName=self.log_group_name,
                logStreamNamePrefix=self.log_stream_name,
                limit=1
            )

            for stream in response.get("logStreams", []):
                if stream.get("logStreamName") == self.log_stream_name:
                    self._sequence_token = stream.get("uploadSequenceToken")
                    break

    def _start_periodic_flush(self):
        """Start a background thread to periodically flush the batch."""
        if self._flush_thread is not None and self._flush_thread.is_alive():
            return  # すでに実行中のスレッドがある場合は何もしない

        self._running = True
        self._flush_thread = threading.Thread(
            target=self._periodic_flush_worker,
            daemon=True,  # デーモンスレッドとして実行(プログラム終了時に自動終了)
        )
        self._flush_thread.start()

    def _periodic_flush_worker(self):
        """Worker function for the periodic flush thread."""
        while self._running:
            try:
                # ここでの二重チェックは重要
                if not self._running:
                    break
                self._flush()
            except Exception as e:
                import sys
                print(f"Error in periodic flush: {e}", file=sys.stderr)
                # エラーが発生しても継続する

            # スリープ前に終了フラグを確認
            if not self._running:
                break

            # 次の実行までスリープ
            time.sleep(self._flush_interval)

    def emit(self, record: logging.LogRecord) -> None:
        """Process log record"""
        if not self._running:
            return

        try:
            # ログレコードからメッセージを取得
            message = self.format(record)

            # タイムスタンプを取得(ミリ秒単位)
            timestamp = int(record.created * 1000)

            # エントリを作成
            entry = {
                "timestamp": timestamp,
                "message": message,
            }

            # exc_info=Trueが指定された場合のスタックトレース情報を追加
            if record.exc_info:
                import traceback
                import json
                # JSONとして追加情報を埋め込む
                entry["message"] += "\nStack Trace: " + json.dumps({
                    "stack_trace": traceback.format_exception(*record.exc_info)
                })

            # バッチに追加
            with self._batch_lock:
                self._batch.append(entry)

                # バッチサイズに達したらフラッシュ
                if len(self._batch) >= self._batch_size:
                    self._flush()
        except Exception as e:
            import sys
            print(f"Error in AWSCloudWatchHandler.emit: {e}", file=sys.stderr)

    def _flush(self) -> None:
        """Flush batch"""
        if not self._running:
            return

        entries = []
        with self._batch_lock:
            if not self._batch:
                return

            entries = self._batch
            self._batch = []

        if not entries:
            return

        # Sort entries by timestamp
        entries.sort(key=lambda x: x["timestamp"])

        # Convert to CloudWatch Logs format
        log_events = [
            {
                "timestamp": entry["timestamp"],
                "message": entry["message"]
            }
            for entry in entries
        ]

        # Send to CloudWatch Logs
        try:
            kwargs = {
                "logGroupName": self.log_group_name,
                "logStreamName": self.log_stream_name,
                "logEvents": log_events
            }

            if self._sequence_token:
                kwargs["sequenceToken"] = self._sequence_token

            response = self.client.put_log_events(**kwargs)
            self._sequence_token = response.get("nextSequenceToken")
        except self.client.exceptions.InvalidSequenceTokenException as e:
            # Get the correct sequence token from the error message
            import re
            match = re.search(r"sequenceToken is: (\S+)", str(e))
            if match:
                self._sequence_token = match.group(1)
                # Retry with the correct sequence token
                self._flush()
        except Exception as e:
            import sys
            print(f"Error writing to CloudWatch Logs: {e}", file=sys.stderr)
            # Put the entries back in the batch
            with self._batch_lock:
                self._batch = entries + self._batch

    def close(self):
        """
        Close the handler and release all resources.
        """
        if not hasattr(self, '_running'):
            # 初期化が完了していない場合は何もしない
            super().close()
            return

        try:
            # スレッドを停止
            self._running = False

            # 最後の一回フラッシュを試みる
            try:
                self._flush()
            except Exception as e:
                import sys
                print(f"Error in final flush: {e}", file=sys.stderr)

            # スレッドが存在し、実行中であれば、終了を待つ(最大1秒)
            if hasattr(self, '_flush_thread') and self._flush_thread is not None:
                if self._flush_thread.is_alive():
                    self._flush_thread.join(timeout=1.0)
        except Exception as e:
            import sys
            print(f"Error closing handler: {e}", file=sys.stderr)
        finally:
            # 親クラスのcloseメソッドを呼び出す
            super().close()

    def __del__(self):
        """Cleanup when the handler is deleted"""
        try:
            if hasattr(self, '_running') and self._running:
                self.close()
        except Exception:
            # __del__内では例外を無視
            pass
Functions
__del__()

Cleanup when the handler is deleted

Source code in logkiss/handlers.py
def __del__(self):
    """Cleanup when the handler is deleted"""
    try:
        if hasattr(self, '_running') and self._running:
            self.close()
    except Exception:
        # __del__内では例外を無視
        pass
__init__(log_group_name, log_stream_name=None, region_name=None, batch_size=100, flush_interval=5.0)

Parameters:

Name Type Description Default
log_group_name str

Log group name

required
log_stream_name Optional[str]

Log stream name. If None, instance ID will be used

None
region_name Optional[str]

Region name. If None, it will be automatically detected from environment variables

None
batch_size int

Batch size

100
flush_interval float

Flush interval in seconds

5.0
Source code in logkiss/handlers.py
def __init__(
    self,
    log_group_name: str,
    log_stream_name: Optional[str] = None,
    region_name: Optional[str] = None,
    batch_size: int = 100,
    flush_interval: float = 5.0,
) -> None:
    """
    Args:
        log_group_name: Log group name
        log_stream_name: Log stream name. If None, instance ID will be used
        region_name: Region name. If None, it will be automatically detected from environment variables
        batch_size: Batch size
        flush_interval: Flush interval in seconds
    """
    if not AWS_AVAILABLE:
        raise ImportError(
            "boto3 package is required. "
            "Install it with: pip install 'logkiss[cloud]'"
        )

    # 先に基底クラスの初期化
    super().__init__()

    # 属性を初期化して、初期化失敗時のエラーを防ぐ
    self._batch = []
    self._batch_lock = threading.Lock()
    self._batch_size = batch_size
    self._flush_interval = flush_interval
    self._sequence_token = None
    self._executor = None
    self._running = False
    self._flush_thread = None

    try:
        # AWS CloudWatch Logsクライアントを初期化
        self.client = boto3.client("logs", region_name=region_name)
        self.log_group_name = log_group_name

        if log_stream_name is None:
            # Try to determine an appropriate log stream name
            # First check if we're running on EC2
            log_stream_name = self._get_instance_identifier()

        self.log_stream_name = log_stream_name
        self._ensure_log_group_and_stream()

        # 定期的なフラッシュを開始
        self._start_periodic_flush()
    except Exception as e:
        import sys
        print(f"Error initializing AWSCloudWatchHandler: {e}", file=sys.stderr)
        # 初期化に失敗した場合は、runningフラグをFalseにして、スレッドが開始されないようにする
        self._running = False
        raise
close()

Close the handler and release all resources.

Source code in logkiss/handlers.py
def close(self):
    """
    Close the handler and release all resources.
    """
    if not hasattr(self, '_running'):
        # 初期化が完了していない場合は何もしない
        super().close()
        return

    try:
        # スレッドを停止
        self._running = False

        # 最後の一回フラッシュを試みる
        try:
            self._flush()
        except Exception as e:
            import sys
            print(f"Error in final flush: {e}", file=sys.stderr)

        # スレッドが存在し、実行中であれば、終了を待つ(最大1秒)
        if hasattr(self, '_flush_thread') and self._flush_thread is not None:
            if self._flush_thread.is_alive():
                self._flush_thread.join(timeout=1.0)
    except Exception as e:
        import sys
        print(f"Error closing handler: {e}", file=sys.stderr)
    finally:
        # 親クラスのcloseメソッドを呼び出す
        super().close()
emit(record)

Process log record

Source code in logkiss/handlers.py
def emit(self, record: logging.LogRecord) -> None:
    """Process log record"""
    if not self._running:
        return

    try:
        # ログレコードからメッセージを取得
        message = self.format(record)

        # タイムスタンプを取得(ミリ秒単位)
        timestamp = int(record.created * 1000)

        # エントリを作成
        entry = {
            "timestamp": timestamp,
            "message": message,
        }

        # exc_info=Trueが指定された場合のスタックトレース情報を追加
        if record.exc_info:
            import traceback
            import json
            # JSONとして追加情報を埋め込む
            entry["message"] += "\nStack Trace: " + json.dumps({
                "stack_trace": traceback.format_exception(*record.exc_info)
            })

        # バッチに追加
        with self._batch_lock:
            self._batch.append(entry)

            # バッチサイズに達したらフラッシュ
            if len(self._batch) >= self._batch_size:
                self._flush()
    except Exception as e:
        import sys
        print(f"Error in AWSCloudWatchHandler.emit: {e}", file=sys.stderr)

BaseHandler

Bases: Handler

Base handler class for implementing custom handlers

Source code in logkiss/handlers.py
class BaseHandler(logging.Handler):
    """Base handler class for implementing custom handlers"""

    def __init__(self, level=logging.NOTSET):
        """Initialize the handler"""
        super().__init__(level)

    def handle(self, record):
        """Handle a log record"""
        raise NotImplementedError("handle method must be implemented")
Functions
__init__(level=logging.NOTSET)

Initialize the handler

Source code in logkiss/handlers.py
def __init__(self, level=logging.NOTSET):
    """Initialize the handler"""
    super().__init__(level)
handle(record)

Handle a log record

Source code in logkiss/handlers.py
def handle(self, record):
    """Handle a log record"""
    raise NotImplementedError("handle method must be implemented")