欢迎光临南宁市中国丧葬服务网
详情描述

方案一:SocketHandler(TCP/UDP直连)

核心思想:通过Socket将日志事件直接发送到远程服务器

# 客户端配置
import logging
import logging.handlers

socket_handler = logging.handlers.SocketHandler('localhost', 
                                                 logging.handlers.DEFAULT_TCP_LOGGING_PORT)
logger = logging.getLogger('app')
logger.addHandler(socket_handler)

# 服务端接收(需单独运行)
import socketserver
import struct
import logging
import logging.handlers

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    def handle(self):
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk += self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return logging.handlers.pickle.loads(data)

    def handleLogRecord(self, record):
        logger = logging.getLogger(record.name)
        logger.handle(record)

server = socketserver.ThreadingTCPServer(('localhost', 9020), LogRecordStreamHandler)
server.serve_forever()

优点:Python内置,无需额外依赖 缺点:无持久化队列,网络中断可能丢失日志

方案二:Redis队列 + 自定义Handler

核心思想:使用Redis作为可靠的消息队列存储日志

# 安装:pip install redis
import logging
import json
import redis
from datetime import datetime

class RedisLogHandler(logging.Handler):
    def __init__(self, redis_host='localhost', redis_port=6379, 
                 key='app:logs', max_memory=10000000, **kwargs):
        super().__init__()
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, **kwargs)
        self.key = key
        self.max_memory = max_memory

    def emit(self, record):
        try:
            log_entry = {
                'timestamp': datetime.utcnow().isoformat(),
                'level': record.levelname,
                'message': self.format(record),
                'module': record.module,
                'funcName': record.funcName,
                'lineno': record.lineno,
                'process': record.process,
                'thread': record.threadName
            }

            # 使用RPUSH添加日志,LTRIM限制内存
            pipe = self.redis_client.pipeline()
            pipe.rpush(self.key, json.dumps(log_entry))
            pipe.ltrim(self.key, -10000, -1)  # 保留最近10000条
            pipe.execute()

        except Exception:
            self.handleError(record)

# 使用示例
logger = logging.getLogger('app')
redis_handler = RedisLogHandler(host='redis-server.com', password='secret')
logger.addHandler(redis_handler)

日志消费服务

import redis
import json

def consume_logs():
    r = redis.Redis(host='redis-server.com')

    while True:
        # BRPOP阻塞获取,支持多个队列
        _, log_data = r.brpop('app:logs', timeout=30)
        if log_data:
            log_entry = json.loads(log_data)
            # 处理日志:写入文件/数据库/分析系统
            process_log_entry(log_entry)

优点:高可靠、支持持久化、解耦生产消费 缺点:需要Redis基础设施

方案三:HTTP/HTTPS API传输(带重试机制)

核心思想:通过HTTP API发送日志,包含失败重试和批量发送

# 安装:pip install requests
import logging
import threading
import queue
import time
import json
from datetime import datetime
from typing import List, Dict, Any
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class BufferedHttpLogHandler(logging.Handler):
    def __init__(self, url: str, batch_size: int = 100, 
                 flush_interval: int = 10, max_retries: int = 3):
        super().__init__()
        self.url = url
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer: List[Dict[str, Any]] = []
        self.lock = threading.Lock()
        self.queue = queue.Queue()

        # 配置带重试的Session
        self.session = requests.Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # 启动后台发送线程
        self.sender_thread = threading.Thread(target=self._sender_worker, daemon=True)
        self.sender_thread.start()

        # 启动定时刷新线程
        self.flush_thread = threading.Thread(target=self._flush_worker, daemon=True)
        self.flush_thread.start()

    def emit(self, record):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': self.format(record),
            'path': record.pathname,
            'line': record.lineno,
            'stack_trace': record.exc_text
        }

        with self.lock:
            self.buffer.append(log_entry)

            if len(self.buffer) >= self.batch_size:
                self._flush_buffer()

    def _flush_buffer(self):
        if not self.buffer:
            return

        current_buffer = self.buffer.copy()
        self.buffer.clear()

        # 将缓冲区的日志放入队列供后台线程发送
        self.queue.put(current_buffer)

    def _flush_worker(self):
        while True:
            time.sleep(self.flush_interval)
            with self.lock:
                if self.buffer:
                    self._flush_buffer()

    def _sender_worker(self):
        while True:
            try:
                batch = self.queue.get(timeout=30)
                if batch:
                    self._send_batch(batch)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Error in sender worker: {e}")

    def _send_batch(self, batch: List[Dict[str, Any]]):
        try:
            response = self.session.post(
                self.url,
                json={'logs': batch},
                headers={'Content-Type': 'application/json'},
                timeout=10
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # 发送失败,可以加入死信队列或本地文件
            self._handle_failed_batch(batch, str(e))

    def _handle_failed_batch(self, batch: List[Dict[str, Any]], error: str):
        # 实现失败处理逻辑:写入本地文件/备用存储
        with open('failed_logs.ndjson', 'a') as f:
            for entry in batch:
                entry['_error'] = error
                f.write(json.dumps(entry) + '\n')

    def close(self):
        with self.lock:
            self._flush_buffer()
        super().close()

# 使用示例
handler = BufferedHttpLogHandler(
    url='https://log-server.com/api/logs',
    batch_size=50,
    flush_interval=5,
    max_retries=5
)
logger = logging.getLogger('app')
logger.addHandler(handler)

服务端API示例(Flask)

from flask import Flask, request, jsonify
import json

app = Flask(__name__)

@app.route('/api/logs', methods=['POST'])
def receive_logs():
    logs = request.json.get('logs', [])

    for log in logs:
        # 处理日志:存储到ES/数据库/文件等
        save_to_elasticsearch(log)

    return jsonify({'status': 'success', 'received': len(logs)})

def save_to_elasticsearch(log_entry):
    # 实现ES存储逻辑
    pass

方案对比与选型建议

方案 可靠性 性能 复杂性 适用场景
SocketHandler 内网环境,容忍少量丢失
Redis队列 高吞吐,需要持久化队列
HTTP API 跨网络,需要认证和加密

最佳实践建议

异步处理:避免日志传输阻塞主线程 失败处理:实现本地降级(写入文件)和重试机制 流量控制:添加批处理和速率限制 格式标准化:使用JSON等结构化格式 安全传输:HTTPS + 认证令牌 监控告警:监控日志队列积压和发送失败率

完整生产级示例

import logging
import logging.config

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            'format': '{"time": "%(asctime)s", "level": "%(levelname)s", "name": "%(name)s", "message": "%(message)s"}',
            'datefmt': '%Y-%m-%dT%H:%M:%SZ'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'json'
        },
        'http': {
            '()': 'your_module.BufferedHttpLogHandler',
            'url': 'https://logs.example.com/ingest',
            'batch_size': 100,
            'flush_interval': 5,
            'level': 'INFO'
        },
        'local_fallback': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'app.log',
            'maxBytes': 10485760,
            'backupCount': 5,
            'formatter': 'json'
        }
    },
    'loggers': {
        'app': {
            'handlers': ['http', 'local_fallback', 'console'],
            'level': 'INFO',
            'propagate': False
        }
    }
}

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('app')

