亲宝软件园·资讯

展开

python Logger日志

FlyLikeButterfly 人气:0

用到的4个类:

1、Logger:

打印日志用的对象; 

设置日志等级,添加移除handler,添加移除filter,设置下级Logger,使用各种方法打印日志;

创建方式有两种,使用logging.getLogger("mylog")和创建实例logging.Logger("mylog");

推荐使用getLogger()的方式,不传name参数的获得的Logger为root Logger,传name参数的上级为root Logger;

直接实例化的Logger没有上级,日志等级为NOTSET,并且用该实例创建的下级Logger的上级直接为root Logger;

 使用getLogger(name)传入相同的名字获得是同一个Logger;

 Logger拥有上下级关系,父子记录器名称之间用“.”分割,Logger拥有有效日志等级概念(getEffectiveLevel()方法返回的是日志等级整数值),直接新建实例创建的Logger日志等级为NOTSET,默认的根记录器为WARNING等级,如果NOTSET等级的Logger为根记录器则处理所有消息,否则会委托给父级记录器,遍历父记录器链,直到找到非NOTSET的父记录器并把该等级作为自己的有效等级,如果直到根记录器也是NOTSET则处理所有消息;

 

2、Formatter:

日志打印格式;

创建方式为logging.Formatter(fmt=None, datefmt=None, style='%', validate=True)

fmt:格式化日志

datefmt:格式化fmt中的%(asctime)s

style:在3.2版本添加

validate:在3.8版本添加,不正确或不匹配的样式和fmt将引发ValueError;

跟上一篇一致python的logging日志模块

 

3、Filter:

过滤日志;

通过继承logging.Filter类,并实现 filter(self, record: LogRecord) -> int 方法,返回0或者False表示不通过,返回非0或者True表示可以通过;(from logging import LogRecord)

LogRecord可以操作的属性大概有这么多:

同一个Handler可以添加多个Filter,依次过滤,当前Filter通过后传递给下一个Filter;

4、Handler:

日志输出方式;

设置日志等级,设置formatter,添加移除filter;

Logger可以添加多个handler,将日志按不同格式和过滤送往不同的地方,同一个Logger只会添加同一个handler一次,多次添加无影响;

子Logger处理完自己的handler后,会将日志传递给父Logger的handler处理,依次向上传递,不要将同一个handler同时添加到父子Logger里,否则父子Logger都会处理会打印多次相同日志;

可以通过设置Logger对象的propagate属性为False关闭传递给父Logger的handler;

 

大概有这么多Handler:

(from logging import handlers)

StreamHandler:

构造方法 logging.StreamHandler(stream=None)

将日志发送到像sys.stdout、sys.stderr或类似文件的对象中(支持write()和flush()方法的对象),默认使用sys.stderr,3.2版本后还有个terminator属性,可以设置终止符(默认“\n”);

FileHandler:

构造方法 logging.FileHandler(filename, mode='a', encoding=None, delay=False)

将日志记录到文件中,默认文件将无限增长,如果delay为true,打开文件会延迟到第一次调用emit();

3.6版本开始,pathlib.Path也可以作为filename的值

NullHandler:

不做任何格式化和输出,提供给库开发人员使用;

WatchedFileHandler:

这是一个FileHandler,监控正在记录日志的文件,如果文件变动了则关闭文件重新打开;(windows系统不适用)

BaseRotatingHandler:

构造方法 logging.handlers.BaseRotatingHandler(filename, mode, encoding=None, delay=False)

是RotatingFileHandler和TimedRotatingFileHandler的基类,不需要实例化该类;

RotatingFileHandler:

构造方法 logging.handlers.RotatingFileHandler(filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False)

按照日志文件大小和备份日志文件数量保存日志到文件;

maxBytes或者backupCount为0则不会滚动日志,当日志大小接近maxBytes时会用“.1”“.2”“.3”...后缀保存旧文件,并保持旧文件数量不超过backupCount,当前日志一直是没有后缀的那个文件;

TimedRotatingFileHandler:

构造方法 logging.handlers.TimedRotatingFileHandler(filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None)

按照时间间隔和备份文件数量保存日志到文件;

when:“S”秒,“M”分钟,“H”小时,“D”天,“W0”-“W6”工作日(W0为周一),“midnight”午夜(不指定atTime的时候午夜12点,否则按照atTime的时间滚动)

interval:时间间隔;

backupCount:保留文件数量上限;

utc:为true时文件名后缀使用UTC时间否则使用本地时间;

atTime:3.4添加,datetime.time类型,指定一天中发生滚动日志的时间,只有when为“midnight”或者“W0”-“W6”时有效;

生成的备份文件文件名后缀格式为%Y-%m-%d_%H-%M-%S,第一次计算滚动时间(程序启动后)时会使用现有日志的最后修改时间或者当前时间计算;

 

SocketHandler:

构造方法 logging.handlers.SocketHandler(host, port)

将日志发送到网络套接字,基类使用的是TCP;

发送的二进制bytes是由编码的,由“内容长度+内容”组成,内容是用pickle序列化的一个dict(LogRecord,可以用logging.makeLogRecord(attrdict)转换),长度是用struct将序列化后的内容打包的一个大端无符号long数值,具体逻辑在SocketHandler类源码的makePickle()方法中有体现:

