性能文章>给 wireshark 写一个 RocketMQ 协议解析的 lua 插件>

给 wireshark 写一个 RocketMQ 协议解析的 lua 插件原创

581505

前言

学习 RocketMQ,需要搞懂两个东西:通信和存储。这里花了一点时间写了一个 RocketMQ 的 wireshark lua 插件,过程挺有意思,写出来记录一下。
通过阅读这篇文章,你会了解到下面这些知识。

wireshark lua 插件的骨架代码如何编写
插件版 Hello World 如何实现
RocketMQ 的基本通信协议格式
RocketMQ 在 PULL 有消息时的 Body 格式是什么样的

初探 Hello World 插件

从 wireshark 的 about 页面可以看到现在它支持的 Lua 版本,下面是我 v3.0.6 版本的 wireshark 对应的页面。
image.png
可以看到,目前支持的 Lua 版本是 5.2.4。下面我们来看一段骨架代码。

-- 声明协议
local NAME = "RocketMQ"
local PORTS = { 9876, 10911 }
local proto = Proto.new(NAME, "RocketMQ Protocol")

-- 声明 dissector 函数,处理包
function proto.dissector(tvb, pinfo, tree)
    print("load plugin...demo")
    pinfo.cols.protocol = proto.name;
    pinfo.cols.info = "Hello, World"
end

-- 注册 dissector 到 wireshark
for _, port in ipairs(PORTS) do
    DissectorTable.get("tcp.port"):add(port, proto)
end

找到 wireshark 插件目录,在我的电脑上这个路径是 /Applications/Wireshark.app/Contents/Resources/share/wireshark/,修改其中的 init.lua 文件

vim /Applications/Wireshark.app/Contents/Resources/share/wireshark/init.lua

增加一行加载上面 lua 文件的 dofile 调用。

...
dofile("/path/to/demo.lua")

执行前后效果如下
image.png

解析 RocketMQ 协议

RocketMQ 的通信协议是比较简单的,整体的协议格式如下所示
image.png
RocketMQ 的通信协议由四部分组成:

第一部分:头 4 个字节表示剩下三部分的总长度(不包括自己这 4 个字节)
第二部分:接下来的 4 个字节表示 Header 部分的长度
第三部分:接下来的 Header Length 长度的内容为协议头,是用 json 序列化后存储,主要用来表示不同的请求响应类型
第四部分:body 内容

以一个实际的包为例:
image.png
头四个字节 00 00 01 9b 表示整个包的长度 411(0x019b),接下来的四个字节 00 00 00 d4 表示 Header Length,这里为 212(0xD4),接下来的 212 个字节表示Header 的内容,可以看到这是一段 json 的字符串,最后的 195(411-4-212) 个字节表示 Body 的真正内容,具体的消息格式下面会再讲到。
接下来我们来写解析的程序。
解析的逻辑在 proto.dissector 方法中进行,它的签名如下所示

function proto.dissector(tvb, pinfo, tree)
end

这些参数的释义如下:

tvb 是 “Testy Virtual Buffer” 的缩写,是包含数据包的 buffer 内容
pinfo 是 Packet Information 的缩写,表示 Packet 包相关的信息,可以获取包的源端口、目标端口等信息。
tree 表示 wireshark UI 界面的展示树,解析包得到的信息都会添加到这个有层级关系的树中。

接下来我们把 RocketMQ 通信的四个部分展示到 wireshark 中。修改 proto.dissector 函数的代码如下所示

function proto.dissector(tvb, pinfo, tree)
    print("load plugin...demo")

    local subtree = tree:add(proto, tvb())
    pinfo.cols.protocol = proto.name;
    pinfo.cols.info = ""

    local length = tvb(0, 4):uint()
    subtree:add("Total Length", length)
    local headerLength = tvb(4, 4):uint()
    subtree:add("Header Length", headerLength)
    local headerData = tvb(8, headerLength):string()
    subtree:add("Header", headerData)
    local bodyDataLen = length - 4 - headerLength
    local bodyData = tvb(8 + headerLength, bodyDataLen):string()
    subtree:add("Body", bodyData)
end

重新加载 lua 脚本,可以看到 Wireshark 中 RocketMQ 协议的几个部分已经显示出来了。
image.png
为了能区分是通信 Request 还是 Response,我们可以通过目标端口号来区分,新增一个方法。

