# -*- coding: utf-8 -*-
"""Per-user log queues backed by Redis, plus the application logging setup.

Importing this module configures the root logger (console + ``Logger.log``)
and creates the shared ``outLog`` singleton.
"""
import datetime
import logging
import logging.config

import redis


class OutLog:
    """Singleton that routes log messages to ``logging`` and to Redis.

    Redis layout used by this class:

    * ``queue:{user_id}`` -- list of formatted INFO log entries for a user.
    * ``done:{user_id}``  -- hash mapping producer name to ``"0"`` (running)
      or ``"1"`` (finished).
    """

    _instance = None
    logger = None

    def __new__(cls):
        """Return the shared instance, creating it on the first call."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # The default logger name is "app".
            cls.logger = logging.getLogger("app")
            # Initialise the Redis connection.
            # NOTE(review): host and password are hard-coded here; consider
            # moving them to configuration.
            cls._instance.redis_client = redis.StrictRedis(
                host='localhost',
                port=6379,
                password="root",
                db=0,
                decode_responses=True,
            )
        return cls._instance

    def get_queue(self, user_id, producer_name):
        """Register ``producer_name`` for ``user_id`` and return a user logger.

        The producer's flag in ``done:{user_id}`` is set to ``"0"`` on every
        call, and the queue key is logged when no queue exists yet.

        Returns:
            An ``OutLog.UserLogger`` bound to ``user_id``.
        """
        if not self.redis_client.exists(f"queue:{user_id}"):
            self.logger.info(f"queue:{user_id}")
        # Mark this producer as not finished.
        # NOTE(review): the flattened original does not show whether this
        # call sat inside the `if` above; it is kept unconditional so every
        # producer is registered -- confirm against the intended behaviour.
        self.redis_client.hset(f"done:{user_id}", producer_name, "0")
        return self.UserLogger(user_id)

    def get_queueData(self, user_id):
        """Pop and return the oldest queued entry for ``user_id``.

        Returns:
            The first entry of ``queue:{user_id}``, or ``None`` when the
            queue is empty or does not exist.
        """
        # LPOP already yields None for a missing key, so a separate EXISTS
        # check would only add a round trip and a check-then-act race.
        return self.redis_client.lpop(f"queue:{user_id}")

    def del_queue(self, user_id):
        """Delete the user's queue and done-flags once all producers finished.

        Does nothing while ``is_done(user_id)`` is false.
        """
        if self.is_done(user_id):
            # A single DEL removes both keys in one round trip.
            self.redis_client.delete(f"queue:{user_id}", f"done:{user_id}")

    class UserLogger:
        """Logger facade bound to one user.

        Every message is forwarded to the shared ``logging`` logger; INFO
        messages are additionally appended to the user's Redis queue.
        """

        # Level names accepted by ``log`` mapped to ``logging`` level numbers.
        _LEVELS = {
            "DEBUG": logging.DEBUG,
            "INFO": logging.INFO,
            "WARNING": logging.WARNING,
            "ERROR": logging.ERROR,
            "CRITICAL": logging.CRITICAL,
        }

        def __init__(self, user_id):
            self.user_id = user_id
            self.logger = OutLog._instance.logger

        def log(self, item: str, level: str):
            """Log ``item`` at ``level``; queue it in Redis when level is INFO."""
            self._log_to_logger(item, level)
            if level != "INFO":
                return
            dtf = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            log_entry = f"{dtf}[{level}]: {item}"
            # Append to this user's queue.
            OutLog._instance.redis_client.rpush(f"queue:{self.user_id}", log_entry)

        def _log_to_logger(self, item: str, level: str):
            """Forward ``item`` to the shared logger; unknown levels are ignored."""
            levelno = self._LEVELS.get(level)
            if levelno is not None:
                self.logger.log(levelno, item)

        def info(self, item: str):
            """Log ``item`` at INFO level (also queued in Redis)."""
            self.log(item, "INFO")

        def warning(self, item: str):
            """Log ``item`` at WARNING level."""
            self.log(item, "WARNING")

        def debug(self, item: str):
            """Log ``item`` at DEBUG level."""
            self.log(item, "DEBUG")

        def error(self, item: str):
            """Log ``item`` at ERROR level."""
            self.log(item, "ERROR")

        def critical(self, item: str):
            """Log ``item`` at CRITICAL level."""
            self.log(item, "CRITICAL")

    def mark_done(self, user_id, producer_name):
        """Flag ``producer_name`` as finished for ``user_id``."""
        self.redis_client.hset(f"done:{user_id}", producer_name, "1")

    def is_done(self, user_id):
        """Return True when every registered producer of ``user_id`` finished.

        Returns False when no producer is registered for the user.
        """
        done_dict = self.redis_client.hgetall(f"done:{user_id}")
        return bool(done_dict) and all(value == "1" for value in done_dict.values())


# Logging configuration: console and file handlers attached to the root logger.
log_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'standard',
            'level': logging.INFO,
        },
        'file': {
            'class': 'logging.FileHandler',
            'filename': 'Logger.log',
            'formatter': 'standard',
            'level': logging.INFO,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console', 'file'],
            'level': logging.INFO,
            'propagate': True,
        },
    }
}

logging.config.dictConfig(log_config)

# Shared singleton instance.
outLog = OutLog()