整体框架
所有的交易在节点间同步到每个节点的“Message Pool”中。经过“Expected Consensus”共识机制,当选为Leader的一个或者多个节点从“Message Pool”中挑选Message,并打包。被打包的区块,会同步给其他节点。打包的区块中的交易Message会被Filecoin虚拟机执行,更新各个Actor的状态。所有的区块数据,Actor的状态是通过IPFS/IPLD进行存储。
挖矿
// Mine implements the DefaultWorkers main mining function..
// The returned bool indicates if this miner created a new block or not.
func (w *DefaultWorker) Mine(ctx context.Context, base types.TipSet, nullBlkCount int, outCh chan<- Output) bool {
log.Info("Worker.Mine")
ctx = log.Start(ctx, "Worker.Mine")
defer log.Finish(ctx)
if len(base) == 0 {
log.Warning("Worker.Mine returning because it can't mine on an empty tipset")
outCh <- Output{Err: errors.New("bad input tipset with no blocks sent to Mine()")}
return false
}
st, err := w.getStateTree(ctx, base)
if err != nil {
log.Errorf("Worker.Mine couldn't get state tree for tipset: %s", err.Error())
outCh <- Output{Err: err}
return false
}
log.Debugf("Mining on tipset: %s, with %d null blocks.", base.String(), nullBlkCount)
if ctx.Err() != nil {
log.Warningf("Worker.Mine returning with ctx error %s", ctx.Err().Error())
return false
}
//生成随机挑战数,用上一个Tipset的最小tickets和空块数进行HASH
challenge, err := consensus.CreateChallengeSeed(base, uint64(nullBlkCount))
if err != nil {
outCh <- Output{Err: err}
return false
}
//根据随机挑战数生成时空证明
prCh := createProof(challenge, w.createPoSTFunc)
var proof proofs.PoStProof
var ticket []byte
select {
case <-ctx.Done():
log.Infof("Mining run on base %s with %d null blocks canceled.", base.String(), nullBlkCount)
return false
case prChRead, more := <-prCh:
if !more {
log.Errorf("Worker.Mine got zero value from channel prChRead")
return false
}
copy(proof[:], prChRead[:])
//时空证明成功,生成ticket
ticket, err = consensus.CreateTicket(proof, w.minerPubKey, w.workerSigner)
if err != nil {
log.Errorf("failed to create ticket: %s", err)
return false
}
}
// TODO: Test the interplay of isWinningTicket() and createPoSTFunc()
// https://github.com/filecoin-project/go-filecoin/issues/1791
//判断是否是中奖区块
weHaveAWinner, err := consensus.IsWinningTicket(ctx, w.blockstore, w.powerTable, st, ticket, w.minerAddr)
if err != nil {
log.Errorf("Worker.Mine couldn't compute ticket: %s", err.Error())
outCh <- Output{Err: err}
return false
}
if weHaveAWinner {
//打包生成区块
next, err := w.Generate(ctx, base, ticket, proof, uint64(nullBlkCount))
if err == nil {
log.SetTag(ctx, "block", next)
log.Debugf("Worker.Mine generates new winning block! %s", next.Cid().String())
}
outCh <- NewOutput(next, err)
return true
}
return false
}
生成时空证明
// TODO: Actually use the results of the PoST once it is implemented.
// Currently createProof just passes the challenge seed through.
func createProof(challengeSeed proofs.PoStChallengeSeed, createPoST DoSomeWorkFunc) <-chan proofs.PoStChallengeSeed {
c := make(chan proofs.PoStChallengeSeed)
go func() {
// TODO send new PoST on channel once we can create it
// https://github.com/filecoin-project/go-filecoin/issues/1791
createPoST()
c <- challengeSeed
}()
return c
}
处理挖矿进程输出的block
func (node *Node) handleNewMiningOutput(miningOutCh <-chan mining.Output) {
defer func() {
node.miningDoneWg.Done()
}()
for {
select {
case <-node.miningCtx.Done():
return
case output, ok := <-miningOutCh:
if !ok {
return
}
if output.Err != nil {
log.Errorf("stopping mining. error: %s", output.Err.Error())
node.StopMining(context.Background())
} else {
node.miningDoneWg.Add(1)
go func() {
if node.IsMining() {
node.AddNewlyMinedBlock(node.miningCtx, output.NewBlock)
}
node.miningDoneWg.Done()
}()
}
}
}
}
共识机制
Filecoin的共识算法叫Expected Consensus,简称EC共识机制。Expected Consensus实现的相关代码在consensus目录。
// Expected implements expected consensus.
type Expected struct {
// PwrTableView provides miner and total power for the EC chain weight
// computation.
PwrTableView PowerTableView
// cstore is used for loading state trees during message running.
cstore *hamt.CborIpldStore
// bstore contains data referenced by actors within the state
// during message running. Additionally bstore is used for
// accessing the power table.
bstore blockstore.Blockstore
// processor is what we use to process messages and pay rewards
processor Processor
genesisCid cid.Cid
verifier proofs.Verifier
}
除了区块链数据外,Expected Consensus每一轮会生成一个Ticket,每个节点通过一定的计算,确定是否是该轮的Leader。如果选为Leader,节点可以打包区块。也就是说,每一轮可能没有Leader(所有节点都不符合Leader的条件),或者多个Leader(有多个节点符合Leader)。Filecoin使用TipSet的概念,表明一轮中多个Leader产生的指向同一个父亲区块的区块集合。每个矿工在一轮round 中只能提交一个block。
// CreateTicket computes a valid ticket.
// params: proof []byte, the proof to sign
// signerPubKey []byte, the public key for the signer. Must exist in the signer
// signer, implements TicketSigner interface. Must have signerPubKey in its keyinfo.
// returns: types.Signature ( []byte ), error
func CreateTicket(proof proofs.PoStProof, signerPubKey []byte, signer TicketSigner) (types.Signature, error) {
var ticket types.Signature
signerAddr, err := signer.GetAddressForPubKey(signerPubKey)
if err != nil {
return ticket, errors.Wrap(err, "could not get address for signerPubKey")
}
buf := append(proof[:], signerAddr.Bytes()...)
// Don't hash it here; it gets hashed in walletutil.Sign
return signer.SignBytes(buf[:], signerAddr)
}
Leader的选择:
在每个Ticket生成的基础上,进行Leader的选择,具体查看consensus/expected.go中的IsWinningTicket函数。也就是说,如果Ticket的数值小于当前节点的有效存储的比例,该节点在该轮就是Leader。
// IsWinningTicket fetches miner power & total power, returns true if it's a winning ticket, false if not,
// errors out if minerPower or totalPower can't be found.
// See https://github.com/filecoin-project/aq/issues/70 for an explanation of the math here.
func IsWinningTicket(ctx context.Context, bs blockstore.Blockstore, ptv PowerTableView, st state.Tree,
ticket types.Signature, miner address.Address) (bool, error) {
totalPower, err := ptv.Total(ctx, st, bs)
if err != nil {
return false, errors.Wrap(err, "Couldn't get totalPower")
}
minerPower, err := ptv.Miner(ctx, st, bs, miner)
if err != nil {
return false, errors.Wrap(err, "Couldn't get minerPower")
}
return CompareTicketPower(ticket, minerPower, totalPower), nil
}
时空证明
矿工向链上CommitSector,然后会为这些数据提供存储证明。
// CommitSector adds a commitment to the specified sector. The sector must not
// already be committed.
func (ma *Actor) CommitSector(ctx exec.VMContext, sectorID uint64, commD, commR, commRStar, proof []byte) (uint8, error) {
if err := ctx.Charge(actor.DefaultGasCost); err != nil {
return exec.ErrInsufficientGas, errors.RevertErrorWrap(err, "Insufficient gas")
}
…………
req := proofs.VerifySealRequest{}
copy(req.CommD[:], commD)
copy(req.CommR[:], commR)
copy(req.CommRStar[:], commRStar)
copy(req.Proof[:], proof)
req.ProverID = sectorbuilder.AddressToProverID(ctx.Message().To)
req.SectorID = sectorbuilder.SectorIDToBytes(sectorID)
req.StoreType = sectorStoreType
}
// TODO: use uint64 instead of this abomination, once refmt is fixed
// https://github.com/polydawn/refmt/issues/35
sectorIDstr := strconv.FormatUint(sectorID, 10)
return 0, nil
}
生成时空证明
每个proving period,矿工调用generatepost,产生一个紧凑的时空证明,然后通过submitpost提交给链上。
// generatePoSt creates the required PoSt, given a list of sector ids and
// matching seeds. It returns the Snark Proof for the PoSt, and a list of
// sectors that faulted, if there were any faults.
func (sm *Miner) generatePoSt(commRs []proofs.CommR, seed proofs.PoStChallengeSeed) ([]proofs.PoStProof, []uint64, error) {
req := sectorbuilder.GeneratePoStRequest{
CommRs: commRs,
ChallengeSeed: seed,
}
res, err := sm.node.SectorBuilder().GeneratePoSt(req)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to generate PoSt")
}
return res.Proofs, res.Faults, nil
}
提交时空证明
证明期是一个时间窗口,在该窗口中,矿工必须生成时空证明。如果矿工在验证期间未收到区块中包含的“submitPost”消息,则会受到处罚。有两种不同的时间提交。标准提交:标准提交是指在验证期结束前将其链接起来的提交。合理设置所需的时间长度,以便在验证期的实际结束之间有一个宽限期,从而最小化网络拥塞对典型矿工操作的影响。惩罚性提交:惩罚性提交是指在证明期结束后,但在生成攻击阈值之前的链上提交,视为有效的后提交,但矿工必须因为迟提交而支付罚款。
func (sm *Miner) submitPoSt(start, end *types.BlockHeight, seed proofs.PoStChallengeSeed, inputs []generatePostInput) {
commRs := make([]proofs.CommR, len(inputs))
for i, input := range inputs {
commRs[i] = input.commR
}
proofs, faults, err := sm.generatePoSt(commRs, seed)
if err != nil {
log.Errorf("failed to generate PoSts: %s", err)
return
}
if len(faults) != 0 {
log.Warningf("some faults when generating PoSt: %v", faults)
// TODO: proper fault handling
}
height, err := sm.node.BlockHeight()
if err != nil {
log.Errorf("failed to submit PoSt, as the current block height can not be determined: %s", err)
// TODO: what should happen in this case?
return
}
if height.LessThan(start) {
// TODO: what to do here? not sure this can happen, maybe through reordering?
log.Errorf("PoSt generation time took negative block time: %s < %s", height, start)
return
}
if height.GreaterEqual(end) {
// TODO: we are too late, figure out faults and decide if we want to still submit
log.Errorf("PoSt generation was too slow height=%s end=%s", height, end)
return
}
// TODO: figure out a more sensible timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
// TODO: algorithmically determine appropriate values for these
gasPrice := types.NewGasPrice(submitPostGasPrice)
gasLimit := types.NewGasUnits(submitPostGasLimit)
_, err = sm.porcelainAPI.MessageSend(ctx, sm.minerOwnerAddr, sm.minerAddr, types.ZeroAttoFIL, gasPrice, gasLimit, "submitPoSt", proofs)
if err != nil {
log.Errorf("failed to submit PoSt: %s", err)
return
}
log.Debug("submitted PoSt")
}
区块同步
接收区块处理进程
func (node *Node) processBlock(ctx context.Context, pubSubMsg pubsub.Message) (err error) {
// ignore messages from ourself
if pubSubMsg.GetFrom() == node.Host().ID() {
return nil
}
blk, err := types.DecodeBlock(pubSubMsg.GetData())
if err != nil {
return errors.Wrap(err, "got bad block data")
}
log.Infof("Received new block from network cid: %s", blk.Cid().String())
log.Debugf("Received new block from network: %s", blk)
err = node.Syncer.HandleNewBlocks(ctx, []cid.Cid{blk.Cid()})
if err != nil {
return errors.Wrap(err, "processing block from network")
}
return nil
}
CollectChain是代码的入口点与网络的交互,它不会向主链添加tipsets。解析head tipset 的cids,直到它解析存储中包含parent tipset。返回新的未完全验证的tipset链和parent tipset的id,同步到存储区中。
func (syncer *DefaultSyncer) collectChain(ctx context.Context, blkCids []cid.Cid) ([]types.TipSet, error) {
var chain []types.TipSet
defer logSyncer.Info("chain synced")
for {
var blks []*types.Block
// check the cache for bad tipsets before doing anything
tsKey := types.NewSortedCidSet(blkCids...).String()
// Finish traversal if the tipset made is tracked in the store.
if syncer.chainStore.HasTipSetAndState(ctx, tsKey) {
return chain, nil
}
logSyncer.Debugf("CollectChain next link: %s", tsKey)
if syncer.badTipSets.Has(tsKey) {
return nil, ErrChainHasBadTipSet
}
blks, err := syncer.getBlksMaybeFromNet(ctx, blkCids)
if err != nil {
return nil, err
}
ts, err := syncer.consensus.NewValidTipSet(ctx, blks)
if err != nil {
syncer.badTipSets.Add(tsKey)
syncer.badTipSets.AddChain(chain)
return nil, err
}
height, _ := ts.Height()
if len(chain)%500 == 0 {
logSyncer.Infof("syncing the chain, currently at block height %d", height)
}
// Update values to traverse next tipset
chain = append([]types.TipSet{ts}, chain...)
parentCidSet, err := ts.Parents()
if err != nil {
return nil, err
}
blkCids = parentCidSet.ToSlice()
}
}
func (c *Expected) validateBlockStructure(ctx context.Context, b *types.Block) error {
// TODO: validate signature on block
ctx = log.Start(ctx, "Expected.validateBlockStructure")
log.LogKV(ctx, "ValidateBlockStructure", b.Cid().String())
if !b.StateRoot.Defined() {
return fmt.Errorf("block has nil StateRoot")
}
return nil
}
SyncOne计算tipset的父状态,调用EC共识以运行状态转换以验证tipset。如果输入tipset有效,调用共识来检查其权重,如果这个tipset的权重是最重的,就更新区块头。Filecoin采用EC共识机制,通过TipSet的Weight确定主链。
// syncOne syncs a single tipset with the chain store. syncOne calculates the
// parent state of the tipset and calls into consensus to run a state transition
// in order to validate the tipset. In the case the input tipset is valid,
// syncOne calls into consensus to check its weight, and then updates the head
// of the store if this tipset is the heaviest.
//
// Precondition: the caller of syncOne must hold the syncer's lock (syncer.mu) to
// ensure head is not modified by another goroutine during run.
func (syncer *DefaultSyncer) syncOne(ctx context.Context, parent, next types.TipSet) error {
// Lookup parent state. It is guaranteed by the syncer that it is in
// the chainStore.
……
//验证tipset状态,给store添加新的状态
err = syncer.chainStore.PutTipSetAndState(ctx, &TipSetAndState{
TipSet: next,
TipSetStateRoot: root,
})
if err != nil {
return err
}
logSyncer.Debugf("Successfully updated store with %s", next.String())
// TipSet is validated and added to store, now check if it is the heaviest.
// If it is the heaviest update the chainStore.
nextParentSt, err := syncer.tipSetState(ctx, parent.String()) // call again to get a copy
if err != nil {
return err
}
headParentCids, err := syncer.chainStore.Head().Parents()
if err != nil {
return err
}
var headParentSt state.Tree
if headParentCids.Len() != 0 { // head is not genesis
headParentSt, err = syncer.tipSetState(ctx, headParentCids.String())
if err != nil {
return err
}
}
heavier, err := syncer.consensus.IsHeavier(ctx, next, syncer.chainStore.Head(), nextParentSt, headParentSt)
if err != nil {
return err
}
if heavier {
// Gather the entire new chain for reorg comparison.
// See Issue #2151 for making this scalable.
newChain, err := CollectTipSetsOfHeightAtLeast(ctx, syncer.chainStore.BlockHistory(ctx, parent), types.NewBlockHeight(uint64(0)))
if err != nil {
return err
}
newChain = append(newChain, next)
if IsReorg(syncer.chainStore.Head(), newChain) {
logSyncer.Infof("reorg occurring while switching from %s to %s", syncer.chainStore.Head().String(), next.String())
}
if err = syncer.chainStore.SetHead(ctx, next); err != nil {
return err
}
}
return nil
}
区块验证
// validateMining checks validity of the block ticket, proof, and miner address.
// Returns an error if:
// * any tipset's block was mined by an invalid miner address.
// * the block proof is invalid for the challenge
// * the block ticket fails the power check, i.e. is not a winning ticket
// Returns nil if all the above checks pass.
// See https://github.com/filecoin-project/specs/blob/master/mining.md#chain-validation
func (c *Expected) validateMining(ctx context.Context, st state.Tree, ts types.TipSet, parentTs types.TipSet) error {
for _, blk := range ts.ToSlice() {
// TODO: Also need to validate BlockSig
// TODO: Once we've picked a delay function (see #2119), we need to
// verify its proof here. The proof will likely be written to a field on
// the mined block.
// See https://github.com/filecoin-project/specs/blob/master/mining.md#ticket-checking
result, err := IsWinningTicket(ctx, c.bstore, c.PwrTableView, st, blk.Ticket, blk.Miner)
if err != nil {
return errors.Wrap(err, "can't check for winning ticket")
}
if !result {
return errors.New("not a winning ticket")
}
}
return nil
}
每个区块的Weight的计算公式:
Weight = ParentWeight + ECV + ECPrM * ratio
目前,ECV设置为10, ECPrM设置为100,Ratio是当前节点的存储有效率(节点存储的容量/所有节点的存储容量)。在目前的算法下节点的ratio高,Weight就高。
一个TipSet的Weight等于TipSet中所有区块的Weight的总和。Weight大的TipSet认为是主链。当两个TipSet的Weight一样大的时候,取Ticket较小者。
// Weight returns the EC weight of this TipSet in uint64 encoded fixed point
// representation.
func (c *Expected) Weight(ctx context.Context, ts types.TipSet, pSt state.Tree) (uint64, error) {
ctx = log.Start(ctx, "Expected.Weight")
log.LogKV(ctx, "Weight", ts.String())
if len(ts) == 1 && ts.ToSlice()[0].Cid().Equals(c.genesisCid) {
return uint64(0), nil
}
// Compute parent weight.
parentW, err := ts.ParentWeight()
if err != nil {
return uint64(0), err
}
w, err := types.FixedToBig(parentW)
if err != nil {
return uint64(0), err
}
// Each block in the tipset adds ECV + ECPrm * miner_power to parent weight.
totalBytes, err := c.PwrTableView.Total(ctx, pSt, c.bstore)
if err != nil {
return uint64(0), err
}
floatTotalBytes := new(big.Float).SetInt64(int64(totalBytes))
floatECV := new(big.Float).SetInt64(int64(ECV))
floatECPrM := new(big.Float).SetInt64(int64(ECPrM))
for _, blk := range ts.ToSlice() {
minerBytes, err := c.PwrTableView.Miner(ctx, pSt, c.bstore, blk.Miner)
if err != nil {
return uint64(0), err
}
floatOwnBytes := new(big.Float).SetInt64(int64(minerBytes))
wBlk := new(big.Float)
wBlk.Quo(floatOwnBytes, floatTotalBytes)
wBlk.Mul(wBlk, floatECPrM) // Power addition
wBlk.Add(wBlk, floatECV) // Constant addition
w.Add(w, wBlk)
}
return types.BigToFixed(w)
}
存储协议流程
Filecoin实现三个上层协议:Hello,Storage(存储协议)以及Retrieval(检索协议)。
Miner是存储矿工,通过createMiner创建Miner Actor。
// CreateMiner creates a new miner with the a pledge of the given amount of sectors. The
// miners collateral is set by the value in the message.
func (sma *Actor) CreateMiner(vmctx exec.VMContext, pledge *big.Int, publicKey []byte, pid peer.ID) (address.Address, uint8, error) {
if err := vmctx.Charge(actor.DefaultGasCost); err != nil {
return address.Undef, exec.ErrInsufficientGas, errors.RevertErrorWrap(err, "Insufficient gas")
}
var state State
ret, err := actor.WithState(vmctx, &state, func() (interface{}, error) {
if pledge.Cmp(MinimumPledge) < 0 {
// TODO This should probably return a non-zero exit code instead of an error.
return nil, Errors[ErrPledgeTooLow]
}
addr, err := vmctx.AddressForNewActor()
minerInitializationParams := miner.NewState(vmctx.Message().From, publicKey, pledge, pid, vmctx.Message().Value)
actorCodeCid := types.MinerActorCodeCid
if vmctx.BlockHeight().Equal(types.NewBlockHeight(0)) {
actorCodeCid = types.BootstrapMinerActorCodeCid
}
if err := vmctx.CreateNewActor(addr, actorCodeCid, minerInitializationParams); err != nil {
return nil, err
}
_, _, err = vmctx.Send(addr, "", vmctx.Message().Value, nil)
if err != nil {
return nil, err
}
}
使用Miner Actor的addAsk提供存储服务。存储需求方,也就是Client。
// AddAsk adds an ask to this miners ask list
func (ma *Actor) AddAsk(ctx exec.VMContext, price *types.AttoFIL, expiry *big.Int) (*big.Int, uint8,
error) {
if err := ctx.Charge(actor.DefaultGasCost); err != nil {
return nil, exec.ErrInsufficientGas, errors.RevertErrorWrap(err, "Insufficient gas")
}
var state State
out, err := actor.WithState(ctx, &state, func() (interface{}, error) {
if ctx.Message().From != state.Owner {
return nil, Errors[ErrCallerUnauthorized]
}
id := big.NewInt(0).Set(state.NextAskID)
state.NextAskID = state.NextAskID.Add(state.NextAskID, big.NewInt(1))
// filter out expired asks
asks := state.Asks
state.Asks = state.Asks[:0]
for _, a := range asks {
if ctx.BlockHeight().LessThan(a.Expiry) {
state.Asks = append(state.Asks, a)
}
}
if !expiry.IsUint64() {
return nil, errors.NewRevertError("expiry was invalid")
}
expiryBH := types.NewBlockHeight(expiry.Uint64())
state.Asks = append(state.Asks, &Ask{
Price: price,
Expiry: ctx.BlockHeight().Add(expiryBH),
ID: id,
})
return id, nil
})
if err != nil {
return nil, errors.CodeError(err), err
}
askID, ok := out.(*big.Int)
if !ok {
return nil, 1, errors.NewRevertErrorf("expected an Integer return value from call, but got %T instead", out)
}
return askID, 0, nil
}
通过getAsks获取所有Miner Actor的存储服务,并在这些服务中确定相应的存储矿工。
// GetAsks returns all the asks for this miner. (TODO: this isnt a great function signature, it returns the asks in a
// serialized array. Consider doing this some other way)
func (ma *Actor) GetAsks(ctx exec.VMContext) ([]uint64, uint8, error) {
if err := ctx.Charge(actor.DefaultGasCost); err != nil {
return nil, exec.ErrInsufficientGas, errors.RevertErrorWrap(err, "Insufficient gas")
}
var state State
out, err := actor.WithState(ctx, &state, func() (interface{}, error) {
var askids []uint64
for _, ask := range state.Asks {
if !ask.ID.IsUint64() {
return nil, errors.NewFaultErrorf("miner ask has invalid ID (bad invariant)")
}
askids = append(askids, ask.ID.Uint64())
}
return askids, nil
})
if err != nil {
return nil, errors.CodeError(err), err
}
askids, ok := out.([]uint64)
if !ok {
return nil, 1, errors.NewRevertErrorf("expected a []uint64 return value from call, but got %T instead", out)
}
return askids, 0, nil
}
在确定存储矿工的基础上,使用createChannel创建支付通道,并和存储矿工进行数据的传输。存储矿工在存储数据后,定期向Miner Actor报告存储证明(submitPoSt)。
// CreateChannel creates a new payment channel from the caller to the target.
// The value attached to the invocation is used as the deposit, and the channel
// will expire and return all of its money to the owner after the given block height.
func (pb *Actor) CreateChannel(vmctx exec.VMContext, target address.Address, eol *types.BlockHeight) (*types.ChannelID, uint8, error) {
if err := vmctx.Charge(actor.DefaultGasCost); err != nil {
return nil, exec.ErrInsufficientGas, errors.RevertErrorWrap(err, "Insufficient gas")
}
…………
return channelID, 0, nil
}
存储矿工通过FPS(Filecon Proving Subsystem)实现数据的存储以及存储证明:SectorBase提供存储接口,Storage Proof提供PoRep以及PoSt的存储证明。