菜鸟系列Fabric源码学习 — committer记账节点
Liguo_Ji 人气:0Fabric 1.4 源码分析 committer记账节点
本文档主要介绍committer记账节点如何初始化的以及committer记账节点的功能及其实现。
1. 简介
记账节点负责验证交易和提交账本,包括公有数据(即区块数据,包括公共数据和私密数据hash值)与私密数据。在提交账本前需要验证交易数据的有效性,包括交易消息的格式、签名有效性以及调用VSCC验证消息的合法性及指定背书策略的有效性,接着通过MVCC检查读写集冲突并标记交易的有效性,最后提交区块数据到区块文件系统,建立索引信息并保存到区块索引数据库,更新有效交易和私密数据到状态数据库,将经过背书节点到有效交易同步到历史数据库,并更新隐私数据库。
2. 记账节点初始化
首先,每个通道里面的组织的peer节点都是committer记账节点(则commiter记账节点初始化肯定和通道操作相关),因此记账节点初始化肯定是在peer加入通道或者peer启动时已存在通道的初始化过程中。首先commiter节点主要负责验证交易和提交账本。因此实现了以下接口:
// 提交账本
type Committer interface {
// CommitWithPvtData block and private data into the ledger
CommitWithPvtData(blockAndPvtData *ledger.BlockAndPvtData) error
// GetPvtDataAndBlockByNum retrieves block with private data with given
// sequence number
GetPvtDataAndBlockByNum(seqNum uint64) (*ledger.BlockAndPvtData, error)
// GetPvtDataByNum returns a slice of the private data from the ledger
// for given block and based on the filter which indicates a map of
// collections and namespaces of private data to retrieve
GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error)
// Get recent block sequence number
LedgerHeight() (uint64, error)
// Gets blocks with sequence numbers provided in the slice
GetBlocks(blockSeqs []uint64) []*common.Block
// GetConfigHistoryRetriever returns the ConfigHistoryRetriever
GetConfigHistoryRetriever() (ledger.ConfigHistoryRetriever, error)
// CommitPvtDataOfOldBlocks commits the private data corresponding to already committed block
// If hashes for some of the private data supplied in this function does not match
// the corresponding hash present in the block, the unmatched private data is not
// committed and instead the mismatch inforation is returned back
CommitPvtDataOfOldBlocks(blockPvtData []*ledger.BlockPvtData) ([]*ledger.PvtdataHashMismatch, error)
// GetMissingPvtDataTracker return the MissingPvtDataTracker
GetMissingPvtDataTracker() (ledger.MissingPvtDataTracker, error)
// Closes committing service
Close()
}
// 验证交易到合法性,包括交易格式的合法性、背书策略的有效性(vscc)
type Validator interface {
Validate(block *common.Block) error
}
// private interface to decouple tx validator
// and vscc execution, in order to increase
// testability of TxValidator
type vsccValidator interface {
VSCCValidateTx(seq int, payload *common.Payload, envBytes []byte, block *common.Block) (error, peer.TxValidationCode)
}
那么commiter模块功能是何时初始化的呢?core/peer/peer.go文件中的createChain()函数。(peer创建通道和peer启动时都会调用改函数)
func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider, pm txvalidator.PluginMapper) error {
...
// 构建新的验证链码支持对象
vcs := struct {
*chainSupport
*semaphore.Weighted
}{cs, validationWorkersSemaphore}
// 创建交易验证器
validator := txvalidator.NewTxValidator(cid, vcs, sccp, pm)
// 创建账本提交器
c := committer.NewLedgerCommitterReactive(ledger, func(block *common.Block) error {
chainID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return err
}
return SetCurrConfigBlock(block, chainID)
})
ordererAddresses := bundle.ChannelConfig().OrdererAddresses()
if len(ordererAddresses) == 0 {
return errors.New("no ordering service endpoint provided in configuration block")
}
// TODO: does someone need to call Close() on the transientStoreFactory at shutdown of the peer?
// 创建Transient隐私数据存储对象
store, err := TransientStoreFactory.OpenStore(bundle.ConfigtxValidator().ChainID())
if err != nil {
return errors.Wrapf(err, "[channel %s] failed opening transient store", bundle.ConfigtxValidator().ChainID())
}
csStoreSupport := &CollectionSupport{
PeerLedger: ledger,
}
simpleCollectionStore := privdata.NewSimpleCollectionStore(csStoreSupport)
// 初始化指定通道的gossip模块
service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
Validator: validator,
Committer: c,
Store: store,
Cs: simpleCollectionStore,
IdDeserializeFactory: csStoreSupport,
})
chains.Lock()
defer chains.Unlock()
//放入chain map中
chains.list[cid] = &chain{
cs: cs,
cb: cb,
committer: c,
}
return nil
}
3. 调用committer模块
本节主要介绍交易如何调用committer模块,即写区块流程。根据区块同步可知,最后区块传输流程为通过addPayload()函数将区块写入gossip.payloadbuff中,然后触发协程go deliverPayloads(),在里面调用了commitBlock()方法实现写区块过程。
func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error {
// 1、保存区块
if err := s.ledger.StoreBlock(block, pvtData); err != nil {
logger.Errorf("Got error while committing(%+v)", errors.WithStack(err))
return err
}
// 2、更新区块高度
s.mediator.UpdateLedgerHeight(block.Header.Number+1, common2.ChainID(s.chainID))
return nil
}
其中PvtDataCollections:
type PvtDataCollections []*ledger.TxPvtData
type TxPvtData struct {
// 在区块的序号
SeqInBlock uint64
// 写集
WriteSet *rwset.TxPvtReadWriteSet
}
在同步区块中,介绍到leader和orderer同步区块,peer pull区块以及leader push区块。但是leader和orderer同步区块时私密数据集PvtDataCollections=nil。
StoreBlock()函数
主要完成区块和私密数据的存储
// StoreBlock stores block with private data into the ledger
func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error {
// 对data和header验证
if block.Data == nil {
return errors.New("Block data is empty")
}
if block.Header == nil {
return errors.New("Block header is nil")
}
// 对交易进行验证,包括调用vscc链码
err := c.Validator.Validate(block)
c.reportValidationDuration(time.Since(validationStart))
blockAndPvtData := &ledger.BlockAndPvtData{
Block: block,
PvtData: make(ledger.TxPvtDataMap),
MissingPvtData: make(ledger.TxMissingPvtDataMap),
}
// 获取该区块上交易相关的私密数据集
ownedRWsets, err := computeOwnedRWsets(block, privateDataSets)
// 标识丢失的私密数据读写集,并尝试从本地瞬时数据库中检索它们
privateInfo, err := c.listMissingPrivateData(block, ownedRWsets)
for len(privateInfo.missingKeys) > 0 && time.Now().Before(limit) {
// 从其他peer节点获取缺失的私密数据
c.fetchFromPeers(block.Header.Number, ownedRWsets, privateInfo)
}
// populate the private RWSets passed to the ledger
// 填充私密数据读写集
for seqInBlock, nsRWS := range ownedRWsets.bySeqsInBlock() {
rwsets := nsRWS.toRWSet()
// 构造blockAndPvtData结构中的私密数据
blockAndPvtData.PvtData[seqInBlock] = &ledger.TxPvtData{
SeqInBlock: seqInBlock,
WriteSet: rwsets,
}
}
// populate missing RWSets to be passed to the ledger
// 构造缺失的私密数据
for missingRWS := range privateInfo.missingKeys {
blockAndPvtData.MissingPvtData.Add(missingRWS.seqInBlock, missingRWS.namespace, missingRWS.collection, true)
}
// populate missing RWSets for ineligible collections to be passed to the ledger
for _, missingRWS := range privateInfo.missingRWSButIneligible {
blockAndPvtData.MissingPvtData.Add(missingRWS.seqInBlock, missingRWS.namespace, missingRWS.collection, false)
}
// commit block and private data
// 写账本
err = c.CommitWithPvtData(blockAndPvtData)
if len(blockAndPvtData.PvtData) > 0 {
// Finally, purge all transactions in block - valid or not valid.
if err := c.PurgeByTxids(privateInfo.txns); err != nil {
logger.Error("Purging transactions", privateInfo.txns, "failed:", err)
}
}
seq := block.Header.Number
if seq%c.transientBlockRetention == 0 && seq > c.transientBlockRetention {
err := c.PurgeByHeight(seq - c.transientBlockRetention)
if err != nil {
logger.Error("Failed purging data from transient store at block", seq, ":", err)
}
}
return nil
}
上述流程为:
- 验证区块头和区块数据的有效性
- 验证交易的合法性以及vscc验证背书策略的有效性
- 处理私密数据
- 过滤该区块存在的隐私数据读写集
- 计算本地缺失的私密数据信息
- 从其他节点获取缺失的私密数据信息
- 写区块和私密数据
4. 验证交易的合法性以及vscc验证背书策略的有效性
- Validate(block *common.Block)
主要是该方法实现验证过程。
func (v *TxValidator) Validate(block *common.Block) error {
.....
// 额外开启一个协程,针对区块里面每一个交易进行验证
results := make(chan *blockValidationResult)
go func() {
for tIdx, d := range block.Data.Data {
// ensure that we don't have too many concurrent validation workers
v.Support.Acquire(context.Background(), 1)
go func(index int, data []byte) {
defer v.Support.Release(1)
// 验证交易
v.validateTx(&blockValidationRequest{
d: data,
block: block,
tIdx: index,
}, results)
}(tIdx, d)
}
}()
// 对验证结果进行处理
for i := 0; i < len(block.Data.Data); i++ {
res := <-results
if res.err != nil {
...
} else {
// 设置交易状态码
txsfltr.SetFlag(res.tIdx, res.validationCode)
// 如果交易是有效的
if res.validationCode == peer.TxValidationCode_VALID {
// 设置链码名
if res.txsChaincodeName != nil {
txsChaincodeNames[res.tIdx] = res.txsChaincodeName
}
// 设置升级链码名
if res.txsUpgradedChaincode != nil {
txsUpgradedChaincodes[res.tIdx] = res.txsUpgradedChaincode
}
// 设置交易id
txidArray[res.tIdx] = res.txid
}
}
}
// 如果存在重复交易,则设置该交易无效TxValidationCode_DUPLICATE_TXID,防止双花攻击
if v.Support.Capabilities().ForbidDuplicateTXIdInBlock() {
markTXIdDuplicates(txidArray, txsfltr)
}
// 防止多次重复升级链码
v.invalidTXsForUpgradeCC(txsChaincodeNames, txsUpgradedChaincodes, txsfltr)
utils.InitBlockMetadata(block)
// 设置区块交易索引
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsfltr
return nil
}
总结以下流程:
此处主要是完成交易验证及背书策略合法性验证。
1. 开启一个协程验证区块里面的交易,并且在该协程为每个交易开启一个协程进行交易验证
2. 对验证结果进行处理,即设置每个交易的交易码以及添加链码名、添加升级链码名
3. 判断是否存在重复交易,将重复交易交易码设置为TxValidationCode_DUPLICATE_TXID
4. 对多次链码升级的无效交易进行处理,此处将交易码设置为TxValidationCode_CHAINCODE_VERSION_CONFLICT
5. 在区块的Metadata.Metadata设置交易索引
- validateTx(req blockValidationRequest, results chan<- blockValidationResult)
该函数主要是验证每个交易的有效性以及背书策略的合法性,传入的参数为blockValidationRequest以及results,经过该方法验证后,将验证结果写入results通道
type blockValidationRequest struct {
// 区块
block *common.Block
// 交易数据
d []byte
// 交易在区块的序号
tIdx int
}
主要流程包括如下:
1. 首先调用validation.ValidateTransaction()验证交易格式、签名以及是否被篡改
2. 通过交易的payload.header获取通道id,判断该通道是否存在。
3. 根据交易类型进行分类处理
+ HeaderType_ENDORSER_TRANSACTION:经过背书节点背书的交易
1. 通过交易id判断交易的唯一性,检查账本是否存在相同的交易id(重放攻击)
2. 接着通过调用VSCCValidateTx验证交易背书签名是否符合对应的背书策略
3. 调用v.getTxCCInstance(payload)获取该交易调用的链码
+ HeaderType_CONFIG:通道配置交易
1. 调用接口configtx.UnmarshalConfigEnvelope(payload.Data)获取配置交易信息configEnvelope
2. 调用接口v.Support.Apply(configEnvelope)更新配置,具体实现fabric/core/peer/peer.go
+ 未知的消息类型
4. 将交易写入results通道中返回,其中合法和不合法的交易构造的blockValidationResult,不合法的只包含(只包含tIdx以及validationCode):
// invalid:
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_UNKNOWN_TX_TYPE,
}
// valid:
results <- &blockValidationResult{
tIdx: tIdx,
txsChaincodeName: txsChaincodeName,
txsUpgradedChaincode: txsUpgradedChaincode,
validationCode: peer.TxValidationCode_VALID,
txid: txID,
}
综上,交易验证基本流程可以确定,可以分为验证交易格式、签名以及是否被篡改以及验证交易背书签名是否符合对应的背书策略(HeaderType_ENDORSER_TRANSACTION交易需要验证)这两个方面。接下来将分别介绍为验证交易格式、签名以及是否被篡改、双花攻击以及验证交易背书签名是否符合对应的背书策略这两个接口。
4.1 验证交易格式、交易真实性与完整性
- ValidateTransaction(e *common.Envelope, c channelconfig.ApplicationCapabilities)
该函数主要功能为验证交易格式、签名以及是否被篡改。
主要流程如下:
1. 验证Envelope交易的格式,其中包括(Envelope是否为nil,Envelope.Payload是否为nil,Envelope.Payload.Header)
2. 验证签名是否有效(验证该消息的创建者及其签名是否有效)
3. 根据不同消息类型进行处理
+ HeaderType_ENDORSER_TRANSACTION
1. 验证交易id
2. 验证背书交易是否被篡改
1. 反序列payload.data生成Transaction
2. 验证Actions.Header的格式(是否为nil,长度是否为0)
3. 反序列化ProposalResponsePayload,验证proposal hash
+ HeaderType_CONFIG
主要验证payload.Data, payload.Header是否为nil
+ HeaderType_TOKEN_TRANSACTION
验证交易id是否一致
4.2 VSCC验证
- VSCCValidateTx
该函数主要实现对交易vscc验证
主要流程如下:
1. 解析消息头拓展hdrExt以及通道头chdr,然后通过这两个信息验证链码id和版本是否一致
2. 创建一个命名空间集合,遍历交易读写集,保存namespace,例如lscc、mycc,并进行判断
1. 检查是否存在lscc命名空间
2. 检查是否是不可被其他链码调用的系统链码
3. 检查是否是不可以被外部链码调用的系统链码
3. 根据链码 类型进行验证(应用链码和系统链码)
1. 应用链码
1. 判断命名空间是否存在lscc以及不可调用系统链码
2. 循环遍历当前写集合的命名空间
0. 构造请求从lscc获取链码id、版本以及背书策略
1. 验证链码版本
2. vscc背书策略验证
2. 系统链码
1. 判断命名空间是否是不可调用系统链码
2. vscc背书策略验证
- VSCCValidateTxForCC()
该函数主要实现背书策略验证
VSCCValidateTxForCC()里面会调用ValidateWithPlugin(),调用Validate(),默认实现为core/handlers/validation/builtinhttps://img.qb5200.com/download-x/default_validation.go/Validate(),首先会对block、txPosition进行校验。然后根据不同的版本调用不同的接口。
switch {
case v.Capabilities.V1_3Validation():
err = v.TxValidatorV1_3.Validate(block, namespace, txPosition, actionPosition, serializedPolicy.Bytes())
case v.Capabilities.V1_2Validation():
fallthrough
default:
err = v.TxValidatorV1_2.Validate(block, namespace, txPosition, actionPosition, serializedPolicy.Bytes())
}
这里以v1.2版本为例。
func (vscc *Validator) Validate(
block *common.Block,
namespace string,
txPosition int,
actionPosition int,
policyBytes []byte,
) commonerrors.TxValidationError {
// get the envelope
// and the payload...
// validate the payload type
// ...and the transaction...
// 返回去掉重复背书节点身份的签名集合
signatureSet, err := vscc.deduplicateIdentity(cap)
// evaluate the signature set against the policy
// 背书策略验证
err = vscc.policyEvaluator.Evaluate(policyBytes, signatureSet)
//如果是lscc,则继续验证lscc
// do some extra validation that is specific to lscc
if namespace == "lscc" {
err := vscc.ValidateLSCCInvocation(chdr.ChannelId, env, cap, payl, vscc.capabilities)
}
return nil
}
- 背书策略验证
// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy
func (id *PolicyEvaluator) Evaluate(policyBytes []byte, signatureSet []*common.SignedData) error {
pp := cauthdsl.NewPolicyProvider(id.IdentityDeserializer)
policy, _, err := pp.NewPolicy(policyBytes)
if err != nil {
return err
}
return policy.Evaluate(signatureSet)
}
最后会调用compile()返回的验证方法进行验证。
此处根据策略类型进行验证
+ SignaturePolicy_NOutOf_类型策略。
递归构造自策略验证方法compiledPolicy,并放入策略验证方法集合policies中。然后返回一个方法。在该方法中,会遍历policys,进行验证,如果子策略是SignaturePolicy_NOutOf_类型策略,会继续递归调用验证方法,最后直到最底层子策略为SignaturePolicy_SignedBy。如果通过验证,则verified自增,然后返回验证通过的个数是否满足策略要求。
+ SignaturePolicy_SignedBy类型策略
首先验证签名索引signedby的合法性。再返回一个方法。该方法遍历签名数据列表进行判断。
1. 跳过已经匹配的身份实体
2. 解析签名身份实体的identity
3. 验证identity是否满足指定签名策略identity.SatisfiesPrincipal(signedByID)
4. 再验证identity签名的真实性
其中,SatisfiesPrincipal会最终调用satisfiesPrincipalInternalPreV13()。其中存在多种验证方式。
1. MSPPrincipal_ROLE 基于角色的验证
1. 验证是否为相同的MSP;
2. 验证是否是有效的证书;
如果是admin,会遍历MSP里面的adminSFZ书,按字节比对。如果是peer/client,会验证组织部门信息是否匹配
2. MSPPrincipal_IDENTITY 基于身份的验证
此处主要验证SFZ书是否一致
3. MSPPrincipal_ORGANIZATION_UNIT 基于部门单元的验证
1. 验证是否为相同的MSP;
2. 验证是否是有效的证书;
3. 验证组织部门信息是否匹配
- lscc特殊验证
- 验证输入参数的合法性
- 验证deploy和upgrade的结果读写集以及背书策略
5. 写区块和私密数据
CommitWithPvtData(blockAndPvtData *ledger.BlockAndPvtData)主要实现写区块和私密数据功能。
// CommitWithPvtData commits blocks atomically with private data
func (lc *LedgerCommitter) CommitWithPvtData(blockAndPvtData *ledger.BlockAndPvtData) error {
// Do validation and whatever needed before
// committing new block
if err := lc.preCommit(blockAndPvtData.Block); err != nil {
return err
}
// Committing new block
if err := lc.PeerLedgerSupport.CommitWithPvtData(blockAndPvtData); err != nil {
return err
}
return nil
}
该方法首先会调用lc.preCommit(blockAndPvtData.Block)方法对需要提交的区块数据进行预处理,如果是配置区块则执行lc.eventer(block),其实现为core/peer/peer.go createChain()方法中:其主要功能为从区块中解析出通道id,然后调用SetCurrConfigBlock()方法,设置本地map[string]*chain,更新该chain最新配置块。接着会调用kvLedger.CommitWithPvtData()方法提交区块到账本中。
c := committer.NewLedgerCommitterReactive(ledger, func(block *common.Block) error {
chainID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return err
}
return SetCurrConfigBlock(block, chainID)
})
// SetCurrConfigBlock sets the current config block of the specified channel
func SetCurrConfigBlock(block *common.Block, cid string) error {
chains.Lock()
defer chains.Unlock()
if c, ok := chains.list[cid]; ok {
c.cb = block
return nil
}
return errors.Errorf("[channel %s] channel not associated with this peer", cid)
}
kvLedger.CommitWithPvtData()为提交区块写入账本核心方法,在该流程中,会对交易执行MVCC检查,判断读数据读有效性、标记交易读有效性再更新账本。因此主要分为验证和准备数据以及提交账本数据两个步骤。
5.1 验证和准备数据
5.1.1 预处理构造内部区块
kvLedger.CommitWithPvtData()会调用l.txtmgmt.ValidateAndPrepare(),最终会调用preprocessProtoBlock()进行预处理操作。该方法会将common.Block预处理成internal.Block。internal.Block以及internal.Transaction数据结果如下:
type Block struct {
Num uint64
Txs []*Transaction
}
type Transaction struct {
IndexInBlock int
ID string
RWSet *rwsetutil.TxRwSet
ValidationCode peer.TxValidationCode
}
preprocessProtoBlock():
- 处理Endorser交易:只保留有效的 Endorser 交易;
- 处理配置交易:获取配置更新的模拟结果,放入读写集;
- 检查读写集是否符合数据库要求格式
5.1.2 执行MVCC检查与准备公有数据
对交易数据进行MVCC检查用于验证交易结果读写集的读集的key版本是否在该交易前是否改变、RangeQuery 的结果未变、私密数据的key的版本是否改变,并标记无效的交易,最后将有效交易的公共数据与私密数据写集合添加到数据更新批量操作中。
- ValidateAndPrepareBatch()
updates := internal.NewPubAndHashUpdates() // 创建公共数据和私密数据hash值批处理更新操作
for _, tx := range block.Txs { // 遍历区块所有交易
var validationCode peer.TxValidationCode
var err error
// 背书交易mvcc验证
if validationCode, err = v.validateEndorserTX(tx.RWSet, doMVCCValidation, updates); err != nil {
return nil, err
}
tx.ValidationCode = validationCode
// 检查交易的有效性
if validationCode == peer.TxValidationCode_VALID {
logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator", block.Num, tx.IndexInBlock, tx.ID)
committingTxHeight := version.NewHeight(block.Num, uint64(tx.IndexInBlock))
updates.ApplyWriteSet(tx.RWSet, committingTxHeight, v.db) // 更新写集合到PubAndHashUpdates结构中
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%s]",
block.Num, tx.IndexInBlock, tx.ID, validationCode.String())
}
}
MVCC校验
- 验证公共数据读集key
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *privacyenabledstate.PubUpdateBatch) (bool, error) {
if updates.Exists(ns, kvRead.Key) { // 查看更新批处理,如果存在,则标示该交易使用了同一个区块上一个交易读读集,无效
return false, nil
}
committedVersion, err := v.db.GetVersion(ns, kvRead.Key) // 查看状态数据库已提交的版本
if err != nil {
return false, err
}
if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) { // 构造单个key读数据版本,并与已提交版本比较,不一致则返回false
logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%#v], Version in readSet [%#v]",
ns, kvRead.Key, committedVersion, kvRead.Version)
return false, nil
}
return true, nil
}
其中版本数据结构
type Height struct {
BlockNum uint64
TxNum uint64
}
-- example
"key": "marblesp",
"version": {
"block_num": "5",
"tx_num": "0"
}
验证范围查询
针对公共数据范围查询的读集合进行验证,循环遍历每个范围查询对象,验证范围查询数据的读数据版本是否一致。- 验证私密数据读集key hash
遍历所有的collHashedRWSets,再遍历collHashedRWSet.HashedRwSet.HashedReads,验证每个kvReadHash版本是否一致(类似于key验证)
func (v *Validator) validateNsHashedReadSets(ns string, collHashedRWSets []*rwsetutil.CollHashedRwSet,
updates *privacyenabledstate.HashedUpdateBatch) (bool, error) {
for _, collHashedRWSet := range collHashedRWSets {
if valid, err := v.validateCollHashedReadSet(ns, collHashedRWSet.CollectionName, collHashedRWSet.HashedRwSet.HashedReads, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
func (v *Validator) validateCollHashedReadSet(ns, coll string, kvReadHashes []*kvrwset.KVReadHash,
updates *privacyenabledstate.HashedUpdateBatch) (bool, error) {
for _, kvReadHash := range kvReadHashes {
if valid, err := v.validateKVReadHash(ns, coll, kvReadHash, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
5.1.3 验证与准备私密数据
对私密数据hash值进行校验,再将更新操作写入添加到数据更新批量操作中。
func validatePvtdata(tx *internal.Transaction, pvtdata *ledger.TxPvtData) error {
if pvtdata.WriteSet == nil {
return nil
}
for _, nsPvtdata := range pvtdata.WriteSet.NsPvtRwset {
for _, collPvtdata := range nsPvtdata.CollectionPvtRwset {
collPvtdataHash := util.ComputeHash(collPvtdata.Rwset)
hashInPubdata := tx.RetrieveHash(nsPvtdata.Namespace, collPvtdata.CollectionName)
if !bytes.Equal(collPvtdataHash, hashInPubdata) {
return &validator.ErrPvtdataHashMissmatch{
Msg: fmt.Sprintf(`Hash of pvt data for collection [%s:%s] does not match with the corresponding hash in the public data.
public hash = [%#v], pvt data hash = [%#v]`, nsPvtdata.Namespace, collPvtdata.CollectionName, hashInPubdata, collPvtdataHash),
}
}
}
}
return nil
}
5.1.4 更新区块元数据
更新区块元数据交易验证码列表,本来更新了一次,参见上文,但是在MVCC验证中还存在验证不通过的情况,因此再次刷新交易验证码。
func postprocessProtoBlock(block *common.Block, validatedBlock *internal.Block) {
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
for _, tx := range validatedBlock.Txs {
txsFilter.SetFlag(tx.IndexInBlock, tx.ValidationCode)
}
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
5.2 提交账本数据
提交账本数据包括以下步骤
1、将区块数据写入账本、更新私密数据库以及更新区块索引数据库
2、更新状态数据库
3、更新历史数据库
5.2.1 提交区块和私密数据
- 准备提交私密数据
CommitWithPvtData()会调用pvtdataStore.Prepare()接口对私密数据进行处理,再处理过程中,首先会将私密数据转化为storeEntries结构,再将storeEntries结构的三个字段分别转化成KV键值对形式,并放入批处理更新操作UpdateBatch中。最后s.db.WriteBatch(batch, true)进行更新私密数据库操作。
type storeEntries struct {
dataEntries []*dataEntry
expiryEntries []*expiryEntry
missingDataEntries map[missingDataKey]*bitset.BitSet
}
type UpdateBatch struct {
KVs map[string][]byte
}
func (h *DBHandle) WriteBatch(batch *UpdateBatch, sync bool) error {
if len(batch.KVs) == 0 {
return nil
}
levelBatch := &leveldb.Batch{}
for k, v := range batch.KVs {
// key为h.dbName+[]byte{0x00}+[]byte(k)
key := constructLevelKey(h.dbName, []byte(k))
if v == nil {
levelBatch.Delete(key)
} else {
levelBatch.Put(key, v)
}
}
if err := h.db.WriteBatch(levelBatch, sync); err != nil {
return err
}
return nil
}
- 提交区块数据
本质上是通过(mgr blockfileMgr) addBlock(block common.Block)将区块写入区块文件系统中,接着调用mgr.index.indexBlock(*blockIdxInfo)更新当前区块信息到区块索引数据库。最后执行mgr.updateCheckpoint(newCPInfo)更新检查点信息以及执行mgr.updateBlockchainInfo(blockHash, block)更新区块链信息。
type blockIdxInfo struct {
blockNum uint64
blockHash []byte
flp *fileLocPointer
txOffsets []*txindexInfo
metadata *common.BlockMetadata
}
- 确认提交私密数据操作
当提交区块到区块文件系统时报错,则私密数据写数据库执行回滚操作,如果没有问题,执行真正到确认提交操作。
if err := s.AddBlock(blockAndPvtdata.Block); err != nil {
s.pvtdataStore.Rollback()
return err
}
if writtenToPvtStore {
return s.pvtdataStore.Commit()
}
return nil
5.2.2 更新状态数据库
if err = l.txtmgmt.Commit(); err != nil {
panic(errors.WithMessage(err, "error during commit to txmgr"))
}
主要实现方法为l.txtmgmt.Commit()
- l.txtmgmt.Commit()
// 准备清理过期到私密数据
if !txmgr.pvtdataPurgeMgr.usedOnce {
txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(txmgr.current.blockNum())
txmgr.pvtdataPurgeMgr.usedOnce = true
}
defer func() {
txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(txmgr.current.blockNum() + 1)
logger.Debugf("launched the background routine for preparing keys to purge with the next block")
txmgr.reset()
}()
// 更新私密数据生命周期记录数据库,这里记录了每个私密键值的存活期限
if err := txmgr.pvtdataPurgeMgr.DeleteExpiredAndUpdateBookkeeping(
txmgr.current.batch.PvtUpdates, txmgr.current.batch.HashUpdates); err != nil {
return err
}
// 更新状态数据库里面的公共数据和私密数据
if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.current.batch, commitHeight); err != nil {
txmgr.commitRWLock.Unlock()
return err
}
5.2.3 更新历史数据库
if ledgerconfig.IsHistoryDBEnabled() {
logger.Debugf("[%s] Committing block [%d] transactions to history database", l.ledgerID, blockNo)
if err := l.historyDB.Commit(block); err != nil {
panic(errors.WithMessage(err, "Error during commit to history db"))
}
}
主要实现方法为l.historyDB.Commit(block)
- l.historyDB.Commit(block)
5.2.4 清理工作
if len(blockAndPvtData.PvtData) > 0 {
// Finally, purge all transactions in block - valid or not valid.
if err := c.PurgeByTxids(privateInfo.txns); err != nil {
logger.Error("Purging transactions", privateInfo.txns, "failed:", err)
}
}
seq := block.Header.Number
if seq%c.transientBlockRetention == 0 && seq > c.transientBlockRetention {
err := c.PurgeByHeight(seq - c.transientBlockRetention)
if err != nil {
logger.Error("Failed purging data from transient store at block", seq, ":", err)
}
}
PurgeByTxids从瞬态存储中删除给定交易的私有读写集,PurgeByHeight会删除小于给定maxBlockNumToRetain的块高度处的私有读写集。
6. 附录
blkrouter提供的一个区块信息
{
"data":{
"data":[
{
"payload":{
"data":{
"actions":[
{
"header":{
"creator":{
"id_bytes":"LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNLekNDQWRHZ0F3SUJBZ0lSQVB1TWdsMkJZbS9WMEhCVW1NMFRibVF3Q2dZSUtvWkl6ajBFQXdJd2N6RUwKTUFrR0ExVUVCaE1DVlZNeEV6QVJCZ05WQkFnVENrTmhiR2xtYjNKdWFXRXhGakFVQmdOVkJBY1REVk5oYmlCRwpjbUZ1WTJselkyOHhHVEFYQmdOVkJBb1RFRzl5WnpJdVpYaGhiWEJzWlM1amIyMHhIREFhQmdOVkJBTVRFMk5oCkxtOXlaekl1WlhoaGJYQnNaUzVqYjIwd0hoY05NVGt4TWpNd01ETXhOVEF3V2hjTk1qa3hNakkzTURNeE5UQXcKV2pCc01Rc3dDUVlEVlFRR0V3SlZVekVUTUJFR0ExVUVDQk1LUTJGc2FXWnZjbTVwWVRFV01CUUdBMVVFQnhNTgpVMkZ1SUVaeVlXNWphWE5qYnpFUE1BMEdBMVVFQ3hNR1kyeHBaVzUwTVI4d0hRWURWUVFEREJaQlpHMXBia0J2CmNtY3lMbVY0WVcxd2JHVXVZMjl0TUZrd0V3WUhLb1pJemowQ0FRWUlLb1pJemowREFRY0RRZ0FFT2EzS3hBVVAKQzJIWUlrTlB6akpKbDViY21CbEt6cnVFT2h3VmViWVBYcVdNMFVJRlR0all3U09XNGpiTDNDWUVuSzVBNGJNZwpOcHROTjEvWlJ2elRxNk5OTUVzd0RnWURWUjBQQVFIL0JBUURBZ2VBTUF3R0ExVWRFd0VCL3dRQ01BQXdLd1lEClZSMGpCQ1F3SW9BZyt6QTM3SnhCYzZVbXJtTk5UTG1nb09RRFFaMnFOSVNJQWpxMmp5MUJKSlF3Q2dZSUtvWkkKemowRUF3SURTQUF3UlFJaEFNMW42SDlFY01qb09hYkQ3WVJwQXUwOWp4NWcrMEd2c05qNmFZTElWQ2FUQWlBcApQYlJ6MFo1NFo1a0NDOHhpS0t3d3BNK05jNjhPanRBSWRsQmRmZkM2QlE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==",
"mspid":"Org2MSP"
},
"nonce":"d82/MXTjQoG1RPdyuPxM16TjThX1bJks"
},
"payload":{
"action":{
"endorsements":[
{
"endorser":"CgdPcmcxTVNQEqoGLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNLRENDQWMrZ0F3SUJBZ0lSQUpNRFJ4TG5FbUhSVEVKZXowcTVjT293Q2dZSUtvWkl6ajBFQXdJd2N6RUwKTUFrR0ExVUVCaE1DVlZNeEV6QVJCZ05WQkFnVENrTmhiR2xtYjNKdWFXRXhGakFVQmdOVkJBY1REVk5oYmlCRwpjbUZ1WTJselkyOHhHVEFYQmdOVkJBb1RFRzl5WnpFdVpYaGhiWEJzWlM1amIyMHhIREFhQmdOVkJBTVRFMk5oCkxtOXlaekV1WlhoaGJYQnNaUzVqYjIwd0hoY05NVGt4TWpNd01ETXhOVEF3V2hjTk1qa3hNakkzTURNeE5UQXcKV2pCcU1Rc3dDUVlEVlFRR0V3SlZVekVUTUJFR0ExVUVDQk1LUTJGc2FXWnZjbTVwWVRFV01CUUdBMVVFQnhNTgpVMkZ1SUVaeVlXNWphWE5qYnpFTk1Bc0dBMVVFQ3hNRWNHVmxjakVmTUIwR0ExVUVBeE1XY0dWbGNqQXViM0puCk1TNWxlR0Z0Y0d4bExtTnZiVEJaTUJNR0J5cUdTTTQ5QWdFR0NDcUdTTTQ5QXdFSEEwSUFCR3pYU2pSUUMxZGYKNGFlMHAvSloxNjBPamY2VmZiVHh6RlFOdklSdndKTS9ETnB2UG9qTkVNRGF1V2JPRkFhUjcxK2FMQnhZRkpLbAp0aVVhRGJFcFJ4S2pUVEJMTUE0R0ExVWREd0VCL3dRRUF3SUhnREFNQmdOVkhSTUJBZjhFQWpBQU1Dc0dBMVVkCkl3UWtNQ0tBSUlMaXJ6YzlhdlJ4dW96c3VLSFU2TmJsLzVROGN3alBoTmtxb0QzSTRmc1dNQW9HQ0NxR1NNNDkKQkFNQ0EwY0FNRVFDSUJKcmhNNmZSMXVod3VYbnJPeFVHSXNlVFBoSDZlY0lHbXhGcGRIM2ZhQmxBaUJ5MC9ydQp2NmliMWdqWjdVUzJOdi9tL2dySENCc0gwSEU4Mk5KSm12bnE4dz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
"signature":"MEQCIAzbyxlzFDyEy3y26mqFpQjUfUO+Bsn6nBYxKY2yMvs9AiAObJZgBGuc7LjQcX1o8QArdmLM90XMOJ5t9Id6bYFnDg=="
},
{
"endorser":"CgdPcmcyTVNQEqYGLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNKekNDQWM2Z0F3SUJBZ0lRU3FDL1p5U0lyalNkOW9mL2FlNVNsekFLQmdncWhrak9QUVFEQWpCek1Rc3cKQ1FZRFZRUUdFd0pWVXpFVE1CRUdBMVVFQ0JNS1EyRnNhV1p2Y201cFlURVdNQlFHQTFVRUJ4TU5VMkZ1SUVaeQpZVzVqYVhOamJ6RVpNQmNHQTFVRUNoTVFiM0puTWk1bGVHRnRjR3hsTG1OdmJURWNNQm9HQTFVRUF4TVRZMkV1CmIzSm5NaTVsZUdGdGNHeGxMbU52YlRBZUZ3MHhPVEV5TXpBd016RTFNREJhRncweU9URXlNamN3TXpFMU1EQmEKTUdveEN6QUpCZ05WQkFZVEFsVlRNUk13RVFZRFZRUUlFd3BEWVd4cFptOXlibWxoTVJZd0ZBWURWUVFIRXcxVApZVzRnUm5KaGJtTnBjMk52TVEwd0N3WURWUVFMRXdSd1pXVnlNUjh3SFFZRFZRUURFeFp3WldWeU1DNXZjbWN5CkxtVjRZVzF3YkdVdVkyOXRNRmt3RXdZSEtvWkl6ajBDQVFZSUtvWkl6ajBEQVFjRFFnQUVoZ2tsWlVZbFZKN08KLzlIUXBZSXcvaTdodVBOTU95ejdpT0dzaWFLYTg0K3lyOHo2TzBFdk53Q1p5MjFNOEVENnVUWDdCeHFRL3NDRgo1Z2x5QlgvTG02Tk5NRXN3RGdZRFZSMFBBUUgvQkFRREFnZUFNQXdHQTFVZEV3RUIvd1FDTUFBd0t3WURWUjBqCkJDUXdJb0FnK3pBMzdKeEJjNlVtcm1OTlRMbWdvT1FEUVoycU5JU0lBanEyankxQkpKUXdDZ1lJS29aSXpqMEUKQXdJRFJ3QXdSQUlnUk5FMEZQUTdmM243dWswRUUzQmlEbVE4c1BwdDVNV0taWWlUclJlRkdud0NJRExkVGxXMQptbU5SdkVkdGpIM0xiR0h3UGZndk9vRlBkTzBQU2FOU2haQnEKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=",
"signature":"MEQCIAOSCpKv3DWb0eWSxwzIt4Y0D9U2dwpgaHDmO6jfyUO4AiApnBZi2kn+z/B0/2S8IuoAJIYGJp+8zG8qwxHKm2/ypQ=="
}
],
"proposal_response_payload":{
"extension":{
"chaincode_id":{
"name":"mycc",
"path":"",
"version":"1.0"
},
"events":null,
"response":{
"message":"",
"payload":null,
"status":200
},
"results":{
"data_model":"KV",
"ns_rwset":[
{
"collection_hashed_rwset":[
],
"namespace":"lscc",
"rwset":{
"metadata_writes":[
],
"range_queries_info":[
],
"reads":[
{
"key":"mycc",
"version":{
"block_num":"3",
"tx_num":"0"
}
}
],
"writes":[
]
}
},
{
"collection_hashed_rwset":[
],
"namespace":"mycc",
"rwset":{
"metadata_writes":[
],
"range_queries_info":[
],
"reads":[
{
"key":"a",
"version":{
"block_num":"3",
"tx_num":"0"
}
},
{
"key":"b",
"version":{
"block_num":"3",
"tx_num":"0"
}
}
],
"writes":[
{
"is_delete":false,
"key":"a",
"value":"OTA="
},
{
"is_delete":false,
"key":"b",
"value":"MjEw"
}
]
}
}
]
},
"token_expectation":null
},
"proposal_hash":"VstSrCFTRwBOoJodpbhtQsJUIFDz5UYKZPq34BmY+lg="
}
},
"chaincode_proposal_payload":{
"TransientMap":{
},
"input":{
"chaincode_spec":{
"chaincode_id":{
"name":"mycc",
"path":"",
"version":""
},
"input":{
"args":[
"aW52b2tl",
"YQ==",
"Yg==",
"MTA="
],
"decorations":{
}
},
"timeout":0,
"type":"GOLANG"
}
}
}
}
}
]
},
"header":{
"channel_header":{
"channel_id":"mychannel",
"epoch":"0",
"extension":"EgYSBG15Y2M=",
"timestamp":"2019-12-30T03:21:19.734584800Z",
"tls_cert_hash":null,
"tx_id":"13eafcea37a6adfdfd2ac6522b35f32697a0334f8c8a74d11df73bbb9f9dc5b5",
"type":3,
"version":0
},
"signature_header":{
"creator":{
"id_bytes":"LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNLekNDQWRHZ0F3SUJBZ0lSQVB1TWdsMkJZbS9WMEhCVW1NMFRibVF3Q2dZSUtvWkl6ajBFQXdJd2N6RUwKTUFrR0ExVUVCaE1DVlZNeEV6QVJCZ05WQkFnVENrTmhiR2xtYjNKdWFXRXhGakFVQmdOVkJBY1REVk5oYmlCRwpjbUZ1WTJselkyOHhHVEFYQmdOVkJBb1RFRzl5WnpJdVpYaGhiWEJzWlM1amIyMHhIREFhQmdOVkJBTVRFMk5oCkxtOXlaekl1WlhoaGJYQnNaUzVqYjIwd0hoY05NVGt4TWpNd01ETXhOVEF3V2hjTk1qa3hNakkzTURNeE5UQXcKV2pCc01Rc3dDUVlEVlFRR0V3SlZVekVUTUJFR0ExVUVDQk1LUTJGc2FXWnZjbTVwWVRFV01CUUdBMVVFQnhNTgpVMkZ1SUVaeVlXNWphWE5qYnpFUE1BMEdBMVVFQ3hNR1kyeHBaVzUwTVI4d0hRWURWUVFEREJaQlpHMXBia0J2CmNtY3lMbVY0WVcxd2JHVXVZMjl0TUZrd0V3WUhLb1pJemowQ0FRWUlLb1pJemowREFRY0RRZ0FFT2EzS3hBVVAKQzJIWUlrTlB6akpKbDViY21CbEt6cnVFT2h3VmViWVBYcVdNMFVJRlR0all3U09XNGpiTDNDWUVuSzVBNGJNZwpOcHROTjEvWlJ2elRxNk5OTUVzd0RnWURWUjBQQVFIL0JBUURBZ2VBTUF3R0ExVWRFd0VCL3dRQ01BQXdLd1lEClZSMGpCQ1F3SW9BZyt6QTM3SnhCYzZVbXJtTk5UTG1nb09RRFFaMnFOSVNJQWpxMmp5MUJKSlF3Q2dZSUtvWkkKemowRUF3SURTQUF3UlFJaEFNMW42SDlFY01qb09hYkQ3WVJwQXUwOWp4NWcrMEd2c05qNmFZTElWQ2FUQWlBcApQYlJ6MFo1NFo1a0NDOHhpS0t3d3BNK05jNjhPanRBSWRsQmRmZkM2QlE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==",
"mspid":"Org2MSP"
},
"nonce":"d82/MXTjQoG1RPdyuPxM16TjThX1bJks"
}
}
},
"signature":"MEUCIQDXH7HH1+++Fw9Y/MLRHj4smpxBJpMlM8ZuIGAHK0kmXgIgaLFa9R8ajOnZUZDTGmLpxTs4sVwOiyjD5BZJB6JLBBY="
}
]
},
"header":{
"data_hash":"VN26ozBNLgcSnB16dBhtCRjW0MOYD1sLNCGBOBg9da0=",
"number":"4",
"previous_hash":"HyT2nn+22vfSmZILRLspLimV9ENLempiKRfdAhl0/q4="
},
"metadata":{
"metadata":[
"CgQKAggCEv0GCrIGCpUGCgpPcmRlcmVyTVNQEoYGLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNEVENDQWJPZ0F3SUJBZ0lSQUsvbnFuTHJYbTN5ODJvaWdHQUpKWlF3Q2dZSUtvWkl6ajBFQXdJd2FURUwKTUFrR0ExVUVCaE1DVlZNeEV6QVJCZ05WQkFnVENrTmhiR2xtYjNKdWFXRXhGakFVQmdOVkJBY1REVk5oYmlCRwpjbUZ1WTJselkyOHhGREFTQmdOVkJBb1RDMlY0WVcxd2JHVXVZMjl0TVJjd0ZRWURWUVFERXc1allTNWxlR0Z0CmNHeGxMbU52YlRBZUZ3MHhPVEV5TXpBd016RTFNREJhRncweU9URXlNamN3TXpFMU1EQmFNRmd4Q3pBSkJnTlYKQkFZVEFsVlRNUk13RVFZRFZRUUlFd3BEWVd4cFptOXlibWxoTVJZd0ZBWURWUVFIRXcxVFlXNGdSbkpoYm1OcApjMk52TVJ3d0dnWURWUVFERXhOdmNtUmxjbVZ5TG1WNFlXMXdiR1V1WTI5dE1Ga3dFd1lIS29aSXpqMENBUVlJCktvWkl6ajBEQVFjRFFnQUVyVHJiMjNjTXAzMlExTDV6UXR3d29lQk1Ia1lLOGN6bVdya2lFZUhveWVWNjM4aWkKQ3JEUGt4U1BoMDR3Z3RXOTV5d3oxT1hDSG5DYWw2VThoWm1odGFOTk1Fc3dEZ1lEVlIwUEFRSC9CQVFEQWdlQQpNQXdHQTFVZEV3RUIvd1FDTUFBd0t3WURWUjBqQkNRd0lvQWdYZXZxK3lld2p4dUhEWk10eVZEckNQMXNlTmxjCk0wSmFzSE5BZ3JBcUQvUXdDZ1lJS29aSXpqMEVBd0lEU0FBd1JRSWhBSVBDQjdJNThrZzJJNkJiaHVpU3FHbkYKVjFRZC9wZ2RGT1JiWUU3MSt3cGNBaUFTejhMdWpzU1l3d0FLb2lRRmF4a0dQNTJmOTBhTGtnTFdKRk1UMWs1eApGUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KEhj98u3PfmVFt5+7jIBwQeOJsXhb280QIQ8SRjBEAiAl5Q7dLotTv2/kmn3JXubtdJU52Ti4WJKynmNPgIpEpQIgeb499fxau3mYtPtMiwrsnbJxpSFqogz1zdDIHiZmcOg=",
"CgIIAg==",
"AA==",
""
]
}
}
加载全部内容