Filecoin – Sector状态管理逻辑

Sector的状态管理基于状态机。通用状态机的实现是通过go-statemachine实现。状态的存储通过go-statestore实现。在这些模块的基础上,storage-fsm实现了Sector的状态定义以及状态处理函数。

好久不看go语言的代码了,最近有空换换脑子,看看Sector的状态管理,纯go语言实现。看看大型项目的代码设计,对代码以及项目的设计开发有莫大的好处。Lotus的go语言部分的最新的代码,采用模块化设计。Lotus的代码也是一步步演进的,第一版的代码也是所有的逻辑耦合在一起,后面才逐步的模块化。 Lotus Miner将用户需要存储的数据“打包”成一个个Sector,并对Sector进行处理。本文就讲讲Sector的状态管理。 ## 01 模块框架 Sector状态管理相关的模块如下: ![image.png](https://img.learnblockchain.cn/attachments/2020/06/6eoWVZS85ee842c35da75.png!/scale/50) Sector的状态管理基于状态机。Lotus实现了通用的状态机(statemachine),在go-statemachine包中。在StateMachine之上,进一步抽象出了StateGroup,管理多个状态机。StateMachine只是实现状态的转变,具体状态是通过go-statestore进行存储。在StateMachine之上,定义状态转变的规则以及状态对应的处理函数,这些就是在具体的业务。SectorState就是Lotus管理Sector的具体业务。在这些底层模块的基础上,Lotus的相关代码调用就比较简单。先从状态管理的底层模块,StateMachine讲起: ## 02 StateMachine StateMachine定义在go-statemachine包中,machine.go: ``` type StateMachine struct { planner Planner eventsIn chan Event name interface{} st *statestore.StoredState stateType reflect.Type stageDone chan struct{} closing chan struct{} closed chan struct{} busy int32 } ``` 其中,planner是抽象出来的状态机的状态转化函数。Planner接收Event,结合当前的状态user,确定下一步的处理。 `type Planner func(events []Event, user interface{}) (interface{}, uint64, error)` st是存储的状态。stateType是存储状态的类型。 StateMachine的核心是run函数,分成三部分:接收Event,状态处理,下一步的调用。其中状态处理是关键: ``` err := fsm.mutateUser(func(user interface{}) (err error) { nextStep, processed, err = fsm.planner(pendingEvents, user) ustate = user if xerrors.Is(err, ErrTerminated) { terminated = true return nil } return err }) ``` mutateUser就是查看当前存储的状态(StoredState),并执行planner函数,并将planner处理后的状态存储。planner函数返回下一步的处理,已经处理的Event 的个数,有可能的出错。run函数会启动另外一个go routine执行nextStep。 ![](https://img.learnblockchain.cn/2020/06/16_/723991229.png!/scale/50) mutateUser具体的实现是利用go的反射(reflect),实现抽象和模块化。相关的逻辑感兴趣的小伙伴可以自己查看。 ## 03 StateGroup 往往业务需要很多StateMachine。举个例子,Lotus的Miner存储多个Sector,每个Sector都由一个StateMachine维护独立的状态。StateGroup就是实现多个“同样”的StateMachine,定义在group.go中。 ``` type StateGroup struct { sts *statestore.StateStore hnd StateHandler stateType reflect.Type closing chan struct{} initNotifier sync.Once lk sync.Mutex sms map[datastore.Key]*StateMachine } ``` 其中,sms就是StateMachine的状态数组。StateHandler是StateMachine的状态处理函数接口: ``` type StateHandler interface { Plan(events []Event, user interface{}) (interface{}, uint64, error) } ``` 也就是说,在StateMachine状态机上,需要实现StateHandler接口(Plan函数,实现状态的转换)。 ## 04 StoredState StoredState是抽象的Key-Value的存储。相对比较简单,Get/Put接口,非常容易理解。 ## 05 SectorState storage-fsm实现了和Sector状态相关的业务逻辑。也就是状态的定义,状态的转换函数都是在这个包里实现。整个Sector的信息定义在storage-fsm/types.go中: ``` type SectorInfo struct { State SectorState SectorNumber abi.SectorNumber Nonce uint64 SectorType abi.RegisteredProof // Packing Pieces []Piece // PreCommit1 TicketValue abi.SealRandomness TicketEpoch abi.ChainEpoch PreCommit1Out storage.PreCommit1Out // PreCommit2 CommD *cid.Cid CommR *cid.Cid Proof []byte PreCommitMessage *cid.Cid // WaitSeed SeedValue abi.InteractiveSealRandomness SeedEpoch abi.ChainEpoch // Committing CommitMessage *cid.Cid InvalidProofs uint64 // Faults FaultReportMsg *cid.Cid // Debug LastErr string Log []Log } ``` SectorInfo包括了Sector状态,Precommit1/2的数据,Committing的数据等等。其中,SectorState描述了Sector的具体状态。Sector的所有的状态定义在sector_state.go文件中: ``` const ( UndefinedSectorState SectorState = "" // happy path Empty SectorState = "Empty" Packing SectorState = "Packing" // sector not in sealStore, and not on chain PreCommit1 SectorState = "PreCommit1" // do PreCommit1 PreCommit2 SectorState = "PreCommit2" // do PreCommit1 PreCommitting SectorState = "PreCommitting" // on chain pre-commit WaitSeed SectorState = "WaitSeed" // waiting for seed Committing SectorState = "Committing" CommitWait SectorState = "CommitWait" // waiting for message to land on chain FinalizeSector SectorState = "FinalizeSector" Proving SectorState = "Proving" // error modes FailedUnrecoverable SectorState = "FailedUnrecoverable" SealFailed SectorState = "SealFailed" PreCommitFailed SectorState = "PreCommitFailed" ComputeProofFailed SectorState = "ComputeProofFailed" CommitFailed SectorState = "CommitFailed" PackingFailed SectorState = "PackingFailed" Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain FaultedFinal SectorState = "FaultedFinal" // fault declared on chain ) ``` 熟悉SDR算法的小伙伴,会发现很多熟悉的字眼:PreCommit1,PreCommit2,Commiting等等。结合状态处理函数,就能很清楚各个状态的含义以及需要处理的内容。 fsm.go中Sealing结构的Plan函数是Sector状态的处理函数: ``` func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) { next, err := m.plan(events, user.(*SectorInfo)) if err != nil || next == nil { return nil, uint64(len(events)), err } return func(ctx statemachine.Context, si SectorInfo) error { err := next(ctx, si) if err != nil { log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err) return nil } return nil }, uint64(len(events)), nil // TODO: This processed event count is not very correct } ``` Plan函数的实现也比较简单,调用plan处理当前的状态,并返回接下来需要处理的函数。状态的处理又可以分成两部分来看:1/ 状态转换的定义 2/ 状态的处理。 针对Sector的状态转换的定义,在fsmPlanners中: ``` var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{ UndefinedSectorState: planOne(on(SectorStart{}, Packing)), Packing: planOne(on(SectorPacked{}, PreCommit1)), PreCommit1: planOne( on(SectorPreCommit1{}, PreCommit2), on(SectorSealPreCommitFailed{}, SealFailed), on(SectorPackingFailed{}, PackingFailed), ), PreCommit2: planOne( on(SectorPreCommit2{}, PreCommitting), on(SectorSealPreCommitFailed{}, SealFailed), on(SectorPackingFailed{}, PackingFailed), ), ... ``` 比如说,在PreCommit1的状态,接收到SectorPreCommit1事件,将进入PreCommit2状态。所有Sector的状态转换如下图: ![](https://img.learnblockchain.cn/2020/06/16_/365654219.png!/scale/50) 在处理好状态好,就进入相应状态的处理程序。以PreCommit1状态为例,相应的处理函数是handlePreCommit1。 ``` func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error { tok, epoch, err := m.api.ChainHead(ctx.Context()) ... pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos()) if err != nil { return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)}) } ... } ``` 很清楚看出,在PreCommit1的处理函数中,会通过SealPrecommit1调用rust-fil-proofs实现Precommit1的计算。最后总结一下,各个状态的含义: **Empty** - 空状态 **Packing** - 打包状态,多个Piece填充到一个Sector中 **PreCommit1** - PreCommit1计算 **PreCommit2** - PreCommit2计算 **PreCommitting** - 提交Precommit2的结果到链上 **WaitSeed** - 等待随机种子(给定10个区块的时间,让随机数种子不可提前预测) **Committing** - 计算Commit1/Commit2,并将证明提交到链上 **CommitWait** - 等待链上确认 **FinalizeSector** - Sector状态确定 ## **总结:** Sector的状态管理基于状态机。通用状态机的实现是通过go-statemachine实现。状态的存储通过go-statestore实现。在这些模块的基础上,storage-fsm实现了Sector的状态定义以及状态处理函数。 --- 我的公众号**星想法**有很多原创高质量文章,欢迎大家扫码关注。 ![公众号-星想法](https://img.learnblockchain.cn/2019/15572190575887.jpg!/scale/20)

