以太坊C++系列(02)-p2p网络错误排查

问题描述

最近在做一个项目,需求是有大量的初始数据需要上链。为了加快将这些初始数据上链,我们在一个交易里面打包了大量的数据,我们先假设一个交易大小为1M。然后在交易队列里面不爆满的情况下,不断的发交易给后台。没过多长时间,我就会收到如下的警告错误信息:

WARN >>[p2p] [Session.cpp:420:operator()] frame decrypt failed
WARN >>[p2p] [Session.cpp:271:operator()] Error sending: Broken pipe
WARN >>[p2p] [Session.cpp:459:checkRead] Error reading - Abrupt peer disconnect: End of file
WARN >>[p2p] [Session.cpp:271:operator()] Error sending: Broken pipe
WARN >>[p2p] [Session.cpp:459:checkRead] Error reading - Abrupt peer disconnect: End of file
WARN >>[p2p] [Session.cpp:459:checkRead] Error reading - Abrupt peer disconnect: End of file
WARN >>[p2p] [Session.cpp:271:operator()] Error sending: Operation canceled
WARN >>[p2p] [Session.cpp:459:checkRead] Error reading - Abrupt peer disconnect: End of file
WARN >>[p2p] [Session.cpp:271:operator()] Error sending: Operation canceled
WARN >>[p2p] [Session.cpp:420:operator()] frame decrypt failed
WARN >>[p2p] [Session.cpp:420:operator()] frame decrypt failed
WARN >>[p2p] [Session.cpp:420:operator()] frame decrypt failed
WARN >>[p2p] [Session.cpp:459:checkRead] Error reading - Abrupt peer disconnect: End of file

发生这种问题之后,不仅发送出的交易没有上链,而且导致出现问题的节点无法再与其他节点恢复p2p通讯,最终只能重启所有链才能恢复该链的正常运转。

问题追踪

顺着报错的提示,我们找到了Session.cpp第一次发出警告信息的"frame decrypt failed"函数处,该函数代码如下:

void Session::doRead()
{
    // ignore packets received while waiting to disconnect.
    if (m_dropped)
        return;

    auto self(shared_from_this());
    m_data.resize(h256::size);
    ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length)
    {
        std::string header;
        for(auto n : m_data)
        {
            header += std::to_string(int(n)) + " ";
        }

        ThreadContext tc(info().id.abridged());
        ThreadContext tc2(info().clientVersion);
        if (!checkRead(h256::size, ec, length))
            return;
        else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
        {
            cwarn << "header decrypt failed";
            drop(BadProtocol); // todo: better error
            return;
        }

        uint16_t hProtocolId;
        uint32_t hLength;
        uint8_t hPadding;
        try
        {
            RLPXFrameInfo header(bytesConstRef(m_data.data(), length));
            hProtocolId = header.protocolId;
            hLength = header.length;
            hPadding = header.padding;
        }
        catch (std::exception const& _e)
        {
            cwarn << "Exception decoding frame header RLP:" << _e.what() << bytesConstRef(m_data.data(), h128::size).cropped(3);
            drop(BadProtocol);
            return;
        }

        /// read padded frame and mac
        auto tlen = hLength + hPadding + h128::size;
        m_data.resize(tlen);
        ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, hLength, hProtocolId, tlen, header](boost::system::error_code ec, std::size_t length)
        {
            ThreadContext tc(info().id.abridged());
            ThreadContext tc2(info().clientVersion);
            if(length > 64)
            {
                cdebug << "header: " << header << ", except read length " << (tlen + 32) << " , actual read length " << (length + 32);
            }
            if (!checkRead(tlen, ec, length))
                return;
            else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
            {
                cwarn << "frame decrypt failed";
                drop(BadProtocol); // todo: better error
                return;
            }

            bytesConstRef frame(m_data.data(), hLength);
            if (!checkPacket(frame))
            {
                cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
                cwarn << "INVALID MESSAGE RECEIVED";
                disconnect(BadProtocol);
                return;
            }
            else
            {
                auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
                RLP r(frame.cropped(1));
                bool ok = readPacket(hProtocolId, packetType, r);
                (void)ok;
#if ETH_DEBUG
                if (!ok)
                    cwarn << "Couldn't interpret packet." << RLP(r);
#endif
            }
            doRead();
        });
    });
}

