PostgreSQL事务回卷
渊渱 人气:0背景
前阵子某个客户反馈他的RDS PostgreSQL无法写入,报错信息如下:
postgres=# select * from test;
id
----
(0 rows)postgres=# insert into test select 1;
ERROR: database is not accepting commands to avoid wraparound data loss in database "xxxx"
HINT: Stop the postmaster and vacuum that database in single-user mode.
You might also need to commit or roll back old prepared transactions.
随后RDS工程师介入处理以后,该问题立马得到了解决。
XID基础原理
XID 定义
XID(Transaction ID)是 PostgreSQL 内部的事务编号,每个事务都会分配一个XID,依次递增。PostgreSQL 数据中每个元组头部都会保存着 插入 或者 删除 这条元组的XID(Transaction ID),然后内核通过这个 XID 构造数据库的一致性读。在事务隔离级别是 可重复读 的情况下,假设如有两个事务,xid1=200,xid2=201,那么 xid1 中只能看到 t_xmin <= 200 的元组,看不到 t_xmin > 200 的元组。
typedef uint32 TransactionId; /* 事务号定义,32位无符号整数 */ typedef struct HeapTupleFields { TransactionId t_xmin; /* 插入该元组的事务号 */ TransactionId t_xmax; /* 删除或锁定该元组的事务号 */ /*** 其它属性省略 ***/ } HeapTupleFields; struct HeapTupleHeaderData { union { HeapTupleFields t_heap; DatumTupleFields t_datum; } t_choice; /*** 其它属性省略 ***/ };
XID 发行机制
从上面结构中我们可以看到,XID 是一个32位无符号整数,也就是 XID 的范围是 0到2^32-1;那么超过了 2^32-1的事务怎么办呢?其实 XID 是一个环,超过了 2^32-1 之后又会从头开始分配。通过源代码也证明了上述结论:
// 无效事务号 #define InvalidTransactionId ((TransactionId) 0) // 引导事务号,在数据库初始化过程(BKI执行)中使用 #define BootstrapTransactionId ((TransactionId) 1) // 冻结事务号用于表示非常陈旧的元组,它们比所有正常事务号都要早(也就是可见) #define FrozenTransactionId ((TransactionId) 2) // 第一个正常事务号 #define FirstNormalTransactionId ((TransactionId) 3) // 把 FullTransactionId 的低32位作为无符号整数生成 xid #define XidFromFullTransactionId(x) ((uint32) (x).value) static inline void FullTransactionIdAdvance(FullTransactionId *dest) { dest->value++; while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId) dest->value++; } FullTransactionId GetNewTransactionId(bool isSubXact) { /*** 省略 ***/ full_xid = ShmemVariableCache->nextFullXid; xid = XidFromFullTransactionId(full_xid); /*** 省略 ***/ FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid); /*** 省略 *** return full_xid; } static void AssignTransactionId(TransactionState s) { /*** 省略 ***/ s->fullTransactionId = GetNewTransactionId(isSubXact); if (!isSubXact) XactTopFullTransactionId = s->fullTransactionId; /*** 省略 ***/ } TransactionId GetTopTransactionId(void) { if (!FullTransactionIdIsValid(XactTopFullTransactionId)) AssignTransactionId(&TopTransactionStateData); return XidFromFullTransactionId(XactTopFullTransactionId); }
可以看到,新事务号保存在共享变量缓存中:ShmemVariableCache->nextFullXid,每发行一个事务号后,向上调整它的值,并跳过上述三个特殊值。三个特殊仠分别为0、1和2,作用可以看上面代码注释。
XID 回卷机制
前面说到,XID 是一个环,分配到 2^32-1 之后又从 3 开始,那么内核是怎么比较两个事务的大小的呢?比如 xid 经历了这样一个过程 3-> 2^32-1 -> 5,那么内核怎么样知道 5 这个事务在 2^32-1 后面呢?我们再看一下代码:
/* * TransactionIdPrecedes --- is id1 logically < id2? */ bool TransactionIdPrecedes(TransactionId id1, TransactionId id2) { /* * If either ID is a permanent XID then we can just do unsigned * comparison. If both are normal, do a modulo-2^32 comparison. */ int32 diff; if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2)) return (id1 < id2); diff = (int32) (id1 - id2); return (diff < 0); }
可以看到,内核使用了一个比较取巧的方法:(int32) (id1 - id2) < 0,32位有符号整数的取值范围是 -2^31 到 231-1,5-(232-1) 得到的值比 2^31-1 大,所以转换成 int32 会变成负数。但是这里面有一个问题,「最新事务号-最老事务号」 必须小于 2^31,一旦大于就会出现回卷,导致老事务产生的数据对新事务不可见。
XID 回卷预防
前面讲到,「最新事务号-最老事务号」 必须小于 2^31,否则会发生回卷导致老事务产生的数据对新事务不可见,那内核是怎么避免这个问题的呢?内核是这样处理的:通过定期把老事务产生的元组的 XID 更新为 FrozenTransactionId,即更新为2,来回收 XID,而 XID 为2 的元组对所有的事务可见,这个过程称为 XID 冻结,通过这个方式可以回收 XID 来保证 |最新事务号-最老事务号| < 2^31。
除了内核自动冻结回收XID,我们也可以通过命令或者 sql 的方式手动进行 xid 冻结回收
- 查询数据库或表的年龄,数据库年龄指的是:「最新事务号-数据库中最老事务号」,表年龄指的是:「最新事务号-表中最老事务号」
# 查看每个库的年龄 SELECT datname, age(datfrozenxid) FROM pg_database; # 1个库每个表的年龄排序 SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; # 查看1个表的年龄 select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;
手动冻结回收一张表的元组的 xid 的sql:
vacuum freeze 表名;
手动冻结回收一个库里面的所有表 xid 的命令:
vacuumdb -d 库名 --freeze --jobs=30 -h 连接串 -p 端口号 -U 库Owner
冻结回收过程是一个重 IO 的操作,这个过程内核会描述表的所有页面,然后把符合要求的元组的 t_xmin 字段更新为 2,所以这个过程需要在业务低峰进行,避免影响业务。
与冻结回收相关的内核参数有三个:vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于笔者对于这三个参数理解不深,就不在这里班门弄斧了,感兴趣的同学可以自行找资料了解一下。
解决方案
问题分析
基于上面的原理分析,我们知道,「最新事务号-最老事务号」 = 2^31-1000000,即当前可用的 xid 仅剩下一百万的时候,内核就会禁止实例写入并报错:database is not accepting commands to avoid wraparound data loss in database, 这个时候必须连到提示中的 "xxxx" 对表进行 freeze 回收更多的 XID。
void SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) { TransactionId xidVacLimit; TransactionId xidWarnLimit; TransactionId xidStopLimit; TransactionId xidWrapLimit; TransactionId curXid; Assert(TransactionIdIsNormal(oldest_datfrozenxid)); /* * xidWrapLimit = 最老的事务号 + 0x7FFFFFFF,当前事务号一旦到达xidWrapLimit将发生回卷 */ xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1); if (xidWrapLimit < FirstNormalTransactionId) xidWrapLimit += FirstNormalTransactionId; /* * 一旦当前事务号到达xidStopLimit,实例将不可写入,保留 1000000 的xid用于vacuum * 每 vacuum 一张表需要占用一个xid */ xidStopLimit = xidWrapLimit - 1000000; if (xidStopLimit < FirstNormalTransactionId) xidStopLimit -= FirstNormalTransactionId; /* * 一旦当前事务号到达xidWarnLimit,将不停地收到 * WARNING: database "xxxx" must be vacuumed within 2740112 transactions */ xidWarnLimit = xidStopLimit - 10000000; if (xidWarnLimit < FirstNormalTransactionId) xidWarnLimit -= FirstNormalTransactionId; /* * 一旦当前事务号到达xidVacLimit将触发force autovacuums */ xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age; if (xidVacLimit < FirstNormalTransactionId) xidVacLimit += FirstNormalTransactionId; /* Grab lock for just long enough to set the new limit values */ LWLockAcquire(XidGenLock, LW_EXCLUSIVE); ShmemVariableCache->oldestXid = oldest_datfrozenxid; ShmemVariableCache->xidVacLimit = xidVacLimit; ShmemVariableCache->xidWarnLimit = xidWarnLimit; ShmemVariableCache->xidStopLimit = xidStopLimit; ShmemVariableCache->xidWrapLimit = xidWrapLimit; ShmemVariableCache->oldestXidDB = oldest_datoid; curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid); LWLockRelease(XidGenLock); /* Log the info */ ereport(DEBUG1, (errmsg("transaction ID wrap limit is %u, limited by database with OID %u", xidWrapLimit, oldest_datoid))); /* * 如果 当前事务号>=最老事务号+autovacuum_freeze_max_age * 触发 autovacuum 对年龄最老的数据库进行清理,如果有多个数据库达到要求,按年龄最老的顺序依次清理 * 通过设置标志位标记当前 autovacuum 结束之后再来一次 autovacuum */ if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) && IsUnderPostmaster && !InRecovery) SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER); /* Give an immediate warning if past the wrap warn point */ if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery) { char *oldest_datname; if (IsTransactionState()) oldest_datname = get_database_name(oldest_datoid); else oldest_datname = NULL; if (oldest_datname) ereport(WARNING, (errmsg("database \"%s\" must be vacuumed within %u transactions", oldest_datname, xidWrapLimit - curXid), errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n" "You might also need to commit or roll back old prepared transactions, or drop stale replication slots."))); else ereport(WARNING, (errmsg("database with OID %u must be vacuumed within %u transactions", oldest_datoid, xidWrapLimit - curXid), errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n" "You might also need to commit or roll back old prepared transactions, or drop stale replication slots."))); } } bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2) { int32 diff; if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2)) return (id1 >= id2); diff = (int32) (id1 - id2); return (diff >= 0); } FullTransactionId GetNewTransactionId(bool isSubXact) { /*** 省略 ***/ full_xid = ShmemVariableCache->nextFullXid; xid = XidFromFullTransactionId(full_xid); if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit)) { TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit; TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit; TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit; Oid oldest_datoid = ShmemVariableCache->oldestXidDB; /*** 省略 ***/ if (IsUnderPostmaster && TransactionIdFollowsOrEquals(xid, xidStopLimit)) { char *oldest_datname = get_database_name(oldest_datoid); /* complain even if that DB has disappeared */ if (oldest_datname) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"", oldest_datname), errhint("Stop the postmaster and vacuum that database in single-user mode.\n" "You might also need to commit or roll back old prepared transactions, or drop stale replication slots."))); /*** 省略 ***/ } /*** 省略 ***/ } /*** 省略 ***/ }
问题定位
# 查看每个库的年龄 SELECT datname, age(datfrozenxid) FROM pg_database; # 1个库每个表的年龄排序 SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; # 查看1个表的年龄 select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名称.表名称'::regclass::oid;
问题解决
- 通过上面的第一个 sql,查找年龄最大的数据库,数据库年龄指的是:|最新事务号-数据库中最老事务号|
- 通过上面第二个 sql,查找年龄最大的表,然后对表依次执行:vacuum freeze 表名,把表中的老事务号冻结回收,表年龄指的是:|最新事务号-表中最老事务号|
- 运维脚本
单进程 Shell 脚本
# 对指定数据库中年龄最大的前 50 张表进行 vacuum freeze for cmd in `psql -U用户名 -p端口号 -h连接串 -d数据库名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd | grep -v row | grep vacuum`; do psql -U用户名 -p端口号 -h连接串 -d数据库名 -c "$cmd" done
多进程 Python 脚本
from multiprocessing import Pool import psycopg2 args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='数据库名', user='用户名', password='密码') def vacuum_handler(sql): sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; " try: conn = psycopg2.connect(**args) cur = conn.cursor() cur.execute(sql) conn.commit() cur = conn.cursor() cur.execute(sql_str) print cur.fetchall() conn.close() except Exception as e: print str(e) # 对指定数据库中年龄最大的前 1000 张表进行 vacuum freeze,32 个进程并发执行 def multi_vacuum(): pool = Pool(processes=32) sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;"; try: conn = psycopg2.connect(**args) cur = conn.cursor() cur.execute(sql_str) rows = cur.fetchall() for row in rows: cmd = row['vacuum_cmd'] pool.apply_async(vacuum_handler, (cmd, )) conn.close() pool.close() pool.join() except Exception as e: print str(e) multi_vacuum()
友情提示
vacuum freeze 会扫描表的所有页面并更新,是一个重 IO 的操作,操作过程中一定要控制好并发数,否则非常容易把实例打挂。
作者信息
谢桂起(花名:渊渱) 2020年毕业后加入阿里云,一直从事RDS PostgreSQL相关工作,善于解决线上各类RDS PostgreSQL运维管控相关问题。
总结
加载全部内容