好久不看go语言的代码了,最近有空换换脑子,看看Sector的状态管理,纯go语言实现。看看大型项目的代码设计,对代码以及项目的设计开发有莫大的好处。Lotus的go语言部分的最新的代码,采用模块化设计。Lotus的代码也是一步步演进的,第一版的代码也是所有的逻辑耦合在一起,后面才逐步的模块化。

Lotus Miner将用户需要存储的数据“打包”成一个个Sector,并对Sector进行处理。本文就讲讲Sector的状态管理。

01 模块框架

Sector状态管理相关的模块如下:

Sector的状态管理基于状态机。Lotus实现了通用的状态机(statemachine),在go-statemachine包中。在StateMachine之上,进一步抽象出了StateGroup,管理多个状态机。StateMachine只是实现状态的转变,具体状态是通过go-statestore进行存储。在StateMachine之上,定义状态转变的规则以及状态对应的处理函数,这些就是在具体的业务。SectorState就是Lotus管理Sector的具体业务。在这些底层模块的基础上,Lotus的相关代码调用就比较简单。先从状态管理的底层模块,StateMachine讲起:

02 StateMachine

StateMachine定义在go-statemachine包中,machine.go:

 type StateMachine struct {
 planner  Planner
 eventsIn chan Event

 name      interface{}
 st        *statestore.StoredState
 stateType reflect.Type

 stageDone chan struct{}
 closing   chan struct{}
 closed    chan struct{}

 busy int32
 }

