
Option 1: SocketHandler (direct TCP/UDP connection)

Core idea: send log events directly to a remote server over a socket.

# Client configuration
import logging
import logging.handlers

# SocketHandler pickles each LogRecord and sends it over TCP,
# so no Formatter is needed on the client side.
socket_handler = logging.handlers.SocketHandler('localhost',
                                                 logging.handlers.DEFAULT_TCP_LOGGING_PORT)
logger = logging.getLogger('app')
logger.addHandler(socket_handler)

# Server-side receiver (run as a separate process)
import pickle
import socketserver
import struct
import logging
import logging.handlers

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    def handle(self):
        while True:
            # Each record is prefixed with its 4-byte big-endian length
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk += self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        # Note: only unpickle data from trusted sources
        return pickle.loads(data)

    def handleLogRecord(self, record):
        logger = logging.getLogger(record.name)
        logger.handle(record)

# Configure local handlers so received records are actually emitted
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s')
server = socketserver.ThreadingTCPServer(('localhost', 9020), LogRecordStreamHandler)
server.serve_forever()

Pros: built into Python, no extra dependencies
Cons: no durable queue; a network outage can lose logs
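
The heading above also mentions UDP. For fire-and-forget delivery where occasional loss is acceptable, the standard library's DatagramHandler can be swapped in; a minimal sketch:

import logging
import logging.handlers

# DatagramHandler sends each pickled record as a UDP datagram.
# Delivery is not guaranteed, so use it only where loss is tolerable.
udp_handler = logging.handlers.DatagramHandler(
    'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT)
logging.getLogger('app').addHandler(udp_handler)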

Option 2: Redis queue + custom Handler

Core idea: use Redis as a reliable message queue for log storage.

# Install: pip install redis
import logging
import json
import redis
from datetime import datetime, timezone

class RedisLogHandler(logging.Handler):
    def __init__(self, redis_host='localhost', redis_port=6379,
                 key='app:logs', max_entries=10000, **kwargs):
        super().__init__()
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, **kwargs)
        self.key = key
        self.max_entries = max_entries

    def emit(self, record):
        try:
            log_entry = {
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'level': record.levelname,
                'message': self.format(record),
                'module': record.module,
                'funcName': record.funcName,
                'lineno': record.lineno,
                'process': record.process,
                'thread': record.threadName
            }

            # RPUSH appends the entry; LTRIM caps the list length
            pipe = self.redis_client.pipeline()
            pipe.rpush(self.key, json.dumps(log_entry))
            pipe.ltrim(self.key, -self.max_entries, -1)  # keep the newest entries
            pipe.execute()

        except Exception:
            self.handleError(record)

# Usage example
logger = logging.getLogger('app')
redis_handler = RedisLogHandler(redis_host='redis-server.com', password='secret')
logger.addHandler(redis_handler)

Log consumer service

import redis
import json

def consume_logs():
    r = redis.Redis(host='redis-server.com')

    while True:
        # BRPOP blocks until an item arrives and returns None on timeout
        result = r.brpop('app:logs', timeout=30)
        if result:
            _, log_data = result
            log_entry = json.loads(log_data)
            # Process the log: write to a file / database / analytics system
            process_log_entry(log_entry)
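
process_log_entry is left undefined above; a minimal sketch, assuming a local NDJSON file as the sink (swap in your real database or analytics pipeline):

def process_log_entry(log_entry):
    # Hypothetical sink: append one JSON object per line (NDJSON)
    with open('consumed_logs.ndjson', 'a') as f:
        f.write(json.dumps(log_entry) + '\n')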

Pros: highly reliable, persistent, decouples producers from consumers
Cons: requires Redis infrastructure

Option 3: HTTP/HTTPS API transport (with retry)

Core idea: ship logs to an HTTP API, with batching and retry on failure.

# Install: pip install requests
import logging
import threading
import queue
import time
import json
from datetime import datetime, timezone
from typing import List, Dict, Any
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class BufferedHttpLogHandler(logging.Handler):
    def __init__(self, url: str, batch_size: int = 100, 
                 flush_interval: int = 10, max_retries: int = 3):
        super().__init__()
        self.url = url
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer: List[Dict[str, Any]] = []
        self.lock = threading.Lock()
        self.queue = queue.Queue()

        # Configure a Session with automatic retries
        self.session = requests.Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # Start the background sender thread
        self.sender_thread = threading.Thread(target=self._sender_worker, daemon=True)
        self.sender_thread.start()

        # Start the periodic flush thread
        self.flush_thread = threading.Thread(target=self._flush_worker, daemon=True)
        self.flush_thread.start()

    def emit(self, record):
        log_entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': self.format(record),
            'path': record.pathname,
            'line': record.lineno,
            'stack_trace': record.exc_text
        }

        with self.lock:
            self.buffer.append(log_entry)

            if len(self.buffer) >= self.batch_size:
                self._flush_buffer()

    def _flush_buffer(self):
        if not self.buffer:
            return

        current_buffer = self.buffer.copy()
        self.buffer.clear()

        # Hand the buffered batch to the queue for the background sender
        self.queue.put(current_buffer)

    def _flush_worker(self):
        while True:
            time.sleep(self.flush_interval)
            with self.lock:
                if self.buffer:
                    self._flush_buffer()

    def _sender_worker(self):
        while True:
            try:
                batch = self.queue.get(timeout=30)
                if batch:
                    self._send_batch(batch)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Error in sender worker: {e}")

    def _send_batch(self, batch: List[Dict[str, Any]]):
        try:
            response = self.session.post(
                self.url,
                json={'logs': batch},
                headers={'Content-Type': 'application/json'},
                timeout=10
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Send failed: route to a fallback store (here, a local file)
            self._handle_failed_batch(batch, str(e))

    def _handle_failed_batch(self, batch: List[Dict[str, Any]], error: str):
        # Failure handling: persist the batch to a local file / backup store
        with open('failed_logs.ndjson', 'a') as f:
            for entry in batch:
                entry['_error'] = error
                f.write(json.dumps(entry) + '\n')

    def close(self):
        with self.lock:
            self._flush_buffer()
        # Note: batches already queued may still be unsent if the daemon
        # sender thread is killed at interpreter exit
        super().close()

# Usage example
handler = BufferedHttpLogHandler(
    url='https://log-server.com/api/logs',
    batch_size=50,
    flush_interval=5,
    max_retries=5
)
logger = logging.getLogger('app')
logger.addHandler(handler)
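
If the ingestion endpoint requires authentication, one option is to attach a bearer token to the handler's Session once at setup. A sketch, where the header scheme and the LOG_API_TOKEN environment variable are assumptions:

import os

# Hypothetical: send a bearer token with every batch; adjust the header
# name and token source to match your log server
handler.session.headers.update(
    {'Authorization': f"Bearer {os.environ.get('LOG_API_TOKEN', '')}"}
)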

Server-side API example (Flask)

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/api/logs', methods=['POST'])
def receive_logs():
    # get_json(silent=True) returns None instead of raising on a bad payload
    logs = (request.get_json(silent=True) or {}).get('logs', [])

    for log in logs:
        # Process the log: store to Elasticsearch / a database / a file, etc.
        save_to_elasticsearch(log)

    return jsonify({'status': 'success', 'received': len(logs)})

def save_to_elasticsearch(log_entry):
    # Implement the Elasticsearch storage logic (see the sketch below)
    pass
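
save_to_elasticsearch is left as a stub above; a minimal sketch with the official elasticsearch client, where the cluster URL and index name are assumptions:

# pip install elasticsearch
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')  # assumed local cluster

def save_to_elasticsearch(log_entry):
    # Index each log entry as its own document; the index name is arbitrary
    es.index(index='app-logs', document=log_entry)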

Comparison and selection guide

Option          Reliability   Performance   Complexity   Best fit
SocketHandler   Low           High          Low          Intranet, tolerates some loss
Redis queue     High          High          Medium       High throughput, durable queue needed
HTTP API        Medium-high   Medium        Medium       Cross-network, needs auth and encryption

Best-practice recommendations

- Asynchronous processing: keep log shipping from blocking the main thread
- Failure handling: implement a local fallback (write to a file) plus retries
- Flow control: add batching and rate limiting (see the sketch after this list)
- Standardized format: use a structured format such as JSON
- Secure transport: HTTPS plus an authentication token
- Monitoring and alerting: watch queue backlog and send-failure rates
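
To illustrate the flow-control point, a token-bucket sketch implemented as a logging.Filter; the rate numbers are arbitrary and the filter is not thread-safe as written:

import logging
import time

class RateLimitFilter(logging.Filter):
    """Allow on average `rate` records per second, with bursts up to `burst`."""
    def __init__(self, rate=100, burst=200):
        super().__init__()
        self.rate = rate
        self.capacity = burst
        self.tokens = float(burst)
        self.last = time.monotonic()

    def filter(self, record):
        now = time.monotonic()
        # Refill tokens in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # drop the record when over the limit

logging.getLogger('app').addFilter(RateLimitFilter(rate=50, burst=100))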

Complete production-grade example

import logging
import logging.config

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            # Caveat: a format-string "JSON" formatter produces invalid JSON
            # if the message itself contains quotes or newlines
            'format': '{"time": "%(asctime)s", "level": "%(levelname)s", "name": "%(name)s", "message": "%(message)s"}',
            'datefmt': '%Y-%m-%dT%H:%M:%SZ'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'json'
        },
        'http': {
            # '()' tells dictConfig to instantiate this custom handler class
            '()': 'your_module.BufferedHttpLogHandler',
            'url': 'https://logs.example.com/ingest',
            'batch_size': 100,
            'flush_interval': 5,
            'level': 'INFO'
        },
        'local_fallback': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'app.log',
            'maxBytes': 10485760,
            'backupCount': 5,
            'formatter': 'json'
        }
    },
    'loggers': {
        'app': {
            'handlers': ['http', 'local_fallback', 'console'],
            'level': 'INFO',
            'propagate': False
        }
    }
}

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('app')
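
A quick, purely illustrative smoke test of the configured logger:

logger.info('service started')  # goes to console, HTTP batch, and local file
try:
    1 / 0
except ZeroDivisionError:
    logger.exception('demo failure')  # exc_info is formatted into the record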

Pick whichever option fits your needs: SocketHandler for internal systems, the Redis queue for large-scale distributed systems, and the HTTP API approach when crossing clouds or when strict security controls are required.