function isRequest(pinfo)
    local dstPort = pinfo.dst_port;
    for _, port in ipairs(PORTS) do
        if (dstPort == port) then
            return true
        end
    end
    return false
end

在 proto.dissector 中新增对请求和响应的区分,增加更可读的描述

if (isRequest(pinfo)) then
    pinfo.cols.info:append("[REQUEST]" .. "↑↑↑")
else
    pinfo.cols.info:append("[RESPONSE]" .. "↓↓↓")
end

效果如下所示
image.png
接下来我们要做的就是把 json 做解析,展示的更好看一点,先来看 header 和 body 为 json 格式时请求和响应。增加一个递归的方法,统一处理 json 格式的数据

-- k,v 分别表示 json 的 key 和 value,tree 表示 UI 树
function parseAndAddTree(k, v, tree)
    if (type(v) == 'table') then
        local sizeStr = ""
        if (#v > 0) then
            sizeStr = "size: " .. #v
        end;
        local childTree = tree:add(k, sizeStr, tree)
        for key, value in pairs(v) do
            parseAndAddTree(key, value, childTree)
        end
    else
        tree:add(k .. ":", json.stringify(v))
    end
end

在 proto.dissector 方法中增加 Header 的解析,如下所示

local subtree = tree:add(protoMQ, tvb())
local headerTree = subtree:add("Header", "")

-- 解析 json
local header = json.parse(headerData, 1, "}")

for k, v in pairs(header) do
    parseAndAddTree(k, v, headerTree)
end

重新加载运行上面的代码,效果如下所示
image.png
同时我们也可以在 RocketMQ 的源码中找到请求和响应 code 对应的更可读的字符串表示,

local requestCodeMap = {
    [10] = "SEND_MESSAGE",
    [11] = "PULL_MESSAGE",
    [12] = "QUERY_MESSAGE",
    ...
}

local responseCode = {
    [0] = "SUCCESS",
    [1] = "SYSTEM_ERROR",
    [2] = "SYSTEM_BUSY",
}

如果 Body 是 json 字符串的话也可以用这种方式来处理,如下所示
image.png
但是在一些情况下,Body 并不是用 json 字符串来表示的,比如在 PULL 消息的时候,如果服务器有返回可消费的消息,这时 Body 中存储的并不是字符串,而是 RocketMQ 自定义的消息格式,如下所示
image.png
写这段解析是个体力活,我参照 RocketMQ 的 Java 源码实现了一个 lua 版本,完整的代码如下所示,

function decodeMessageExt(bodyTree, pinfo, bodyData)
    local bodyTree = bodyTree:add("Body", "")

    pinfo.cols.info:append(">>>>#FOUND#")

    local offset = 0;

    bodyTree:add("totalSize", bodyData(offset, 4):int())
    offset = offset + 4;

    local magicCode = string.format("0X%8.8X", bodyData(offset, 4):uint())
    bodyTree:add("magicCode", magicCode)
    offset = offset + 4;

    bodyTree:add("bodyCRC", bodyData(offset, 4):int())
    offset = offset + 4;

    bodyTree:add("queueId", bodyData(offset, 4):int())
    offset = offset + 4;

    bodyTree:add("flag", bodyData(offset, 4):int())
    offset = offset + 4;

    bodyTree:add("queueOffset", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;

    bodyTree:add("physicOffset", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;

    bodyTree:add("sysFlag", bodyData(offset, 4):int())
    offset = offset + 4;


    bodyTree:add("bornTimeStamp", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;

    local bornHost = bodyData(offset, 1):uint()
            .. "." .. bodyData(offset + 1, 1):uint()
            .. "." .. bodyData(offset + 2, 1):uint()
            .. "." .. bodyData(offset + 3, 1):uint()

    bodyTree:add("bornHost", bornHost)
    offset = offset + 4;

    bodyTree:add("port", bodyData(offset, 4):int())
    offset = offset + 4;
    bodyTree:add("storeTimestamp", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;

    local storeHost = bodyData(offset, 1):uint()
            .. "." .. bodyData(offset + 1, 1):uint()
            .. "." .. bodyData(offset + 2, 1):uint()
            .. "." .. bodyData(offset + 3, 1):uint()
    bodyTree:add("storeHost", storeHost)
    offset = offset + 4;

    bodyTree:add("storePort", bodyData(offset, 4):int())
    offset = offset + 4;

    --13 RECONSUMETIMES
    bodyTree:add("reconsumeTimes", bodyData(offset, 4):int())
    offset = offset + 4;
    --14 Prepared Transaction Offset
    bodyTree:add("preparedTransactionOffset", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;
    --15 BODY
    local bodyLen = bodyData(offset, 4):int()
    --            bodyTree:add("bodyLen", bodyLen)
    offset = offset + 4;

    bodyTree:add("body:", bodyData(offset, bodyLen):string())
    offset = offset + bodyLen;

    --16 TOPIC
    local topicLen = bodyData(offset, 1):int()
    offset = offset + 1;
    --            bodyTree:add("topicLen", topicLen)
    local topic = bodyData(offset, topicLen):string()
    bodyTree:add("topic:", topic)
    pinfo.cols.info:append(" topic:" .. topic)

    offset = offset + topicLen;

    --17 properties
    local propertiesLength = bodyData(offset, 2):int()
    offset = offset + 2;
    bodyTree:add("propertiesLength", propertiesLength)

    if (propertiesLength > 0) then
        local propertiesStr = bodyData(offset, propertiesLength):string()
        offset = offset + propertiesLength
        local propertiesTree = bodyTree:add("propertiesStr", "size: " .. propertiesLength)
        for k, v in string.gmatch(propertiesStr, "(%w+)\1(%w+)") do
            propertiesTree:add(k, v)
        end
    end
end

运行的效果如下所示
image.png
完整的代码我放在了 github.com/arthur-zhan…, 有兴趣的同学可以看看。除了前面文章中的那些功能,还有实现将 topic 等有用的信息提取到 Info 那一栏,方便查看通信的过程~

后记

没事折腾折腾还挺有意思的,在后台开发中 Lua 这门胶水语言除了在 OpenResty、Redis 中有不少用处之外,还有不少有趣的用途等待我们去发掘。
通过写这个插件,我自己对 RocketMQ 通信的细节更加清楚~

分类:标签:
请先登录,感受更多精彩内容
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

从Linux源码看Socket(TCP)的listen及连接队列
前言如果能知道从应用到框架再到操作系统的每一处代码,是一件Exciting的事情。 今天笔者是从Linux源码的角度看下Server端的Socket在进行listen的时候到底做了哪些事情(基于Lin
记一次对端机器宕机后的tcp行为
前言机器一般过质保之后,就会因为各种各样的问题而宕机。而这一次的宕机,让笔者观察到了平常观察不到的tcp在对端宕机情况下的行为。经过详细跟踪分析原因之后,发现可以通过调整内核tcp参数来减少宕机造成的
记一次调用外网服务概率性失败问题的排查
前言和外部联调一直是令人困扰的问题,尤其是一些基础环境配置导致的问题。笔者在一次偶然情况下解决了一个调用外网服务概率性失败的问题。在此将排查过程发出来,希望读者遇到此问题的时候,能够知道如何入手。 起
RocketMQ 在使用上的一些排坑和优化
前言:RocketMQ 在我们的项目中使用非常广泛,在使用的过程中,也遇到了很多的问题。比如没有多环境的隔离,在多个版本同时开发送测的情况下,互相干扰严重。RocketMQ 的投递可能会失败,导致丢失
给 wireshark 写一个 RocketMQ 协议解析的 lua 插件
前言学习 RocketMQ,需要搞懂两个东西:通信和存储。这里花了一点时间写了一个 RocketMQ 的 wireshark lua 插件,过程挺有意思,写出来记录一下。通过阅读这篇文章,你会了解到下
再聊 TCP backlog
这篇文章我们以 backlog 参数来深入研究一下建连的过程。通过阅读这篇文章,你会了解到下面这些知识:- backlog、半连接队列、全连接队列是什么- linux 内核是如何计算半连接队列、全连接
网络 IO 高级篇:一次有趣的 Docker 网络问题排查
前段时间公司的安卓打包服务出现问题,现象是在上传 360 服务器进行加固的时候,非常大概率会卡在上传阶段,长时间重试最后失败。我对这个情况进行了一些排查分析,解决了这个问题,写了这篇长文复盘了排查的经
分享一次排查CLOSE_WAIT过多的经验
作者:踩刀诗人原文链接:https://www.cnblogs.com/chopper-poet/p/14618391.html 问题背景某日下午有测试人员急匆匆的跑来跟我反馈:“有客户反馈供应商附件