为了描述方便,定义一些名词:
wal/WAL -- Write Ahead Log,指PG中事务日志模块,这里所指的其实比较模糊,相关的函数,内存都算,自定义WAL就是指,用户可以决定自己存储在shared buffer page里的数据是啥格式,而如果希望,写shared buffer page时,有事务日志的保护,那么就需要调用PG内部的函数,遵从一定的编程规范,构造自己的WAL日志记录,并且插入到WAL缓存。
shared buffer page -- 表的page加载到buffer管理器后的内存块,8K,buffer管理器中的page都会有一个buffer id。
shared buffer -- 内存中缓存table page的一块内存,又buffer管理器管理,是数据库全局的。
wal record -- 即 “WAL记录” 每条记录可能包含多个shared buffer page的修改,都有一个LSN编号。
table page -- 存储在磁盘的表的数据块,每个数据块有表内的偏移量page no,加载到buffer管理器后,有个buffer id,page no和buffer id不同。
1、PG的自定义WAL编程规范
在PG源码中,为自定义wal提供了比较底层的API和编程规范,当用户想要自定义的数据,也具备wal保护时,用户可以使用这套API和编程规范,向wal缓存中插入自定义的日志记录,恢复时,再提取自己的wal record,并自由决定如何恢复。
这个编程规范背后有个理论知识:为了事务的一致性(单个事务的原子性和并发事务的隔离性),在shared buffer page落盘前,对应的wal record要先落盘。
根据PG源码中的 src/backend/access/transam/README的描述,自定义wal的编程规范如下:
1)假设,要修改一个shared buffer page,先对这个page加pin和互斥锁。
注意,加互斥锁会阻塞其它想读写这个page的事务,所以一般是把要存储的数据准备(计算)好,最后一步才写wal和shared buffer page。
2)调用 START_CRIT_SECTION()
3)修改shared buffer page的内容和头部LSN并调用MarkBufferDirty(),构造wal record 插入wal缓存。
4)调用 END_CRIT_SECTION()
START_CRIT_SECTION 和 END_CRIT_SECTION 之间的代码称为临界区,那么是否其它线程/进程执行到这里会阻塞?不会,这是个误解,这里的临界区不是linux系统编程里的临界区。
在PG中,这两个宏之间代码,如果报错(调用ereport ),就会导致整个shared buffer 的重置(应该是哪个进程的退出?),仅此而已,不会阻塞,注意,与linux系统编程里的临界区不是一个概念!
5)释放此shared buffer page上的互斥锁。在释放这个page的互斥锁之前,是先写page,还是先构造wal record并插入wal缓存,都无所谓,只要在释放互斥锁前wal record插入了wal缓存,PG会保证wal record在对应的page之前落盘,这就遵守了前面的一致性理论。
6)那么红字的部分,具体怎么做?如何构造wal record?下面是一个例子:
(来自src/backend/access/transam/generic_xlog.c)
XLogRecPtr
GenericXLogFinish(GenericXLogState *state)
{XLogRecPtr lsn;int i;if (state->isLogged){/* Logged relation: make xlog record in critical section. */XLogBeginInsert();START_CRIT_SECTION();/** Compute deltas if necessary, write changes to buffers, mark buffers* dirty, and register changes.*/for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++){PageData *pageData = &state->pages[i];Page page;PageHeader pageHeader;if (BufferIsInvalid(pageData->buffer))continue;page = BufferGetPage(pageData->buffer);pageHeader = (PageHeader) pageData->image;/** Compute delta while we still have both the unmodified page and* the new image. Not needed if we are logging the full image.*/if (!(pageData->flags & GENERIC_XLOG_FULL_IMAGE))computeDelta(pageData, page, (Page) pageData->image);/** Apply the image, being careful to zero the "hole" between* pd_lower and pd_upper in order to avoid divergence between* actual page state and what replay would produce.*/memcpy(page, pageData->image, pageHeader->pd_lower);memset(page + pageHeader->pd_lower, 0,pageHeader->pd_upper - pageHeader->pd_lower);memcpy(page + pageHeader->pd_upper,pageData->image + pageHeader->pd_upper,BLCKSZ - pageHeader->pd_upper);MarkBufferDirty(pageData->buffer);if (pageData->flags & GENERIC_XLOG_FULL_IMAGE){XLogRegisterBuffer(i, pageData->buffer,REGBUF_FORCE_IMAGE | REGBUF_STANDARD);}else{XLogRegisterBuffer(i, pageData->buffer, REGBUF_STANDARD);XLogRegisterBufData(i, pageData->delta, pageData->deltaLen);}}/* Insert xlog record */lsn = XLogInsert(RM_GENERIC_ID, 0);/* Set LSN */for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++){PageData *pageData = &state->pages[i];if (BufferIsInvalid(pageData->buffer))continue;PageSetLSN(BufferGetPage(pageData->buffer), lsn);}END_CRIT_SECTION();}else{/* Unlogged relation: skip xlog-related stuff */START_CRIT_SECTION();for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++){PageData *pageData = &state->pages[i];if (BufferIsInvalid(pageData->buffer))continue;memcpy(BufferGetPage(pageData->buffer),pageData->image,BLCKSZ);/* We don't worry about zeroing the "hole" in this case */MarkBufferDirty(pageData->buffer);}END_CRIT_SECTION();/* We don't have a LSN to return, in this case */lsn = InvalidXLogRecPtr;}pfree(state);return lsn;
}
void XLogBeginInsert(void); // 表示开始构造一个wal record
/*
XLogRegisterBuffer的参数block_id,指wal模块内部缓存的id,这个缓存用来存储wal record信息,大于8K,buffer参数是shared buffer page的id, 表示这条wal record是对shared buffer里id为buffer的page的修改,调用这个函数,这些信息会包括在wal record里,我猜测也应该包括buffer所在的表和它在表中的页号。
可以在一个wal record的构造过程中,多次调用XLogRegisterBuffer分配多个缓存,但是一个wal record,最多包括4个缓存(也可以调整)。
如果要做full page write,这个函数会自动判断并为我们做。
*/
void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
/*
根据对shared buffer page的修改,我们会构造表示修改的数据,然后,把这些数据 “装进” PG为我们提供的容器(wal record),这些数据就是wal record的主体内容。
这个操作,使用函数 void XLogRegisterBufData(uint8 block_id, char *data, uint32 len) 来实现。
参数 block_id 是前一步调用 XLogRegisterBuffer 时指定的 id,参数data是修改数据的起始地址,参数len是修改数据的长度,这个函数把数据放到id指定的缓存中。
如果在构造一个wal record的过程中,多次调用XLogRegisterBufData,数据会追加到前一次数据的后面。
*/
void XLogRegisterBufData(uint8 block_id, char *data, uint32 len)
/*
当我们想结束一个wal record构造时,调用XLogRecPtr XLogInsert(RmgrId rmid, uint8 info),
它的返回值是WAL日志文件的末尾偏移,可做为LSN写到shared buffer page头部,
参数rmid,表示我们自定义WAL模块的id,它会记录在wal record里,恢复时,PG会根据wal record的id,调用属于那个模块的redo函数。
这个函数把wal record写到wal缓存,并不一定落盘,commit时才落盘。
*/
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
7)如何恢复?redo函数如何写?
XLogInsert(RM_GENERIC_ID, 0) 的第一个参数,指定了我们自定义的WAL日志处理模块的ID,我们在定义这个模块时,要指定一个redo函数,例如 generic_redo() :
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
当PG做恢复时,会从WAL日志文件里一条一条读取日志记录,并根据记录中的这个ID信息,调用对应的redo函数。
传进redo函数的参数是XLogReaderState结构体(指针),当初(写WAL日志时)存储在这条wal record里的所有信息,都可以从这个XLogReaderState结构体里获取。
在redo函数里,我们调用PG提供的API函数,以XLogReaderState为参数,提取信息:包括修改了哪个表的哪个page、修改数据块。
redo函数里典型的恢复流程(以generic_redo()为例)如下:
/** Redo function for generic xlog record.*/
void
generic_redo(XLogReaderState *record)
{XLogRecPtr lsn = record->EndRecPtr;Buffer buffers[MAX_GENERIC_XLOG_PAGES];uint8 block_id;/* Protect limited size of buffers[] array */Assert(XLogRecMaxBlockId(record) < MAX_GENERIC_XLOG_PAGES);/* Iterate over blocks */for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++){XLogRedoAction action;if (!XLogRecHasBlockRef(record, block_id)){buffers[block_id] = InvalidBuffer;continue;}action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);/* Apply redo to given block if needed */if (action == BLK_NEEDS_REDO){Page page;PageHeader pageHeader;char *blockDelta;Size blockDeltaSize;page = BufferGetPage(buffers[block_id]);blockDelta = XLogRecGetBlockData(record, block_id, &blockDeltaSize);applyPageRedo(page, blockDelta, blockDeltaSize);/** Since the delta contains no information about what's in the* "hole" between pd_lower and pd_upper, set that to zero to* ensure we produce the same page state that application of the* logged action by GenericXLogFinish did.*/pageHeader = (PageHeader) page;memset(page + pageHeader->pd_lower, 0,pageHeader->pd_upper - pageHeader->pd_lower);PageSetLSN(page, lsn);MarkBufferDirty(buffers[block_id]);}}/* Changes are done: unlock and release all buffers */for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++){if (BufferIsValid(buffers[block_id]))UnlockReleaseBuffer(buffers[block_id]);}
}
a、因为一个wal record中可能包含多个page的修改,每个page对应wal record里的一个block,所以,我们先看看这个wal record中有多少个block,调用XLogRecMaxBlockId(),获得这条wal record有多少个block。
b、然后对于每个wal record里的block,调用XLogRecHasBlockRef,确认一下这个block里有没有数据或是full-page-image。
c、确定了一个block是可用的后,将这个block对应的表的page加载进shared buffer,调用
XLogRedoActionXLogReadBufferForRedo(XLogReaderState *record, uint8 block_id, Buffer *buf)
这个函数还会对加载进来的page加互斥锁,参数中Buffer *buf是返回值,是shared buffer中的那个page的buffer id,后面会用这个buffer id获取page在内存中的地址。
如果这个block里存储的是full page image,那么这个函数会自动把full page image复制到buffer,然后用户就不需要做什么了。
这个函数的返回值,指示了用户接下来该怎么做,有:
BLK_NEEDS_REDO, /* changes from WAL record need to be applied */BLK_DONE, /* block is already up-to-date */BLK_RESTORED, /* block was restored from a full-page image */BLK_NOTFOUND, /* block was not found (and hence does not need to be replayed) */
只有 BLK_NEEDS_REDO 才需要用户根据当初写到wal record里的数据,修改shared buffer page。
如果返回BLK_DONE,说明这个block对应的表的page不需要恢复。
d、假设 XLogReadBufferForRedo 返回的是 BLK_NEEDS_REDO,那么我接着就需要调用:
blockDelta = XLogRecGetBlockData(record, block_id, &blockDeltaSize)
这个函数返回一个数据块,它是写WAL日志时,XLogRegisterBufData()写到block里的数据块。
修改完shared buffer page后记得要调用PageSetLSN设置page头部的LSN,然后调用MarkBufferDirty()告诉buffer manager,这个page落盘前,它的wal record要先落盘。
e、最后调用UnlockReleaseBuffer,释放XLogReadBufferForRedo加的互斥锁。
2、generic_xlog.c 代码解析
当自定义的access method(AM),想对shared buffer page进行修改,而且希望修改是在wal保护下,最简单的方法就是使用generic xlog,这样不需要自己定义一套wal record格式和redo逻辑。
2.1那如何使用generic xlog呢?或者说它的编程规范是什么呢?
假设我们想写的table page已经在shared buffer中了(如果不在可以调用ReadBufferExtended把它加载到shared buffer)。
1)对于这个shared buffer page,调用LockBuffer对它加互斥锁。
2)然后调用GenericXLogState *GenericXLogStart(Relation relation),表示开始构建一条wal record,它会创建GenericXLogState对象并返回指针。
3)然后调用 Page GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags),输入的参数包括GenericXLogState对象指针和shared buffer page的buffer id,还有flags这个参数目前没有用,函数会复制buffer id所指的shared buffer page到GenericXLogState里的之前分配的一块缓存(targetpage),并返回起始地址,之后对于shared buffer page的修改,全部在这个缓存内修改(就是把这个缓存当作shared buffer page来操作),所以所修改的内存范围也不能超过这缓存(8K)。
4)当修改完毕,想要写wal record时,调用 XLogRecPtr GenericXLogFinish(GenericXLogState *state) ,这个函数做的,其实是xlog底层api编程规范的写日志流程,不过它会帮你计算GenericXLogState里的缓存(tagetpage)和shared buffer page(curpage)的差异,按照xlog的编程规范,生成wal record,插入record到wal buffer。
5)释放shared buffer page上的互斥锁。
6)这样,就对shared buffer page完成了一次修改,并为它生成了wal record,并插入到wal buffer。但是这样是否提交了事务呢?我认为是没有提交事务的,一个事务中,可能包含多个对shared buffer page的修改,即包含多个wal record,上面的流程,只是完成了一次wal日志保护下的shared buffer page修改。一个事务应该可以包含多次上面的修改流程。
7)关于GenericXLogState,这个结构体贯穿于一个wal record的生命周期,里面包括了4个page缓存(GenericXLogState::images)和4个delta缓存(PageData::delta),它们的大小大约都是一个标准page(每个page缓存和它对应的delta由一个PageData对象表示,又称为block),调用GenericXLogRegisterBuffer时会使用一个page缓存,在一个generic xlog record的构建中,可以多次调用GenericXLogRegisterBuffer,一个generic xlog record最多可以包括4个shared buffer page的修改(即4个page缓存和delta缓存),在调用GenericXLogFinish时,GenericXLogState里page缓存与对应shared buffer page的差异,被转换成一个个segment存储在相关联的delta缓存中。
/* Struct of generic xlog data for single page */
typedef struct
{Buffer buffer; /* registered buffer */int flags; /* flags for this buffer */int deltaLen; /* space consumed in delta field */char *image; /* copy of page image for modification, do not* do it in-place to have aligned memory chunk */char delta[MAX_DELTA_SIZE]; /* delta between page images */
} PageData;/** State of generic xlog record construction. Must be allocated at an I/O* aligned address.*/
struct GenericXLogState
{/* Page images (properly aligned, must be first) */PGIOAlignedBlock images[MAX_GENERIC_XLOG_PAGES];/* Info about each page, see above */PageData pages[MAX_GENERIC_XLOG_PAGES];bool isLogged;
};
2.2 那么,GenericXLogState里page缓存与对应shared buffer page的差异,具体是如何被转换成一个个segment,segment又是什么数据结构呢?
(相关的逻辑在computeRegionDelta()函数)
为了便于描述,我们把GenericXLogState里的page缓存,称为targetpage,shared buffer page称为curpage。
我们的目的是:用尽量少的数据,表示targetpage与curpage之间数据的差异。
而且别忘了,写wal日志的流程是:curpage加互斥锁,复制到targetpage,用户把修改写到targetpage,计算targetpage与curpage的差异,生成wal record,插入wal缓存,差异写到curpage并标记dirty和lsn,释放curpage互斥锁。
我们逐个字节比较targetpage和curpage,将差异部分,分成一个一个数据块,数据块之间是不允许有重叠的,我们遍历完两个page,就得到了若干个数据块,我们再用2字节(short)存储数据块在page中的偏移量,2字节存储数据块长度,这4个字节加上数据块组成一个差异单元,称为segment,一个page的所有segment,紧挨在一起,写到deta缓存。
如上图所示,左边黄色表示shared buffer page最初的数据,右边表示复制到GenericXLogState里page缓存后,用户做了修改(蓝色表示两者不同的数据),我们会逐个字节比较curpage和targetpage,分三种情况生成segment:
一、当遇到不同的字节时,表示一个segment开始了,继续向后比较,当遇到相同的字节时,这个segment的结束(例如segment1),再把差异数据块和它的偏移量、长度、组合成segment1写到delta里。
二、如果第一种情况遇到了相同的字节,但是相同的字节很少(小于8Byte,例如segment2),之后又遇到了不同的字节,这时会把很少的相同字节和后面不同的字节都包含在这个segment里,就是说,当不同字节间的“缝隙”很小时,这些不同的字节和“缝隙”都被包含在一个segment里,即segment2的情况。
注意,这里的前提是,page的布局是标准格式,因此会有lower和upper的指针的概念,它们之间的是空白区域,不用来计算segment。
三、还有可能targetpage修改的数据多于(或小于)curpage,即 targetpage 的 lower 大于 curpage 的 lower(segment3的情况),当然也可能是 targetpage 的 lower 小于 curpage 的lower。不管哪种情况,两个page,lower之间的那部分数据,不再逐个字节比较是否相同字节,直接算作一个segment。对于 upper 端也是一样处理:targetpage 和 curpage 的两个 upper 之间的内存区域的,不管是否有相同、不同或者缝隙,简单地算作一个segment(segment4)。
2.3 generic xlog 的redo函数:
使用generic xlog,恢复时redo函数,用户不需要考虑也不需要编写,generic xlog已经编写了void generic_redo(XLogReaderState *record)帮你处理。
简单说一下,PG 读取 wal 记录,发现其ID是属于generic xlog,就调用 generic_redo,把记录传给它(XLogReaderState ),generic_redo的代码逻辑,从中提取修改数据,即delta缓存,以及与它对应的table page号,加载page到shared buffer并加互斥锁,用delta里的segment修改对应的 shared buffer page,再修改page 头部的lsn,设置dirty,解互斥锁,继续如此处理下一个page或 wal 记录。
修改的具体过程是,把delta中的一个个segment按提取出来,按照偏移量和长度,覆盖到shared buffer 里对应的page中。
参考:
PostgreSQL: Documentation: 12: Chapter 62. Generic WAL Records
postgresql/src/backend/access/transam/README