上篇在介绍交易池时有讲到对于本地交易的特殊处理。为了不丢失未完成的本地交易,以太坊交易池通过 journal 文件存储和管理当前交易池中的本地交易,并定期更新存储。
下图是交易池对本地待处理交易的磁盘存储管理流程,涉及加载、实时写入和定期更新维护。
在交易池首次启动 journal 时,将主动将该文件已存储的交易加载到交易池。
//core/tx_journal.go:61
if _, err := os.Stat(journal.path); os.IsNotExist(err) { //❶
return nil
}
// Open the journal for loading any past transactions
input, err := os.Open(journal.path) //❷
if err != nil {
return err
}
defer input.Close()
处理时,如果文件不存在则退出 ❶,否则 Open 文件,获得 input 文件流 ❷。
//core/tx_journal.go:76
stream := rlp.NewStream(input, 0)//❸
total, dropped := 0, 0
因为存储的内容格式是 rlp 编码内容,因此可以直接初始化 rlp 内容流 ❸,为连续解码做准备。
var (
failure error
batch types.Transactions
)
for {
tx := new(types.Transaction)
if err = stream.Decode(tx); err != nil { //❹
if err != io.EOF {
failure = err
}
if batch.Len() > 0 {//❼
loadBatch(batch)
}
break
}
total++
if batch = append(batch, tx); batch.Len() > 1024 {//❺
loadBatch(batch)//❻
batch = batch[:0]
}
}
直接进入 for 循环遍历,不断从 stream 中一笔笔地解码出交易❹。但交易并非单笔直接载入交易池,而是采用批量提交模式,每 1024 笔交易提交一次 ❺。 批量写入,有利于降低交易池在每次写入交易后的更新。一个批次只需要更新(排序与超限处理等)一次。当然在遍历结束时(err==io.EOF),也需要将当前批次中的交易载入❼。
loadBatch := func(txs types.Transactions) {
for _, err := range add(txs) {
if err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++ //❽
}
}
}
loadBatch 就是将交易一批次加入到交易池,并获得交易池的每笔交易的处理情况。如果交易加入失败,则进行计数 ❽。最终在 load 方法执行完毕时,显示交易载入情况。
log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)
当交易池新交易来自于本地账户时❶,如果已开启记录本地交易,则将此交易加入journal ❷。到交易池时,将实时存储到 journal 文件中。
//core/tx_pool.go:757
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
// Only journal if it's enabled and the transaction is local
if pool.journal == nil || !pool.locals.contains(from) {//❶
return
}
if err := pool.journal.insert(tx); err != nil { //❷
log.Warn("Failed to journal local transaction", "err", err)
}
}
而 journal.insert
则将交易实时写入文件流中❸,相当于实时存储到磁盘。而在写入时,是将交易进行RLP编码。
//core/tx_journal.go:120
func (journal *txJournal) insert(tx *types.Transaction) error {
if journal.writer == nil {
return errNoActiveJournal
}
if err := rlp.Encode(journal.writer, tx); err != nil {//❸
return err
}
return nil
}
这里引发了在上面载入已存储交易时将交易重复写入文件。因此在加载交易时,使用一个 空 writer 替代 ❹。
//core/tx_journal.go:72
journal.writer = new(devNull) //❹
defer func() { journal.writer = nil }() //❺
并且在加载结束时清理❺。
journal 的目的是长期存储本地尚未完成的交易,以便交易不丢失。而文件内容属于交易的RLP编码内容,不便于实时清空已完成或已无效的交易。因此以太坊采取的是定期将交易池在途交易更新到 journal 文件中。
首先,在首次加载文件中的交易到交易池后,利用交易池的检查功能,将已完成或者已完成的交易拒绝在交易池外。在加载完成后,交易池中的交易仅仅是本地账户待处理的交易,因此在加载完成后❶,立即将交易池中的所有本地交易覆盖journal文件❷。
//core/tx_pool.go:264
pool.journal = newTxJournal(config.Journal)
if err := pool.journal.load(pool.AddLocals); err != nil {//❶
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {//❷
log.Warn("Failed to rotate transaction journal", "err", err)
}
在 rotate 中,并非直接覆盖。而是先创建另一个新文件❸,将所有交易RLP编码写入此文件❹ 。
replacement, err := os.OpenFile(journal.path+".new", //❸
os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
return err
}
journaled := 0
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {//❹
replacement.Close()
return err
}
}
journaled += len(txs)
}
replacement.Close()
写入完毕,将此文件直接移动(重命名),已覆盖原 journal 文件。
if err = os.Rename(journal.path+".new", journal.path); err != nil {
return err
}
其次,是交易池根据参数 txpool.rejournal
所设置的更新间隔定期更新❺。将交易池中的本地交易存储到磁盘❻。
//core/tx_pool.go:298
journal := time.NewTicker(pool.config.Rejournal)//❺
//...
for {
select {
//...
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil { //❻
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
}
}
}
}
上述,是以太坊交易池对于本地交易进行持久化存储管理细节。