根据具体需求选择合适的方案:内部系统可选SocketHandler,大规模分布式系统推荐Redis队列,跨云或需要严格安全控制的场景使用HTTP API方案。

相关帖子
2026年,有哪些工具或应用能帮助用户集中管理所有订阅并进行提醒?
2026年,有哪些工具或应用能帮助用户集中管理所有订阅并进行提醒?
对于新手来说,掌握机动轮椅车安全驾驶有哪些必须知道的要点和技巧?
对于新手来说,掌握机动轮椅车安全驾驶有哪些必须知道的要点和技巧?
南宁市java开源商城二次开发%网络推广,专业开发团队
南宁市java开源商城二次开发%网络推广,专业开发团队
2026年正月理发到底有什么传统习俗的由来与具体说法?
2026年正月理发到底有什么传统习俗的由来与具体说法?
在工作中因制止违法犯罪行为而受伤,能否认定为工伤?
在工作中因制止违法犯罪行为而受伤,能否认定为工伤?
手机充电一夜不拔,对现在的电池来说究竟是一种保护还是伤害?
手机充电一夜不拔,对现在的电池来说究竟是一种保护还是伤害?
如何利用社交媒体和本地网络,扩大寻找走失老人的信息传播范围?
如何利用社交媒体和本地网络,扩大寻找走失老人的信息传播范围?
太原市网站运营%短视频营销推广,企业解决方案
太原市网站运营%短视频营销推广,企业解决方案
面对楼道和车棚被废旧自行车占据,业主和物业可以采取哪些合法措施?
面对楼道和车棚被废旧自行车占据,业主和物业可以采取哪些合法措施?
收到礼物时当面打开并具体赞美,这种行为体现了哪些高情商社交礼仪?
收到礼物时当面打开并具体赞美,这种行为体现了哪些高情商社交礼仪?
普通人想要在元宇宙中拥有一块地,需要了解哪些基本的技术与步骤?
普通人想要在元宇宙中拥有一块地,需要了解哪些基本的技术与步骤?
台州市殡葬一条龙-丧葬一站式服务,个性化服务
台州市殡葬一条龙-丧葬一站式服务,个性化服务
遇到突发困难,2026年临时救助的申请条件、流程和标准是什么?
遇到突发困难,2026年临时救助的申请条件、流程和标准是什么?
运动后大汗淋漓,为什么很多人建议等汗收了再洗澡?
运动后大汗淋漓,为什么很多人建议等汗收了再洗澡?
大同市殡葬服务电话|殡仪服务,白事灵堂策划
大同市殡葬服务电话|殡仪服务,白事灵堂策划
淮安市品牌网站开发建设%企业获客渠道,专业建站公司
淮安市品牌网站开发建设%企业获客渠道,专业建站公司
房产证上存在抵押贷款,在还清贷款前后如何办理过户手续?
房产证上存在抵押贷款,在还清贷款前后如何办理过户手续?
嘉兴市殡葬一条龙服务-白事一站式服务,收费透明,1小时上门
嘉兴市殡葬一条龙服务-白事一站式服务,收费透明,1小时上门
临汾市殡葬服务价格|丧事一站式服务,丧事灵堂策划
临汾市殡葬服务价格|丧事一站式服务,丧事灵堂策划
如何利用养老服务补贴更好地为家中老人安排日常照料与陪伴?
如何利用养老服务补贴更好地为家中老人安排日常照料与陪伴?