node包中的node.go文件
// Start creates a live P2P node and starts running it.
//
// It is invoked from cmd.go during geth start-up. The sequence is:
//  1. Open the data directory.
//  2. Build the p2p server configuration from the node config.
//  3. Construct every service registered through Node.Register.
//  4. Collect the services' protocols into the p2p server and start it.
//  5. Start each service.
//  6. Start the configured RPC interfaces.
// If a step after the server has started fails, everything started so far
// is stopped and the error is returned.
//
// Returns ErrNodeRunning if the node is already running. Returns a
// *DuplicateServiceError if two constructors return services of the same
// concrete type.
func (n *Node) Start() error {
	n.lock.Lock()
	defer n.lock.Unlock()

	// Short circuit if the node's already running (n.server is the *p2p.Server).
	if n.server != nil {
		return ErrNodeRunning
	}
	if err := n.openDataDir(); err != nil {
		return err
	}

	// Initialize the p2p server configuration. This creates the node key and
	// discovery databases.
	n.serverConfig = n.config.P2P
	// The node key is secret key material (it also carries the public key,
	// i.e. the node identity). Never print or log it.
	n.serverConfig.PrivateKey = n.config.NodeKey()
	n.serverConfig.Name = n.config.NodeName()
	n.serverConfig.Logger = n.log
	if n.serverConfig.StaticNodes == nil {
		n.serverConfig.StaticNodes = n.config.StaticNodes()
	}
	if n.serverConfig.TrustedNodes == nil {
		n.serverConfig.TrustedNodes = n.config.TrustedNodes()
	}
	// Node discovery database.
	if n.serverConfig.NodeDatabase == "" {
		n.serverConfig.NodeDatabase = n.config.NodeDB()
	}
	n.log.Debug("Using P2P node database", "path", n.serverConfig.NodeDatabase)

	// This server manages all communication with remote peers. Its
	// Protocols list is empty here and is filled from the services
	// constructed below.
	running := &p2p.Server{Config: n.serverConfig}
	n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

	// Construct every registered service. Node.Register appends each
	// constructor to n.serviceFuncs.
	services := make(map[reflect.Type]Service)
	for _, constructor := range n.serviceFuncs {
		// Create a new context for the particular service
		ctx := &ServiceContext{
			config:         n.config,
			services:       make(map[reflect.Type]Service),
			EventMux:       n.eventmux,
			AccountManager: n.accman,
		}
		// Give the constructor a private copy of the services built so far.
		// The copy is empty for the first constructor.
		for kind, s := range services { // copy needed for threaded access
			ctx.services[kind] = s
		}
		// Construct and save the service (e.g. eth's Ethereum or Whisper).
		service, err := constructor(ctx)
		if err != nil {
			return err
		}
		kind := reflect.TypeOf(service)
		if _, exists := services[kind]; exists {
			return &DuplicateServiceError{Kind: kind}
		}
		services[kind] = service
	}
	// Gather the protocols and start the freshly assembled P2P server
	for _, service := range services {
		running.Protocols = append(running.Protocols, service.Protocols()...)
	}
	n.log.Debug("Assembled P2P protocols", "count", len(running.Protocols))
	if err := running.Start(); err != nil {
		return convertFileLockError(err)
	}
	// Start each of the services, passing the running server, and stop all
	// previous ones upon failure.
	var started []reflect.Type
	for kind, service := range services {
		if err := service.Start(running); err != nil {
			for _, kind := range started {
				services[kind].Stop()
			}
			running.Stop()
			return err
		}
		// Mark the service started for potential cleanup
		started = append(started, kind)
	}
	// Lastly start the configured RPC interfaces
	if err := n.startRPC(services); err != nil {
		for _, service := range services {
			service.Stop()
		}
		running.Stop()
		return err
	}
	// Finish initializing the startup
	n.services = services
	n.server = running
	n.stop = make(chan struct{})
	return nil
}
注意这里:
// Start each service, passing it the running p2p.Server. Per the notes, each
// service's Start typically spawns the goroutines that do its work from then
// on (e.g. eth's Ethereum.Start, Whisper.Start). If one Start fails, the
// services started so far and the p2p server are stopped and the error is
// returned.
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil {
for _, kind := range started {
services[kind].Stop()
}
running.Stop()
return err
}
// Mark the service started for potential cleanup
started = append(started, kind)
}
上面循环中的 service.Start(running) 会执行每个 service 的 Start 函数，每个 service 的实现体（如 eth、whisper）都是从 Start 开始运行的。Start 函数中一般会开启若干协程（goroutine），此后该服务的工作都交给这些协程完成。例如 eth 服务会开启四个协程，分别负责：广播新出现的交易对象；广播新挖出的区块；定时与网络同步；在刚与远端节点建立连接时把已有的交易发送给它。whisper 服务会开启 1+CPU 数目个协程，其中一个做定时更新，其余的做消息处理。Start 执行完毕，表示服务已正式完全启动。
p2p包中server.go文件
// runPeer runs in its own goroutine for each peer.
//
// It performs these steps in order:
//  1. Invoke the optional newPeerHook.
//  2. Announce the peer on peerFeed.
//  3. Run all negotiated protocols via p.run until the connection ends.
//  4. Announce the drop.
//  5. Hand the peer to the server loop through srv.delpeer so it is
//     removed.
func (srv *Server) runPeer(p *Peer) {
	if srv.newPeerHook != nil {
		srv.newPeerHook(p)
	}
	// broadcast peer add
	srv.peerFeed.Send(&PeerEvent{
		Type: PeerEventTypeAdd,
		Peer: p.ID(),
	})

	// Run the protocols. This blocks until the connection to the peer is
	// terminated (read/write failure, protocol error or disconnect).
	remoteRequested, err := p.run()

	// broadcast peer drop
	// NOTE(review): err.Error() assumes p.run never returns a nil error.
	// Every exit from its wait loop assigns err, but confirm the readLoop
	// never sends nil.
	srv.peerFeed.Send(&PeerEvent{
		Type:  PeerEventTypeDrop,
		Peer:  p.ID(),
		Error: err.Error(),
	})

	// Note: run waits for existing peers to be sent on srv.delpeer
	// before returning, so this send should not select on srv.quit.
	srv.delpeer <- peerDrop{p, err, remoteRequested}
}
上面的这个Server的runPeer函数,被Server的run函数调用:
// Fragment of the select loop in Server.run.
// srv.startListening() accepts connection requests. Once both RLPx
// handshakes succeed, the p2p.conn is pushed into the srv.addpeer channel.
// See func (srv *Server) checkpoint(c *conn, stage chan<- *conn), around
// line 900 of server.go.
case c := <-srv.addpeer: // Take a conn that was pushed on addpeer; reaching here means its handshakes completed.
// At this point the connection is past the protocol handshake.
// Its capabilities are known and the remote identity is verified.
//fmt.Printf(".........................................aaa%+v\n",srv.PeerCount())// NOTE(review): the author found this cannot run here; presumably PeerCount needs this same run loop and would block — confirm.
// NOTE(review): the fmt.Printf calls below are ad-hoc debug output written to stdout.
fmt.Printf("tttttttttttttttttt:<-srv.addpeer\n")
err := srv.protoHandshakeChecks(peers, inboundCount, c)
fmt.Printf("qqqqqqqqqqqqqqq:err:%+v\n",err)
if err == nil {
// The handshakes are done and it passed all checks.
// Create the peer object and hand it every sub-protocol in srv.Protocols,
// including custom ones. Per the notes, newPeer organizes them with
// matchProtocols (not shown here).
p := newPeer(c, srv.Protocols)
// If message events are enabled, pass the peerFeed
// to the peer
if srv.EnableMsgEvents {
p.events = &srv.peerFeed
}
name := truncateName(c.name)
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
go srv.runPeer(p) // Per-peer processing runs in its own goroutine.
peers[c.id] = p
if p.Inbound() {
inboundCount++
}
}
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
// discarded. Unblock the task last.
select {
case c.cont <- err:
fmt.Printf("errerrerrerr................err:%+v\n",err)
case <-srv.quit:
fmt.Printf(".................break running\n")
break running
}
即：当新的节点与本地节点成功完成 RLPx 的两次握手（加密握手和协议握手）并通过 protoHandshakeChecks 检查后，Server 会以新协程（go srv.runPeer(p)）的方式调用 runPeer 函数。
而runPeer函数会调用Peer的run函数,这个Peer结构是连上一个新远端节点时新创建的,它代表与一个特定的远端节点的连接。
// run drives the connection to one remote peer until it terminates.
//
// It starts the read loop and the ping loop, then launches every
// negotiated protocol via startProtocols. It then waits until one of
// these ends the connection: a write error, a read error, a protocol
// error, or a disconnect request. Before returning it:
//   - closes p.closed,
//   - closes the transport with the chosen DiscReason,
//   - waits for all goroutines tracked by p.wg.
//
// remoteRequested is true only when the read loop reported a DiscReason,
// meaning the remote side asked to disconnect. err is the error that
// ended the wait loop.
func (p *Peer) run() (remoteRequested bool, err error) {
var (
writeStart = make(chan struct{}, 1) // write token; presumably consumed by each protocol write before it proceeds
writeErr = make(chan error, 1)      // result of each finished write
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
// Account for readLoop and pingLoop (presumably each calls p.wg.Done on exit).
p.wg.Add(2)
// Read loop: keeps reading messages from the remote. Per the notes, it
// looks up the sub-protocol by message code (p.getProto(msg.Code)) and
// delivers the message to that protocol's input channel (proto.in <- msg).
// Neither call is shown here.
go p.readLoop(readErr)
// Ping loop: sends ping heartbeats to the remote peer.
go p.pingLoop()
// Start all protocol handlers.
// Prime the write token so the first write can proceed.
writeStart <- struct{}{}
// startProtocols ends up calling each sub-protocol's Run method: proto.Run(p, rw).
p.startProtocols(writeStart, writeErr)
// Wait for an error or disconnect.
loop:
for {
select {
case err = <-writeErr:
// A write finished. Allow the next write to start if
// there was no error.
if err != nil {
reason = DiscNetworkError
break loop
}
writeStart <- struct{}{}
case err = <-readErr:
// A DiscReason from the read loop means the remote asked to disconnect.
if r, ok := err.(DiscReason); ok {
remoteRequested = true
reason = r
} else {
reason = DiscNetworkError
}
break loop
case err = <-p.protoErr:
// A protocol's Run returned (see startProtocols).
reason = discReasonForError(err)
break loop
case err = <-p.disc:
// Local disconnect request.
// NOTE(review): reason keeps its zero value on this path rather than
// being derived from err. Confirm this is intended.
break loop
}
}
// Every protocol's closed channel is p.closed, so closing it signals them to stop.
close(p.closed)
p.rw.close(reason)
// Wait for readLoop, pingLoop and every protocol goroutine.
p.wg.Wait()
return remoteRequested, err
}
它创建了 2 个协程：一个执行读数据循环（readLoop），不停读取远端发来的数据；另一个执行 pingLoop，向远端 peer 发送 ping 心跳包。注意，这一对节点之间通信时，所有协议的数据都是由这个读协程获取的。读协程会调用 Peer 的 handle 函数，把不同协议的消息投递到各自协议对应的通道上，从而区分发给不同协议的数据。Peer 的 run 函数所在的协程本身也会一直循环，等待出现错误或连接断开。
Peer的run函数还会执行Peer的startProtocols函数,这个函数非常重要。
// startProtocols launches one goroutine per negotiated protocol in p.running.
//
// Before launching, each protocol is wired up as follows:
//   - Its closed channel is set to p.closed.
//   - It gets the shared write-token channel (writeStart) and the
//     write-result channel (writeErr).
//   - It is wrapped in a message eventer when p.events is set.
//
// Each goroutine then calls the protocol's Run with the peer. The value
// Run returns is reported on p.protoErr; a nil result is reported as
// errProtocolReturned. Only after that report does the goroutine call
// p.wg.Done.
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
	p.wg.Add(len(p.running))
	for _, entry := range p.running {
		prw := entry // per-iteration copy captured by the goroutine below
		prw.closed, prw.wstart, prw.werr = p.closed, writeStart, writeErr

		var rw MsgReadWriter = prw
		if p.events != nil {
			rw = newMsgEventer(rw, p.events, p.ID(), prw.Name)
		}
		p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", prw.Name, prw.Version))

		go func() {
			err := prw.Run(p, rw)
			switch {
			case err == nil:
				p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", prw.Name, prw.Version))
				err = errProtocolReturned
			case err != io.EOF:
				p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", prw.Name, prw.Version), "err", err)
			}
			p.protoErr <- err
			p.wg.Done()
		}()
	}
}
这个函数会为每个协议开启一个协程，执行该协议定义的 Run 函数。Run 是 p2p.Protocol 结构中的一个字段，Protocol 的定义如下:
// Protocol represents a P2P subprotocol implementation.
type Protocol struct {
// Name should contain the official protocol name,
// often a three-letter word.
Name string
// Version should contain the version number of the protocol.
Version uint
// Length should contain the number of message codes used
// by the protocol.
Length uint64
// Run is called in a new goroutine when the protocol has been
// negotiated with a peer. It should read and write messages from
// rw. The Payload for each message must be fully consumed.
//
// The peer connection is closed when Run returns. It should return
// any protocol-level error (such as an I/O error) that is
// encountered.
Run func(peer *Peer, rw MsgReadWriter) error
// NodeInfo is an optional helper method to retrieve protocol specific metadata
// about the host node.
NodeInfo func() interface{}
// PeerInfo is an optional helper method to retrieve protocol specific metadata
// about a certain peer in the network. If an info retrieval function is set,
// but returns nil, it is assumed that the protocol handshake is still running.
PeerInfo func(id discover.NodeID) interface{}
}
Peer的startProtocols函数如下:
// startProtocols launches one goroutine per negotiated protocol in p.running.
// Each goroutine calls the protocol's Run with this peer and a
// MsgReadWriter. A nil result from Run is converted to errProtocolReturned.
// The result is always sent on p.protoErr, which ends Peer.run's wait loop.
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
// One WaitGroup slot per protocol goroutine; Peer.run waits on p.wg.
p.wg.Add(len(p.running))
for _, proto := range p.running {
// Per-iteration copy so each goroutine captures its own protocol.
proto := proto
// Wire the protocol to the peer's shutdown signal and the shared write-token/result channels.
proto.closed = p.closed
proto.wstart = writeStart
proto.werr = writeErr
var rw MsgReadWriter = proto
if p.events != nil {
rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
}
p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
//fmt.Printf(fmt.Sprintf("Starting protocol %s/%d,peer.go", proto.Name, proto.Version))
go func() {
// Run is the sub-protocol's callback (e.g. eth's handler or Whisper.HandlePeer).
err := proto.Run(p, rw)
if err == nil {
p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
err = errProtocolReturned
} else if err != io.EOF {
p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
}
p.protoErr <- err
p.wg.Done()
}()
}
}
也就是说,当与新的远程节点建立连接后,会执行节点支持的协议中的Run。这里Run相当于一个回调函数。
如:eth的Run:
// Fragment of the ProtocolManager constructor: one p2p.Protocol is appended
// per supported eth protocol version. Its Run function is the callback the
// p2p layer (Peer.startProtocols) invokes after a remote peer has negotiated
// this protocol.
// NOTE(review): Run captures `version` from the enclosing loop, which is not
// visible here. Before Go 1.22 that loop must shadow it (e.g.
// `version := version`). Confirm.
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
// Called by the p2p layer once the peer connection is up and this protocol is negotiated.
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
// Blocks until newPeerCh accepts the peer or quitSync fires. Per the notes, newPeerCh is
// unbuffered and drained by ProtocolManager.syncer(); confirm.
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
// Hand the peer to ProtocolManager.handle, which manages it until disconnection.
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
// Look the peer up by the first 8 bytes of its node ID, hex-encoded.
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
eth的Run会执行ProtocolManager的handle函数,该函数也会对这对eth节点做握手验证。如下:
// handle is the callback invoked to manage the life cycle of an eth peer.
// When this function terminates, the peer is disconnected.
//
// It is wired into each eth p2p.Protocol's Run function, so it runs once
// for every remote peer that negotiates the eth sub-protocol. The steps are:
//  1. Reject the peer if the peer limit is reached, unless it is trusted.
//  2. Perform the eth handshake (network ID, total difficulty, head hash,
//     genesis hash).
//  3. Register the peer with the peer set and the downloader.
//  4. Propagate existing transactions to it.
//  5. If a DAO fork block is configured, challenge the peer for that header.
//  6. Loop on handleMsg until it returns an error.
func (pm *ProtocolManager) handle(p *peer) error {
	// Ignore maxPeers if this is a trusted peer
	if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
		return p2p.DiscTooManyPeers
	}
	p.Log().Debug("Ethereum peer connected", "name", p.Name())

	// Execute the Ethereum handshake. td is the total difficulty of our
	// current head; the genesis hash must match the peer's.
	var (
		genesis = pm.blockchain.Genesis()
		head    = pm.blockchain.CurrentHeader()
		hash    = head.Hash()
		number  = head.Number.Uint64()
		td      = pm.blockchain.GetTd(hash, number)
	)
	if err := p.Handshake(pm.networkId, td, hash, genesis.Hash()); err != nil {
		p.Log().Debug("Ethereum handshake failed", "err", err)
		return err
	}
	// If the peer's read/writer is a *meteredMsgReadWriter, initialize it
	// with the negotiated protocol version.
	if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
		rw.Init(p.version)
	}
	// Register the peer locally. It is removed again when handle returns.
	if err := pm.peers.Register(p); err != nil {
		p.Log().Error("Ethereum peer registration failed", "err", err)
		return err
	}
	defer pm.removePeer(p.id)

	// Register the peer in the downloader. If the downloader considers it banned, we disconnect
	if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
		return err
	}
	// Propagate existing transactions. New transactions appearing after this
	// will be sent via broadcasts. Per the notes, this queues a txsync for
	// txsyncLoop (not shown here).
	pm.syncTransactions(p)

	// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
	if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
		// Request the peer's DAO fork header for extra-data validation
		if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
			return err
		}
		// Start a timer to disconnect if the peer doesn't reply within
		// daoChallengeTimeout.
		p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
			p.Log().Debug("Timed out DAO fork-check, dropping")
			pm.removePeer(p.id)
		})
		// Make sure it's cleaned up if the peer dies off
		defer func() {
			if p.forkDrop != nil {
				p.forkDrop.Stop()
				p.forkDrop = nil
			}
		}()
	}
	// Main loop: handle incoming messages until one fails.
	for {
		if err := pm.handleMsg(p); err != nil {
			p.Log().Debug("Ethereum message handling failed", "err", err)
			return err
		}
	}
}
验证成功后，会在无限循环中调用 ProtocolManager 的 handleMsg，处理远程节点发来的消息。
对于whisper,它的Run是:
// HandlePeer is called by the underlying P2P layer when the whisper
// sub-protocol connection is negotiated with a remote peer.
//
// It runs once per connected peer (it is the protocol's Run callback).
// While it runs, the peer is tracked in wh.peers. It performs the whisper
// handshake, starts the peer's background work, and runs the message loop
// until it returns.
func (wh *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
	// Create the new peer, which wraps wh, the remote p2p peer and rw, and
	// start tracking it.
	whisperPeer := newPeer(wh, peer, rw)

	wh.peerMu.Lock()
	wh.peers[whisperPeer] = struct{}{}
	wh.peerMu.Unlock()

	defer func() {
		wh.peerMu.Lock()
		delete(wh.peers, whisperPeer)
		wh.peerMu.Unlock()
	}()

	// Run the peer handshake and state updates
	if err := whisperPeer.handshake(); err != nil {
		peer.Log().Debug("Whisper handshake failed", "err", err)
		return err
	}
	// Per the notes, start launches a goroutine that performs the periodic
	// broadcast for this peer. stop is deferred so it runs when the
	// message loop ends.
	whisperPeer.start()
	defer whisperPeer.stop()

	return wh.runMessageLoop(whisperPeer, rw)
}
与 eth 相同，whisper 也会先进行握手验证，再开启一个协程执行定时广播，然后进入 whisper 的消息处理循环。
从上面可以发现：握手分 2 层进行。第一层是 RLPx 节点建立连接时进行的握手，包括基于 ECDH 的密钥协商握手（加密握手），以及协议握手（交换双方支持的协议、节点名称、节点版本等信息）。第二层是执行每个子协议的 Run 时进行的握手（如 eth、whisper 各自的握手），如果失败，会断开这对节点的连接。