其中,planner是抽象出来的状态机的状态转化函数。Planner接收Event,结合当前的状态user,确定下一步的处理。

type Planner func(events []Event, user interface{}) (interface{}, uint64, error)

st是存储的状态。stateType是存储状态的类型。

StateMachine的核心是run函数,分成三部分:接收Event,状态处理,下一步的调用。其中状态处理是关键:

err := fsm.mutateUser(func(user interface{}) (err error) {
 nextStep, processed, err = fsm.planner(pendingEvents, user)
 ustate = user
 if xerrors.Is(err, ErrTerminated) {
 terminated = true
 return nil
 }
 return err
})

mutateUser就是查看当前存储的状态(StoredState),并执行planner函数,并将planner处理后的状态存储。planner函数返回下一步的处理,已经处理的Event 的个数,有可能的出错。run函数会启动另外一个go routine执行nextStep。

mutateUser具体的实现是利用go的反射(reflect),实现抽象和模块化。相关的逻辑感兴趣的小伙伴可以自己查看。

03 StateGroup

往往业务需要很多StateMachine。举个例子,Lotus的Miner存储多个Sector,每个Sector都由一个StateMachine维护独立的状态。StateGroup就是实现多个“同样”的StateMachine,定义在group.go中。

 type StateGroup struct {
 sts       *statestore.StateStore
 hnd       StateHandler
 stateType reflect.Type

 closing      chan struct{}
 initNotifier sync.Once

 lk  sync.Mutex
 sms map[datastore.Key]*StateMachine
 }

其中,sms就是StateMachine的状态数组。StateHandler是StateMachine的状态处理函数接口:

 type StateHandler interface {
 Plan(events []Event, user interface{}) (interface{}, uint64, error)
 }

也就是说,在StateMachine状态机上,需要实现StateHandler接口(Plan函数,实现状态的转换)。

04 StoredState

StoredState是抽象的Key-Value的存储。相对比较简单,Get/Put接口,非常容易理解。

05 SectorState

storage-fsm实现了和Sector状态相关的业务逻辑。也就是状态的定义,状态的转换函数都是在这个包里实现。整个Sector的信息定义在storage-fsm/types.go中:

 type SectorInfo struct {
 State        SectorState
 SectorNumber abi.SectorNumber
 Nonce        uint64
 SectorType abi.RegisteredProof
 // Packing
 Pieces []Piece
 // PreCommit1
 TicketValue   abi.SealRandomness
 TicketEpoch   abi.ChainEpoch
 PreCommit1Out storage.PreCommit1Out
 // PreCommit2
 CommD *cid.Cid
 CommR *cid.Cid
 Proof []byte
 PreCommitMessage *cid.Cid
 // WaitSeed
 SeedValue abi.InteractiveSealRandomness
 SeedEpoch abi.ChainEpoch
 // Committing
 CommitMessage *cid.Cid
 InvalidProofs uint64
 // Faults
 FaultReportMsg *cid.Cid
 // Debug
 LastErr string
 Log []Log
 }