搞了半天才搞好的小demo(之前没看到编码方式还以为socket的编码,服务端接收数据解不出):

#!/usr/bin/env python3
# coding=utf-8
 
import logging
from logging import handlers
import time
 
log = logging.getLogger("mylog")
log.setLevel(logging.DEBUG)
h = logging.handlers.SocketHandler(host="127.0.0.1", port=8899)
f = logging.Formatter("[%(name)s][%(asctime)s]%(message)s")
h.setFormatter(f)
log.addHandler(h)
for x in range(1, 6):
    log.info("test log %d" % x)
    time.sleep(0.5)

 服务端demo:

#!/usr/bin/env python3
# coding=utf-8
 
import socket
import pickle
import struct
import logging
 
 
def unPickle(bs: bytes):
    data_len_bytes = bs[0:4]
    data_len = struct.unpack(">L", data_len_bytes)[0]
    pickled_data = bs[4: data_len + 4 + 1]
    return pickle.loads(pickled_data)
 
 
socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_server.bind(("127.0.0.1", 8899))
socket_server.listen(3)
while True:
    try:
        client, client_addr = socket_server.accept()
        while True:
            data = client.recv(1024 * 10)
            if data != b'':
                log_dict = unPickle(data)
                log_record = logging.makeLogRecord(log_dict)
                print("recv log:", log_record)
    except Exception as e:
        print("Exception:", e)
 

 先运行socket服务,再测日志,因为连接不到服务日志会被丢弃,运行结果:

DatagramHandler:

构造方法 logging.handlers.DatagramHandler(host, port)

继承了SocketHandler,使用UDP发送,类似SocketHandler的使用;

 

SysLogHandler:

构造方法 logging.handlers.SysLogHandler(address=('localhost', SYSLOG_UDP_PORT), facility=LOG_USER, socktype=socket.SOCK_DGRAM)

将日志发送到远程或者本地的Unix系统日志,未指定address则使用('localhost', 514)地址;

NTEventLogHandler:

构造方法 logging.handlers.NTEventLogHandler(appname, dllname=None, logtype='Application')

将日志发送到本地的Windows NT、Windows 2000、Windows XP系统日志,使用时需要Mark Hammond's Win32的python扩展;

SMTPHandler:

构造方法 logging.handlers.SMTPHandler(mailhost, fromaddr, toaddrs, subject, credentials=None, secure=None, timeout=1.0)

将日志发送到电子邮件;

mailhost使用(host, port)元组,toaddrs是一个字符串列表,credentials可以用(username, password)元组指定用户名密码

MemoryHandler:

构造方法 logging.handlers.MemoryHandler(capacity, flushLevel=ERROR, target=None, flushOnClose=True)

支持在内存中缓冲日志,当缓存将满或者出现某种严重情况时把日志发送到目标handler;

是BufferingHandler的子类;

每当向缓冲区添加日志的时候都会调用shouldFlush()判断是否需要刷新,如果需要则会调用flush()刷新;

HTTPHandler:

构造方法 logging.handlers.HTTPHandler(host, url, method='GET', secure=False, credentials=None, context=None)

通过GET或者POST向一个web服务器发送日志;

如果要指定端口,host可以使用host:port值;

secure为true,则使用HTTPS;

对HTTPHandler使用setFormatter()是无效的,HTTPHandler没有调用format()格式化,而是调用了mapLogRecord()方法然后使用urllib.parse.urlencode()编码的;

mapLogRecord()函数很简单(有需要可以重写):

而LogRecord()里是没有asctime字段的,所以log.asctime是错误的,但是logRecord里有created和msecs字段表时间:

 小小的demo:

#!/usr/bin/env python3
# coding=utf-8
 
import logging
from logging import handlers
import time
 
log = logging.getLogger("mylog")
log.setLevel(logging.DEBUG)
h_get = handlers.HTTPHandler(host="127.0.0.1:8080", url="test_http_log", method="GET",
                             secure=False, credentials=None, context=None)
log.addHandler(h_get)
h_post = handlers.HTTPHandler(host="127.0.0.1:8080", url="test_http_log", method="POST",
                              secure=False, credentials=None, context=None)
log.addHandler(h_post)
 
h = logging.StreamHandler()
f = logging.Formatter("[%(levelname)s][%(asctime)s]%(message)s")
h.setFormatter(f)
log.addHandler(h)
for x in range(1, 3):
    log.info("test HTTP log %d" % x)
    time.sleep(0.5)

http服务端:

