以太坊C++系列(01)-p2p模块整体运行流程

预览

所有涉及到的类大概如下图所示。当然,为了简化类图,只是将涉及到p2p运转关键流程类里面的变量与方法摘录了出来。注意:ConsensusHost与ConsensusHostPeer类的继承关系跟EthereumHost一致,所以不详细画出。

主要功能

节点获取与监听

我们自己由于加了一些ca证书验证过程,所以使用JuHost类继承了Host,实例出来的对象是JuHost,但分析的时候,不做区分。

从上图看出,由于Host继承了Work类,Work类是一个线程的实现类,所以Host本质是启动一个线程来进行运转的。Host类主要功能有如下:

  • 接收连接请求:在启动线程调用:在线程进行启动之前的调用Host::startedWorking()里面,有调用void Host::runAcceptor()来使用异步的方式去接收其他主机发来的连接请求。一旦收到节点的请求,将数据的接收主动权交给RLPXHandshake类,由这个类去进行进一步的验证,自己继续等待下一个连接的到来。至于握手主要是剔除不是以太坊节点的连接。类似于电影里面的事先商量的暗号,只有暗号对上了,才进行下一步操作。
  • "发现"其他节点:这里的发现两字我是打了引号的。在以太坊节点中,对节点的发现的协议是采用了类Kademlia协议,由于我们是联盟链,所有的节点都是已知的。主要实现在void JuHost::checkPeers(boost::system::error_code const&)。节点的获取大概步骤如下所示:
    1. 调用合约RegisterContract方法getContractAddress(string,string,string,string)获取NodeInfoManager合约地址。调用处:JuHost::readPeers()
    2. 调用合约NodeInfoManager方法getRevision()获取版本号。调用处:JuHost::readPeers()
    3. 调用合约NodeInfoManager方法getEnodeList()获取整个节点信息。调用处:JuHost::readPeers()
    4. 节点存入NodeTable并尝试使用UDP去ping加入的节点。调用处:JuHost::readPeers() -> NodeTable::addNode() -> NodeTable::ping()
    5. 使用TCP去链接节点。这样,发出去的连接请求,将由上面的void Host::runAcceptor()接收到。调用处:JuHost::checkPeers() -> Host::connect()。
    6. 链接成功,开启RLPXHandshake进行握手流程。JuHost::checkPeers() -> Host::connect -> RLPXHandshake::start() -> RLPXHandshake::transition()

当然,联盟链也有可能随时有节点的加入与退出。为了保证及时获取新的节点进行互联,每隔 5s 的时间将会重新去智能合约中读取一次节点信息。具体代码实现如下所示:

void JuHost::run(boost::system::error_code const& error)
{
    Host::run(error);
    if (m_run)
    {
        checkPeers(boost::system::error_code());
        m_timer->expires_from_now(boost::posix_time::milliseconds(m_checkPeerInterval));
        m_timer->async_wait([this](boost::system::error_code const& error) { 
            run(error); 
        });
    }
}

数据的发送与接收处理

前面说到,对于收到的请求会调用RLPXHandshake进行握手验证,如果验证失败自然是断开连接即可。如果握手成功呢,那么肯定要进行数据的交互。这个正是由Session类来完成的。

RLPXHandshake最后一次完成ReadHello状态之后,会在m_host->startPeerSession(m_remote, rlp, move(m_io), m_socket);函数里面调用shared_ptr<SessionFace> ps = make_shared<Session>(this, move(_io), _s, p, psi);来创建Session。值得注意的是,Session类的构造函数的第三个参数为std::shared_ptr<RLPXSocket> const& _s,所以,Session类具有网络通讯的能力。构造完Session类之后,在后面紧接着调用了一下Session类的start函数,即ps->start()

顺着上面的调用,进入Session类的start()函数里面。里面很简单,发了一个ping()消息之后再调用doRead()doRead()函数有点长,简化一下代码如下:

void Session::doRead()
{
    auto self(shared_from_this());
    m_data.resize(h256::size);
    ba::async_read(m_socket->ref(), buffer(m_data, h256::size), [this, self, ...](ec, length)
    {
        // ......
        // 巴拉巴拉一大堆检查不过,直接return;

        // 检查通过,解包
        RLPXFrameInfo header(bytesConstRef(m_data.data(), length));

        auto tlen = hLength + hPadding + h128::size;
        m_data.resize(tlen);

        // 根据头部得到的长度,继续读数据
        ba::async_read(m_socket->ref(), buffer(m_data, tlen), [this, self, ...](ec, length)
        {
            // ......
            // 巴拉巴拉又是一大堆检查不过,直接return;

            // 检查通过,把RLP的数据r丢给readPacket去处理
            bool ok = readPacket(hProtocolId, packetType, r);

            // 循环调用
            doRead();
        });
    });
}

将上面的代码翻译过来就是,每次固定读取32字节的信息,然后根据这32字节信息读取后面再需要读取的信息长度,一旦第二次读取的信息检查通过之后,将数据交给函数readPacket去处理。

进入readPacket函数,简化一下代码如下:

bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r)
{
    if (_t < UserPacket)
    {
        return interpret(_t, _r);
    }

    for (auto const& i: m_capabilities)
    {
        if(xxx)
        {
            i.second->interpret(_t - i.second->m_idOffset, _r);
        }
    }
}

翻译一下就是,如果消息小于UserPacket,那么我自己的函数interpret()来处理就好了,如果不是的,那丢给我的成员变量m_capabilities的second的interpret函数去处理吧。其中自己处理的消息函数也简单,收到Ping消息回个Pong消息,其他也都是简单的消息的处理。那么根据上面的一路分析,p2p里面通信交换的重要数据,比如交易信息,块信息等等,毫无疑问都是在Session成员变量m_capabilities里面进行处理了。

