You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

131 lines
4.7 KiB

# -*- coding: utf-8 -*-
import logging
import logging.config
import datetime
import redis
class OutLog:
    """Singleton that mirrors per-user log lines into Redis.

    Redis layout used by this class:

    * ``queue:{user_id}`` -- list of formatted INFO log lines for the user.
    * ``done:{user_id}``  -- hash mapping producer name to ``"0"``
      (unfinished) or ``"1"`` (finished).
    """

    _instance = None  # the shared OutLog instance, created on first call
    logger = None  # stdlib logger shared by every UserLogger

    def __new__(cls):
        """Return the shared instance, building it on the first call."""
        if cls._instance is None:
            instance = super(OutLog, cls).__new__(cls)
            cls.logger = logging.getLogger("app")  # default logger name is "app"
            # NOTE(review): connection settings (including the password) are
            # hard-coded here; consider moving them to configuration.
            instance.redis_client = redis.StrictRedis(
                host='localhost',
                port=6379,
                password="root",
                db=0,
                decode_responses=True,
            )
            # Publish the instance only once it is fully initialised, so a
            # failure above cannot leave a singleton without a Redis client.
            cls._instance = instance
        return cls._instance

    def get_queue(self, user_id, producer_name):
        """Mark ``producer_name`` as unfinished for ``user_id``.

        Returns:
            A ``UserLogger`` bound to ``user_id``.
        """
        if not self.redis_client.exists(f"queue:{user_id}"):
            self.logger.info(f"queue:{user_id}")
        # NOTE(review): the original indentation was lost, so it is unclear
        # whether this registration sat inside the ``if`` above. It is done
        # unconditionally here so every producer is tracked -- confirm.
        self.redis_client.hset(f"done:{user_id}", producer_name, "0")
        return self.UserLogger(user_id)

    def get_queueData(self, user_id):
        """Pop and return the oldest queued log line for ``user_id``.

        Returns:
            The popped entry, or ``None`` when the queue is empty or missing.
        """
        # LPOP already yields None for a missing key, so no separate EXISTS
        # check is needed (one round trip, no check-then-act gap).
        return self.redis_client.lpop(f"queue:{user_id}")

    def del_queue(self, user_id):
        """Delete the user's queue and done-hash once all producers finished."""
        if self.is_done(user_id):
            # A single DEL removes both keys together.
            self.redis_client.delete(f"queue:{user_id}", f"done:{user_id}")

    class UserLogger:
        """Logger facade bound to one user.

        Every message goes to the shared stdlib logger; INFO messages are
        additionally appended to the user's Redis queue.
        """

        # Maps the textual level to the stdlib logger method that handles it.
        _LEVEL_METHODS = {
            "DEBUG": "debug",
            "INFO": "info",
            "WARNING": "warning",
            "ERROR": "error",
            "CRITICAL": "critical",
        }

        def __init__(self, user_id):
            self.user_id = user_id
            self.logger = OutLog._instance.logger

        def log(self, item: str, level: str):
            """Log ``item`` at ``level``; queue it in Redis only for INFO."""
            self._log_to_logger(item, level)
            if level != "INFO":
                return
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            log_entry = f"{timestamp}[{level}]: {item}"
            # Append to the queue that belongs to this user.
            OutLog._instance.redis_client.rpush(f"queue:{self.user_id}", log_entry)

        def _log_to_logger(self, item: str, level: str):
            """Forward ``item`` to the stdlib logger; unknown levels are ignored."""
            method_name = self._LEVEL_METHODS.get(level)
            if method_name is not None:
                getattr(self.logger, method_name)(item)

        def info(self, item: str):
            """Log ``item`` at INFO level."""
            self.log(item, "INFO")

        def warning(self, item: str):
            """Log ``item`` at WARNING level."""
            self.log(item, "WARNING")

        def debug(self, item: str):
            """Log ``item`` at DEBUG level."""
            self.log(item, "DEBUG")

        def error(self, item: str):
            """Log ``item`` at ERROR level."""
            self.log(item, "ERROR")

        def critical(self, item: str):
            """Log ``item`` at CRITICAL level."""
            self.log(item, "CRITICAL")

    def mark_done(self, user_id, producer_name):
        """Flag ``producer_name`` as finished for ``user_id``."""
        self.redis_client.hset(f"done:{user_id}", producer_name, "1")

    def is_done(self, user_id):
        """Return True when the user has producers and all are flagged ``"1"``."""
        flags = self.redis_client.hgetall(f"done:{user_id}")
        # An empty or missing hash counts as "not done".
        return bool(flags) and all(flag == "1" for flag in flags.values())
# Logging configuration: the root logger writes INFO and above to both the
# console and the file "Logger.log", using one shared line format.
log_config = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'standard',
            'level': logging.INFO,
        },
        'file': {
            'class': 'logging.FileHandler',
            'filename': 'Logger.log',
            'formatter': 'standard',
            'level': logging.INFO,
            # Explicit encoding so non-ASCII messages are written the same
            # way regardless of the platform's locale default.
            'encoding': 'utf-8',
        },
    },
    'loggers': {
        # The empty name configures the root logger.
        '': {
            'handlers': ['console', 'file'],
            'level': logging.INFO,
            'propagate': True,
        },
    },
}
logging.config.dictConfig(log_config)
# Module-level handle to the OutLog singleton.
outLog = OutLog() # obtain the singleton instance