SectorInfo包括了Sector状态,Precommit1/2的数据,Committing的数据等等。其中,SectorState描述了Sector的具体状态。Sector的所有的状态定义在sector_state.go文件中:

 const (
 UndefinedSectorState SectorState = ""

 // happy path
 Empty          SectorState = "Empty"
 Packing        SectorState = "Packing"       // sector not in sealStore, and not on chain
 PreCommit1     SectorState = "PreCommit1"    // do PreCommit1
 PreCommit2     SectorState = "PreCommit2"    // do PreCommit1
 PreCommitting  SectorState = "PreCommitting" // on chain pre-commit
 WaitSeed       SectorState = "WaitSeed"      // waiting for seed
 Committing     SectorState = "Committing"
 CommitWait     SectorState = "CommitWait" // waiting for message to land on chain
 FinalizeSector SectorState = "FinalizeSector"
 Proving        SectorState = "Proving"
 // error modes
 FailedUnrecoverable SectorState = "FailedUnrecoverable"
 SealFailed          SectorState = "SealFailed"
 PreCommitFailed     SectorState = "PreCommitFailed"
 ComputeProofFailed  SectorState = "ComputeProofFailed"
 CommitFailed        SectorState = "CommitFailed"
 PackingFailed       SectorState = "PackingFailed"
 Faulty              SectorState = "Faulty"        // sector is corrupted or gone for some reason
 FaultReported       SectorState = "FaultReported" // sector has been declared as a fault on chain
 FaultedFinal        SectorState = "FaultedFinal"  // fault declared on chain
 )

熟悉SDR算法的小伙伴,会发现很多熟悉的字眼:PreCommit1,PreCommit2,Commiting等等。结合状态处理函数,就能很清楚各个状态的含义以及需要处理的内容。

fsm.go中Sealing结构的Plan函数是Sector状态的处理函数:

 func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
 next, err := m.plan(events, user.(*SectorInfo))
 if err != nil || next == nil {
 return nil, uint64(len(events)), err
 }

 return func(ctx statemachine.Context, si SectorInfo) error {
 err := next(ctx, si)
 if err != nil {
 log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
 return nil
 }

 return nil
 }, uint64(len(events)), nil // TODO: This processed event count is not very correct
 }

Plan函数的实现也比较简单,调用plan处理当前的状态,并返回接下来需要处理的函数。状态的处理又可以分成两部分来看:1/ 状态转换的定义 2/ 状态的处理。

针对Sector的状态转换的定义,在fsmPlanners中:

 var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
 UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
 Packing:              planOne(on(SectorPacked{}, PreCommit1)),
 PreCommit1: planOne(
 on(SectorPreCommit1{}, PreCommit2),
 on(SectorSealPreCommitFailed{}, SealFailed),
 on(SectorPackingFailed{}, PackingFailed),
 ),
 PreCommit2: planOne(
 on(SectorPreCommit2{}, PreCommitting),
 on(SectorSealPreCommitFailed{}, SealFailed),
 on(SectorPackingFailed{}, PackingFailed),
 ),
...

比如说,在PreCommit1的状态,接收到SectorPreCommit1事件,将进入PreCommit2状态。所有Sector的状态转换如下图:

在处理好状态好,就进入相应状态的处理程序。以PreCommit1状态为例,相应的处理函数是handlePreCommit1。

func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
 tok, epoch, err := m.api.ChainHead(ctx.Context())
 ...
 pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos())
 if err != nil {
 return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
 }
 ...
 }

很清楚看出,在PreCommit1的处理函数中,会通过SealPrecommit1调用rust-fil-proofs实现Precommit1的计算。最后总结一下,各个状态的含义:

Empty - 空状态

Packing - 打包状态,多个Piece填充到一个Sector中

PreCommit1 - PreCommit1计算

PreCommit2 - PreCommit2计算

PreCommitting - 提交Precommit2的结果到链上

WaitSeed - 等待随机种子(给定10个区块的时间,让随机数种子不可提前预测)

Committing - 计算Commit1/Commit2,并将证明提交到链上

CommitWait - 等待链上确认 FinalizeSector - Sector状态确定

总结:

Sector的状态管理基于状态机。通用状态机的实现是通过go-statemachine实现。状态的存储通过go-statestore实现。在这些模块的基础上,storage-fsm实现了Sector的状态定义以及状态处理函数。

我的公众号星想法有很多原创高质量文章,欢迎大家扫码关注。

区块链技术网。

  • 发表于 2020-06-16 11:54
  • 阅读 ( 2436 )
  • 学分 ( 111 )
  • 分类:FileCoin

评论