问题报错在上述代码的第60行处。从字面意思来理解,是解包出错。解包出错,当时我猜测有以下可能:

  1. p2p传输过程中,头部数据传输错误,导致解出来的后续需要接收的body的数据长度有误,然后解包出错。
  2. p2p在传输过程中,传输数据混乱。比如我需要传两个包数据给对方,第一次传输的时候,传输的是第一个包的30%给对方,第二次传输的是第二个包的50%给对方。这样,对方收到的数据就不是第一个包了,而是第一个包的30% + 第二个包的50%。
  3. p2p使用的网络传输库boost::asio传输的数据长度有限。比如p2p传输一个完整包是15M,而boost::asio每次传输的数据不能超过10M。

首先我通过阅读boost::asio相关文档排除了第3个猜测,boost::asio的读取函数async_read是会通过async_read_some反复读取数据,直到接收完毕才将数据给回调函数的。

然后第2个猜测也很快就排除了,因为p2p相关源码在发送的时候,是将需要发送的数据都放在一个队列里面,然后从队列里面取出确认将发送的数据发送完毕了,才会发送下一个。而且取数据的时候是有加锁的。

那么,当时就开始着手验证第1个猜测。由于解包的地方,需要先解32个头部字节,根据这解出来的头部,获取需要读取的body。于是,我在发送数据的地方,将头部的数据打印了出来。为了对比接收的地方头部,我也在接收处打印了出来(见上面代码12 ~ 15,54地方处)。发送出的代码如下所示:

void Session::write()
{
    bytes const* out = nullptr;
    DEV_GUARDED(x_framing)
    {
        m_io->writeSingleFramePacket(&m_writeQueue[0], m_writeQueue[0]);
        out = &m_writeQueue[0];
    }
    auto self(shared_from_this());
    if(out->size() > 96)
    {
        std::string header;
        for(int i=0; i<32; i++)
        {
            header += std::to_string(out->at(i)) + " ";
        }
        cdebug << "header: " << header << ", write length " << out->size();
    }

    ba::async_write(m_socket->ref(), ba::buffer(*out), [this, self](boost::system::error_code ec, std::size_t /*length*/)
    {
        ThreadContext tc(info().id.abridged());
        ThreadContext tc2(info().clientVersion);
        // must check queue, as write callback can occur following dropped()
        if (ec)
        {
            cwarn << "Error sending: " << ec.message();
            drop(TCPError);
            return;
        }

        DEV_GUARDED(x_framing)
        {
            m_writeQueue.pop_front();
            if (m_writeQueue.empty())
                return;
        }
        write();
    });
}

结果打印出来之后,发送处日志如下:

header: 65 172 249 207 44 156 249 109 196 92 216 125 161 109 114 156 251 177 148 248 32 226 246 49 61 108 113 182 16 45 245 254 , write length 936464
header: 232 114 63 255 240 15 114 103 151 253 156 197 200 109 93 2 200 212 193 110 70 80 176141 140 224 31 228 129 50 50 201 , write length 104112
header: 41 85 46 3 121 13 66 254 142 219 188 193 110 50 150 229 65 210 83 235 252 34 195 1251 6 79 19 106 133 223 234 , write length 104864
header: 171 139 67 173 241 215 112 87 122 183 215 180 1 164 122 204 243 29 80 60 111 171 19574 25 204 234 114 253 194 90 5 , write length 416240
header: 190 204 228 33 32 24 10 10 17 220 110 143 42 119 53 128 121 222 132 76 130 224 179 196 96 81 243 134 63 243 89 16 , write length 26219392

接收处的日志如下:

header: 65 172 249 207 44 156 249 109 196 92 216 125 161 109 114 156 251 177 148 248 32 226 246 49 61 108 113 182 16 45 245 254 , except read length 936464  , actual read length 936464
header: 232 114 63 255 240 15 114 103 151 253 156 197 200 109 93 2 200 212 193 110 70 80 176 141 140 224 31 228 129 50 50 201 , except read length 104112  , actual read length 104112
header: 41 85 46 3 121 13 66 254 142 219 188 193 110 50 150 229 65 210 83 235 252 34 195 125 1 6 79 19 106 133 223 234 , except read length 104864  , actual read length 104864
header: 171 139 67 173 241 215 112 87 122 183 215 180 1 164 122 204 243 29 80 60 111 171 195 74 25 204 234 114 253 194 90 5 , except read length 416240  , actual read length 416240
header: 190 204 228 33 32 24 10 10 17 220 110 143 42 119 53 128 121 222 132 76 130 224 179 196 96 81 243 134 63 243 89 16 , except read length 9442176  , actual read length 9442176

