python Logger日志
FlyLikeButterfly 人气:0用到的4个类:
1、Logger:
打印日志用的对象;
设置日志等级,添加移除handler,添加移除filter,设置下级Logger,使用各种方法打印日志;
创建方式有两种,使用logging.getLogger("mylog")和创建实例logging.Logger("mylog");
推荐使用getLogger()的方式,不传name参数的获得的Logger为root Logger,传name参数的上级为root Logger;
直接实例化的Logger没有上级,日志等级为NOTSET,并且用该实例创建的下级Logger的上级直接为root Logger;
使用getLogger(name)传入相同的名字获得是同一个Logger;
Logger拥有上下级关系,父子记录器名称之间用“.”分割,Logger拥有有效日志等级概念(getEffectiveLevel()方法返回的是日志等级整数值),直接新建实例创建的Logger日志等级为NOTSET,默认的根记录器为WARNING等级,如果NOTSET等级的Logger为根记录器则处理所有消息,否则会委托给父级记录器,遍历父记录器链,直到找到非NOTSET的父记录器并把该等级作为自己的有效等级,如果直到根记录器也是NOTSET则处理所有消息;
2、Formatter:
日志打印格式;
创建方式为logging.Formatter(fmt=None, datefmt=None, style='%', validate=True)
fmt:格式化日志
datefmt:格式化fmt中的%(asctime)s
style:在3.2版本添加
validate:在3.8版本添加,不正确或不匹配的样式和fmt将引发ValueError;
跟上一篇一致python的logging日志模块
3、Filter:
过滤日志;
通过继承logging.Filter类,并实现 filter(self, record: LogRecord) -> int 方法,返回0或者False表示不通过,返回非0或者True表示可以通过;(from logging import LogRecord)
LogRecord可以操作的属性大概有这么多:
同一个Handler可以添加多个Filter,依次过滤,当前Filter通过后传递给下一个Filter;
4、Handler:
日志输出方式;
设置日志等级,设置formatter,添加移除filter;
Logger可以添加多个handler,将日志按不同格式和过滤送往不同的地方,同一个Logger只会添加同一个handler一次,多次添加无影响;
子Logger处理完自己的handler后,会将日志传递给父Logger的handler处理,依次向上传递,不要将同一个handler同时添加到父子Logger里,否则父子Logger都会处理会打印多次相同日志;
可以通过设置Logger对象的propagate属性为False关闭传递给父Logger的handler;
大概有这么多Handler:
(from logging import handlers)
StreamHandler:
构造方法 logging.StreamHandler(stream=None)
将日志发送到像sys.stdout、sys.stderr或类似文件的对象中(支持write()和flush()方法的对象),默认使用sys.stderr,3.2版本后还有个terminator属性,可以设置终止符(默认“\n”);
FileHandler:
构造方法 logging.FileHandler(filename, mode='a', encoding=None, delay=False)
将日志记录到文件中,默认文件将无限增长,如果delay为true,打开文件会延迟到第一次调用emit();
3.6版本开始,pathlib.Path也可以作为filename的值
NullHandler:
不做任何格式化和输出,提供给库开发人员使用;
WatchedFileHandler:
这是一个FileHandler,监控正在记录日志的文件,如果文件变动了则关闭文件重新打开;(windows系统不适用)
BaseRotatingHandler:
构造方法 logging.handlers.BaseRotatingHandler(filename, mode, encoding=None, delay=False)
是RotatingFileHandler和TimedRotatingFileHandler的基类,不需要实例化该类;
RotatingFileHandler:
构造方法 logging.handlers.RotatingFileHandler(filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False)
按照日志文件大小和备份日志文件数量保存日志到文件;
maxBytes或者backupCount为0则不会滚动日志,当日志大小接近maxBytes时会用“.1”“.2”“.3”...后缀保存旧文件,并保持旧文件数量不超过backupCount,当前日志一直是没有后缀的那个文件;
TimedRotatingFileHandler:
构造方法 logging.handlers.TimedRotatingFileHandler(filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None)
按照时间间隔和备份文件数量保存日志到文件;
when:“S”秒,“M”分钟,“H”小时,“D”天,“W0”-“W6”工作日(W0为周一),“midnight”午夜(不指定atTime的时候午夜12点,否则按照atTime的时间滚动)
interval:时间间隔;
backupCount:保留文件数量上限;
utc:为true时文件名后缀使用UTC时间否则使用本地时间;
atTime:3.4添加,datetime.time类型,指定一天中发生滚动日志的时间,只有when为“midnight”或者“W0”-“W6”时有效;
生成的备份文件文件名后缀格式为%Y-%m-%d_%H-%M-%S,第一次计算滚动时间(程序启动后)时会使用现有日志的最后修改时间或者当前时间计算;
SocketHandler:
构造方法 logging.handlers.SocketHandler(host, port)
将日志发送到网络套接字,基类使用的是TCP;
发送的二进制bytes是由编码的,由“内容长度+内容”组成,内容是用pickle序列化的一个dict(LogRecord,可以用logging.makeLogRecord(attrdict)转换),长度是用struct将序列化后的内容打包的一个大端无符号long数值,具体逻辑在SocketHandler类源码的makePickle()方法中有体现:
搞了半天才搞好的小demo(之前没看到编码方式还以为socket的编码,服务端接收数据解不出):
#!/usr/bin/env python3 # coding=utf-8 import logging from logging import handlers import time log = logging.getLogger("mylog") log.setLevel(logging.DEBUG) h = logging.handlers.SocketHandler(host="127.0.0.1", port=8899) f = logging.Formatter("[%(name)s][%(asctime)s]%(message)s") h.setFormatter(f) log.addHandler(h) for x in range(1, 6): log.info("test log %d" % x) time.sleep(0.5)
服务端demo:
#!/usr/bin/env python3 # coding=utf-8 import socket import pickle import struct import logging def unPickle(bs: bytes): data_len_bytes = bs[0:4] data_len = struct.unpack(">L", data_len_bytes)[0] pickled_data = bs[4: data_len + 4 + 1] return pickle.loads(pickled_data) socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socket_server.bind(("127.0.0.1", 8899)) socket_server.listen(3) while True: try: client, client_addr = socket_server.accept() while True: data = client.recv(1024 * 10) if data != b'': log_dict = unPickle(data) log_record = logging.makeLogRecord(log_dict) print("recv log:", log_record) except Exception as e: print("Exception:", e)
先运行socket服务,再测日志,因为连接不到服务日志会被丢弃,运行结果:
DatagramHandler:
构造方法 logging.handlers.DatagramHandler(host, port)
继承了SocketHandler,使用UDP发送,类似SocketHandler的使用;
SysLogHandler:
构造方法 logging.handlers.SysLogHandler(address=('localhost', SYSLOG_UDP_PORT), facility=LOG_USER, socktype=socket.SOCK_DGRAM)
将日志发送到远程或者本地的Unix系统日志,未指定address则使用('localhost', 514)地址;
NTEventLogHandler:
构造方法 logging.handlers.NTEventLogHandler(appname, dllname=None, logtype='Application')
将日志发送到本地的Windows NT、Windows 2000、Windows XP系统日志,使用时需要Mark Hammond's Win32的python扩展;
SMTPHandler:
构造方法 logging.handlers.SMTPHandler(mailhost, fromaddr, toaddrs, subject, credentials=None, secure=None, timeout=1.0)
将日志发送到电子邮件;
mailhost使用(host, port)元组,toaddrs是一个字符串列表,credentials可以用(username, password)元组指定用户名密码
MemoryHandler:
构造方法 logging.handlers.MemoryHandler(capacity, flushLevel=ERROR, target=None, flushOnClose=True)
支持在内存中缓冲日志,当缓存将满或者出现某种严重情况时把日志发送到目标handler;
是BufferingHandler的子类;
每当向缓冲区添加日志的时候都会调用shouldFlush()判断是否需要刷新,如果需要则会调用flush()刷新;
HTTPHandler:
构造方法 logging.handlers.HTTPHandler(host, url, method='GET', secure=False, credentials=None, context=None)
通过GET或者POST向一个web服务器发送日志;
如果要指定端口,host可以使用host:port值;
secure为true,则使用HTTPS;
对HTTPHandler使用setFormatter()是无效的,HTTPHandler没有调用format()格式化,而是调用了mapLogRecord()方法然后使用urllib.parse.urlencode()编码的;
mapLogRecord()函数很简单(有需要可以重写):
而LogRecord()里是没有asctime字段的,所以log.asctime是错误的,但是logRecord里有created和msecs字段表时间:
小小的demo:
#!/usr/bin/env python3 # coding=utf-8 import logging from logging import handlers import time log = logging.getLogger("mylog") log.setLevel(logging.DEBUG) h_get = handlers.HTTPHandler(host="127.0.0.1:8080", url="test_http_log", method="GET", secure=False, credentials=None, context=None) log.addHandler(h_get) h_post = handlers.HTTPHandler(host="127.0.0.1:8080", url="test_http_log", method="POST", secure=False, credentials=None, context=None) log.addHandler(h_post) h = logging.StreamHandler() f = logging.Formatter("[%(levelname)s][%(asctime)s]%(message)s") h.setFormatter(f) log.addHandler(h) for x in range(1, 3): log.info("test HTTP log %d" % x) time.sleep(0.5)
http服务端:
#!/usr/bin/env python3 # coding=utf-8 from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler from urllib import parse import logging import time class MyHttpServer(BaseHTTPRequestHandler): def do_GET(self): url = parse.urlparse(self.path) q = parse.parse_qs(url.query) log = logging.makeLogRecord(q) log_created = time.localtime(float(log.created[0])) format_time = time.strftime("%Y/%m/%d %H:%M:%S", log_created) print("-GET:", log.name, log.levelname, log.msg, format_time+","+str(int(float(log.msecs[0])//1))) self.send_response(200) self.end_headers() self.wfile.flush() def do_POST(self): length = int(self.headers["content-length"]) data = self.rfile.read(length) data = data.decode(encoding="utf-8") data_dict = parse.parse_qs(data) data = logging.makeLogRecord(data_dict) print("=POST:", data) self.send_response(200) self.end_headers() self.wfile.flush() httpserver = ThreadingHTTPServer(("127.0.0.1", 8080), MyHttpServer) httpserver.serve_forever()
先运行服务端再跑日志demo,运行结果:
QueueHandler/QueueListener:
构造方法 logging.handlers.QueueHandler(queue)
logging.handlers.QueueListener(queue, *handlers, respect_handler_level=False)
将日志发送到队列,用于处理队列或者多线程模块情况;
queue可以是类队列的任何对象(原样传给dequeue()函数),也可以用queue.SimpleQueue代替queue;
QueueHandler的小demo:
#!/usr/bin/env python3 # coding=utf-8 import logging from logging import handlers from logging import LogRecord import queue class MyLogHandler(logging.Handler): def handle(self, record: LogRecord) -> None: print(record) log = logging.getLogger("mylog") log.setLevel(logging.DEBUG) q = queue.SimpleQueue() h = handlers.QueueHandler(q) f = logging.Formatter("[%(levelname)s]%(message)s") h.setFormatter(f) log.addHandler(h) listener = handlers.QueueListener(q, MyLogHandler(), respect_handler_level=False) listener.start() print("listener started") log.info("test queue log1 info") log.error("test queue log2 error") listener.stop() print("listener stopted") log.info("test queue log3 info") log.error("test queue log4 error") print("test end")
执行结果:
常用Demo:
#!/usr/bin/env python3 # coding=utf-8 import logging from logging import LogRecord from logging import handlers import sys # log1 = logging.Logger("a", logging.DEBUG) log1 = logging.getLogger("a") _log1 = logging.getLogger("a") print(log1 is _log1, id(log1), id(_log1)) log1.setLevel(logging.DEBUG) log2 = log1.getChild("b") log3 = log1.getChild("c") print(log1) print(log2) print(log3) h1 = logging.StreamHandler() format1 = logging.Formatter("[H1] [%(levelname)-8s] %(message)s") h1.setFormatter(format1) log1.addHandler(h1) h2 = logging.StreamHandler() format2 = logging.Formatter(fmt="[H2][%(levelname)7s][%(asctime)s]%(message)s", datefmt="%c") h2.setFormatter(format2) class myFilter(logging.Filter): def filter(self, record: LogRecord) -> int: print("record:" + repr(record)) if "HELLO" in record.msg: return True else: return False h2.addFilter(myFilter()) log2.addHandler(h2) # log2.addFilter(myFilter()) h3 = logging.StreamHandler() format3 = logging.Formatter("[H3]{%(levelname)s}{%(name)s}%(message)s") h3.setFormatter(format3) h3.setStream(sys.stdout) h3.setLevel(logging.INFO) log3.addHandler(h3) # h3_file = logging.FileHandler(filename="test_Logging2.log", mode="w") h3_file.setFormatter(logging.Formatter("[H3File][%(levelname)8s][%(asctime)s]%(message)s")) log3.addHandler(h3_file) # h3_rotatingfile = handlers.RotatingFileHandler(filename="test_Logging2_rotating.log", mode="a", maxBytes=256, backupCount=3, encoding="utf-8", delay=False) h3_rotatingfile.setFormatter(logging.Formatter("[h3_rotating][%(levelname)s] %(message)s")) log3.addHandler(h3_rotatingfile) # h3_timedrotationgfile = handlers.TimedRotatingFileHandler(filename="test_Logging2_timedrotating.log", when="S", interval=5, backupCount=5, encoding="utf-8", delay=False, utc=False, # atTime= ) h3_timedrotationgfile.setFormatter(logging.Formatter("[H3timed][%(levelname)s]%(message)s")) log3.addHandler(h3_timedrotationgfile) log3.propagate = False log1.debug("test log1") log1.warning("warn log1") log2.debug("test log2 HELLO") log2.info("info log2") log2.warning("warn log2 HELLO") log2.error("error log2") log3.debug("test log3") log3.warning("warn log3")
多次执行结果中的一次:
生成的日志文件:
参考:
logging — Logging facility for Python — Python 3.8.12 documentation
logging.handlers — Logging handlers — Python 3.8.12 documentation
加载全部内容