回过头来,重点看一下变量m_capabilities的定义:

using CapDesc = std::pair<std::string, u256>;
std::map<CapDesc, std::shared_ptr<Capability>> m_capabilities;

由代码可知,最终其他数据的处理是由Capability类的函数interpret来处理的。

我们先来找找m_capabilities这个map里面的值是如何加进去的,也简单,Session类里面只有一个函数对m_capabilities有增加,那就是registerCapability()函数,继续搜谁调用了Session类的这个函数的地方,也只有搜到了唯一一处,那就是在类HostCapability的函数newPeerCapability()里面有调用。

继续搜谁调用了HostCapability的函数newPeerCapability()函数。发现有两个地方有调用。

第一处是EthereumHost类的newPeerCapability()调用了,从上面的类图可以看出,因为EthereumHost类是类HostCapability的子类,无非就是调用了一个父类的方法,所以只有真正调用了EthereumHost类的方法newPeerCapability()才能调到父类HostCapability的函数newPeerCapability()

第二个地方是我们前面提到过的类Host函数startPeerSession(),代码如下所示:

unsigned offset = (unsigned)UserPacket;
for (auto const& i: caps)
{
    auto pcap = m_capabilities[i];
    if (!pcap)
        return ps->disconnect(IncompatibleProtocol);

    pcap->newPeerCapability(ps, offset, i);
    offset += pcap->messageCount();
}

排除第一处的调用,那么毫无疑问,Session类里面的m_capabilities变量的值是由Host类里面成员变量m_capabilities的HostCapabilityFace类的方法newPeerCapability()加进去的。此处需要注意Session类与Host类的成语变量,虽然命名一样,但定义是不一样的。Session定义的map<CapDesc, Capability*>,而Host定义的是map<CapDesc, HostCapabilityFace*>

那么现在问题回到了,跟Session一样,Host类里面的成员变量m_capabilities这个map里面的值是如何加进去的。Host里面有个函数registerCapability()是增加m_capabilities这个map里面的值的。搜一下,在Client.cpp(JuClient.cpp)的函数init(),在JuSealEngine.cpp的函数init(),在WebThree.cpp的函数init()均有调用,三处调用如下所示:

auto host = _extNet->registerCapability(make_shared<EthereumHost>(bc(), m_stateDB, m_tq, m_bq, _networkId)); // JuClient.cpp
std::weak_ptr<ConsensusHost> consensusHost = m_client->host()->registerCapability(make_shared<ConsensusHost>(m_client->tq())); // JuSealEngine.cpp
m_whisper = m_net.registerCapability(make_shared<WhisperHost>()); // WebThree.cpp

由传进去的EthereumHost,ConsensusHost,WhisperHost都是继承HostCapability<class PeerCap>,而HostCapability<class PeerCap>是接口类HostCapabilityFace的实现,所以,我门只需要选区一个分析即可,我们选则EthereumHost。(由于我们使用PBFT共识算法,ConsensusHost是以太坊没有的)

由上面的倒推法,p2p的大概流程基本就弄清楚了。综合类图对类的描述,以EthereumHost为例大概流程如下:

  1. 在JuClient.cpp初始化的时候,调用Host类的registerCapability方法,实例化一个EthereumHost类。并将实例化的EthereumHost加入到Host类的成员变量m_capabilities里面,由于EthereumHost类继承了类HostCapability,HostCapability类需要带一个模板参数PeerCap。而EthereumHost类传进去的模板参数是类就是EthereumPeer。查看EthereumHost类的功能,在maintainTransactions()里面对交易进行广播,在maintainBlocks() 里面对块进行广播。而广播块跟交易是由类EthereumPeersealAndSend()方法进行的,这就是为什么EthereumHost在实例化的时候模板参数为EthereumPeer的原因。虽然模板参数为类EthereumPeer,但是实例化EthereumHost的时候是没有实例化EthereumPeer的。这就要回到我们上面说的类Host函数startPeerSession()调用pcap->newPeerCapability(ps, offset, i);,这个pcap正是从m_capabilities里面取出来的。在调用EthereumHost类方法newPeerCapability(),实例化了类EthereumPeer。并将实列化的EthereumPeer加入了Session类的成员变量m_capabilities中。具体代码如下所示:
    virtual std::shared_ptr newPeerCapability(std::shared_ptr const& _s, unsigned _idOffset, CapDesc const& _cap, uint16_t _capID)
    {
      _s->registerFraming(_capID);
      auto p = std::make_shared(_s, this, _idOffset, _cap, _capID);
      _s->registerCapability(_cap, p); // 此处将PeerCap即EthereumPeer加入到Session类的成员变量m_capabilities中
      return p;
    }
  2. EthereumPeer是如何完成数据的广播的?从上面我们对Session类的分析,数据的接收与传送都是通过它来完成的。这就要回到EthereumPeer的构造了,它的构造函数如下所示:
    EthereumPeer(std::shared_ptr _s, p2p::HostCapabilityFace* _h, ...);

    由类图可知,SessionFace是Session类的父类,HostCapabilityFace是EthereumHost类的父类。所以,等于它可以调用Session类来进行对交易的发送。

  3. 上面我们在分析Session类调用readPacket函数的时候,我们说到,大于UserPacket消息类型都是由Session里面的成员变量m_capabilities的Capability来处理的。它是如何分发给各个Capability的呢?答案在实列化Capability(比如EthereumPeer)的时候需要传一个m_idOffset,根据这个m_idOffset,就能计算出是这个消息该给哪个Capability去处理。接下来的处理,就交给对应的以Capability类为父类的子类(如EthereumPeer)的函数interpret()来处理了。

自此,p2p整个流程对数据的发送与接收分析完毕!

暂无评论

发送评论 编辑评论


				
上一篇
下一篇