分析日志之后,发现他们的头部数据完全是一致的,只有除了最后一条,发送的数据是26219392,而那边接收到的却是9442176。

当时看到这个结果有点懵逼。因为头部发送的数据是正确的,而且当时最后根据头部解出来的数据应收9442176数据,实际上也是收到了这么多。问题到底出在了哪里了呢?

当时我就想到,是不是存body的len的长度被截断了呢?然后我找到了存长度的变量uint32_t hLength,定义的uint32_t远远大于26219392的长度。

当时又想到,由于以太坊的数据都是以RLP格式编码存储的,是不是因为RLP编码本身的存储他有长度限制呢?这个咨询了相关同事之后,也给出了否定的答案。

后面,抱着试一试的态度,打开了在解头部的时候用到的结构体的构造函数。代码如下:

RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header):
    length((_header[0] * 256 + _header[1]) * 256 + _header[2]),
    padding((16 - (length % 16)) % 16),
    data(_header.cropped(3).toBytes()),
    header(RLP(data, RLP::ThrowOnFail | RLP::FailIfTooSmall)),
    protocolId(header[0].toInt<uint16_t>()),
    multiFrame(header.itemCount() > 1),
    sequenceId(multiFrame ? header[1].toInt<uint16_t>() : 0),
    totalLength(header.itemCount() == 3 ? header[2].toInt<uint32_t>() : 0)
{}

然后一切都明白了,保存body长度的变量hLength,是由RLPXFrameInfo类的成员变量length的来的,而length的赋值为length((_header[0] * 256 + _header[1]) * 256 + _header[2]),而传进来的bytesConstRef里面存的是uint_8,所以,length的最大长度(_header[0, 1, 2] 按最大值256算)只有可能是:(256 * 256 + 256) * 256 + 256 === 16843008。由此可以得出,给body的长度描述只用了3个字节来保存。一旦超过这个长度,后面解出来的body的长度一定是错误的。而上面的日志也证明了这一点,试图传26219392长度数据就出错了,前面因为长度小于16843008都正确传输并解析了。

解决方案

找出问题出在哪里之后,有两种办法可以解决这个问题:

  1. 将描述body长度的字节增加,突破每次可以传输的长度。
  2. 每次传输的长度不超过16843008。

在以太坊数据传输过程中,数据都是RLP存储的,而RLP的最小单位是uint_8。 所以第一种解决方案可以增加一个字节来存储body的长度,或者,在RLPXFrameInfo类里面的用其他变量的某些没用到的bit位来存储。这种方案打包的地方都要改,改动的也比较多。

考虑到以太坊规定每个块能包含的最大交易数只能是1024,而传输交易的时候,最大只能是256个。所以我们决定让块跟传输交易的时候让它不超过15M,代码改起来也很简单,就是在交易队列里面取交易的时候,做一下限制,代码如下:

Transactions TransactionQueue::topTransactions(unsigned _limit, h256Hash const& _avoid, std::size_t _originalLen) const
{
    ReadGuard l(m_lock);
    Transactions ret;
    std::size_t dataTotal = _originalLen;
    const std::size_t data15M = 1024 * 1024 * 15; // p2p传播的时候,由于描述body的len是3字节,所以每次传数的数据不能大于2的24次方16777216
    for (auto t = m_current.begin(); dataTotal < data15M && ret.size() < _limit && t != m_current.end(); ++t)
    {
        if (!_avoid.count(t->transaction.sha3()))
        {
            dataTotal += t->transaction.data().size();
            // 防止最后一次超过15M
            if(dataTotal < data15M)
            {
                ret.push_back(t->transaction);
            }
        }
    }
    return ret;
}

至此,问题终于得到解决!

其他

为什么发送26219392,对方解出来只有9442176了呢?上面说到,存储body的长度是3字节,也就是24bit,所以能存储的最大长度是2的24次方为 16777216,而 16777216 + 9442176 刚好就是 26219392。再一次印证了是由于存储body的长度不够发生了字段的截取!

相关资料

The RLPx Transport Protocol

暂无评论

发送评论 编辑评论


				
上一篇
下一篇