#!/usr/bin/env python3
# coding=utf-8
 
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from urllib import parse
import logging
import time
 
 
class MyHttpServer(BaseHTTPRequestHandler):
    def do_GET(self):
        url = parse.urlparse(self.path)
        q = parse.parse_qs(url.query)
        log = logging.makeLogRecord(q)
        log_created = time.localtime(float(log.created[0]))
        format_time = time.strftime("%Y/%m/%d %H:%M:%S", log_created)
        print("-GET:", log.name, log.levelname, log.msg, format_time+","+str(int(float(log.msecs[0])//1)))
        self.send_response(200)
        self.end_headers()
        self.wfile.flush()
 
    def do_POST(self):
        length = int(self.headers["content-length"])
        data = self.rfile.read(length)
        data = data.decode(encoding="utf-8")
        data_dict = parse.parse_qs(data)
        data = logging.makeLogRecord(data_dict)
        print("=POST:", data)
        self.send_response(200)
        self.end_headers()
        self.wfile.flush()
 
 
httpserver = ThreadingHTTPServer(("127.0.0.1", 8080), MyHttpServer)
httpserver.serve_forever()

 先运行服务端再跑日志demo,运行结果:

QueueHandler/QueueListener:

构造方法 logging.handlers.QueueHandler(queue)

logging.handlers.QueueListener(queue, *handlers, respect_handler_level=False)

将日志发送到队列,用于处理队列或者多线程模块情况;

queue可以是类队列的任何对象(原样传给dequeue()函数),也可以用queue.SimpleQueue代替queue;

QueueHandler的小demo:

#!/usr/bin/env python3
# coding=utf-8
 
import logging
from logging import handlers
from logging import LogRecord
import queue
 
 
class MyLogHandler(logging.Handler):
    def handle(self, record: LogRecord) -> None:
        print(record)
 
 
log = logging.getLogger("mylog")
log.setLevel(logging.DEBUG)
q = queue.SimpleQueue()
h = handlers.QueueHandler(q)
f = logging.Formatter("[%(levelname)s]%(message)s")
h.setFormatter(f)
log.addHandler(h)
 
listener = handlers.QueueListener(q, MyLogHandler(), respect_handler_level=False)
 
listener.start()
print("listener started")
log.info("test queue log1 info")
log.error("test queue log2 error")
 
listener.stop()
print("listener stopted")
log.info("test queue log3 info")
log.error("test queue log4 error")
print("test end")

执行结果:

常用Demo:

#!/usr/bin/env python3
# coding=utf-8
 
import logging
from logging import LogRecord
from logging import handlers
import sys
 
# log1 = logging.Logger("a", logging.DEBUG)
log1 = logging.getLogger("a")
_log1 = logging.getLogger("a")
print(log1 is _log1, id(log1), id(_log1))
log1.setLevel(logging.DEBUG)
log2 = log1.getChild("b")
log3 = log1.getChild("c")
print(log1)
print(log2)
print(log3)
 
h1 = logging.StreamHandler()
format1 = logging.Formatter("[H1] [%(levelname)-8s] %(message)s")
h1.setFormatter(format1)
log1.addHandler(h1)
 
h2 = logging.StreamHandler()
format2 = logging.Formatter(fmt="[H2][%(levelname)7s][%(asctime)s]%(message)s", datefmt="%c")
h2.setFormatter(format2)
 
 
class myFilter(logging.Filter):
    def filter(self, record: LogRecord) -> int:
        print("record:" + repr(record))
        if "HELLO" in record.msg:
            return True
        else:
            return False
 
 
h2.addFilter(myFilter())
log2.addHandler(h2)
# log2.addFilter(myFilter())
 
h3 = logging.StreamHandler()
format3 = logging.Formatter("[H3]{%(levelname)s}{%(name)s}%(message)s")
h3.setFormatter(format3)
h3.setStream(sys.stdout)
h3.setLevel(logging.INFO)
log3.addHandler(h3)
#
h3_file = logging.FileHandler(filename="test_Logging2.log", mode="w")
h3_file.setFormatter(logging.Formatter("[H3File][%(levelname)8s][%(asctime)s]%(message)s"))
log3.addHandler(h3_file)
#
h3_rotatingfile = handlers.RotatingFileHandler(filename="test_Logging2_rotating.log", mode="a",
                                               maxBytes=256, backupCount=3,
                                               encoding="utf-8", delay=False)
h3_rotatingfile.setFormatter(logging.Formatter("[h3_rotating][%(levelname)s] %(message)s"))
log3.addHandler(h3_rotatingfile)
#
h3_timedrotationgfile = handlers.TimedRotatingFileHandler(filename="test_Logging2_timedrotating.log",
                                                          when="S", interval=5,
                                                          backupCount=5, encoding="utf-8",
                                                          delay=False, utc=False,
                                                          # atTime=
                                                          )
h3_timedrotationgfile.setFormatter(logging.Formatter("[H3timed][%(levelname)s]%(message)s"))
log3.addHandler(h3_timedrotationgfile)
log3.propagate = False
 
log1.debug("test log1")
log1.warning("warn log1")
log2.debug("test log2 HELLO")
log2.info("info log2")
log2.warning("warn log2 HELLO")
log2.error("error log2")
log3.debug("test log3")
log3.warning("warn log3")

多次执行结果中的一次:

生成的日志文件:

 

参考:

logging — Logging facility for Python — Python 3.8.12 documentation

logging.handlers — Logging handlers — Python 3.8.12 documentation

Python3 日志(内置logging模块) - 天马行宇 - 博客园

加载全部内容

相关教程
猜你喜欢
用户评论