性能文章>高级篇:一次Netty"引发的"诡异old gc问题排查过程>

高级篇:一次Netty"引发的"诡异old gc问题排查过程原创

1年前
2580113

应用:新美大push服务-长连通道sailfish
日推送消息:180亿
QPS峰值: 35W
最大实时在线用户:2200W

push服务简单结构为

客户端sdk<=>长连通道<=>pushServer

1.客户端sdk: 负责提供客户客户端收发push的api
2.长连通道:负责维持海量客户端连接
3.pushServer:负责给业务方提供收发push的rpc服务,与长连通道通过tcp连接,自定义协议,具体的push服务设计另起文章

首先依据这篇文章把push长连通道应用的jvm参数调到最优,见海量连接服务端jvm参数调优杂记 剩下的都是这篇文章之后所发生

2016年9月2号6:00 左右陆续收到两台机器的报警,上去看一下cat监控

改造之前gc情况

发现 在凌晨4:11分左右,这台机器cms old区域到达old gc阀值1525M(old区域设置为2048M, -XX:CMSInitiatingOccupancyFraction=70,所以阀值为1433M,前一分钟为1428.7M),于是进行old gc,结果进行一次old gc之后,啥也没回收掉,接下来一次次old gc,old区不减反增,甚是诡异!

gc日志

在4:10:29开始频繁old gc(其实这是第二次old gc了,之前已经有过一次,不过可以忽略,我就拿这次来分析),发现old gc过后,old区域大小基本没变,所以这个时候可以断定old区里面肯定有一直被引用的对象,猜测为缓存之类的对象

使用 jmap -dump:live,format=b,file=xxx [pid] 先触发一次gc再dump
重点关注这台10.32.145.237

dump 的时候,花了long long的时间,为了不影响线上引用,遂放弃。。。

9月3号早上又发现old gc,于是连忙起床去dump内存,总内存为1.8G,MAT载入分析

堆内存

光这两个家伙就占据了71.24%,其他的可以忽略不计
然后看到NioSocketChannel这个家伙,对应着某条TCP连接,于是追根溯源,找到这条连接对应的机器

NioSocketChannel 堆内存

然后去cmdb里面一查

cmdb

发现是pushServer的机器。长连通道服务器是用netty实现,自带缓冲区,对外连接着海量的客户端,将海量用户的请求转发给pushServer,而pushServer是BIO实现,无IO缓冲区,当pushServer的TCP缓冲区满了之后,TCP滑动窗口为0,那么长连服务器发送给这台机器的消息netty就一直会保存在自带的缓冲区ChannelOutBoundBuffer里,撑大old区。接下来需要进一步验证

9月5号早上,来公司验证,跑到10.12.22.253这台机器看一下tcp底层缓冲区的情况

tcp -antp | grep 9000

发现tcp发送队列积压了这么多数据没发出去,这种情况发生的原因是接收方来不及处理,接收方的接收队列里面数据积压,于是导致发送方发送不出去,接下来就跑到接收方机器上看下tcp的接收队列

 
10.32.177.127$ tcp -antp | grep 9000
 
10.4.210.192$ tcp -antp | grep 9000
 
10.4.210.193$ tcp -antp | grep 9000

果不其然,三台机器接受队列都撑得很大,到这里,问题基本排查出来了,结论是接收方处理速度过慢导致发送方积压消息过多,netty会把要发送的消息保存在ChannelOutboundBuffer里面,随着积压的消息越来越多,导致old区域逐渐扩大,最终old gc,然而这些消息对象都是live的,因此回收不掉,所以频繁old gc

9月5号下午
考虑到pushServer改造nio需要一段时间,长连通道这边又无法忍受频繁old gc而不得不重启应用,于是在通道端做了一点更改,在选择pushServer写的时候,只选择可写的Channel

 public ChannelGroupFuture writeAndFlushRandom(Object message) {
        final int size = super.size();
        if (size <= 0) {
            return super.writeAndFlush(message);
        }

        return super.writeAndFlush(message, new ChannelMatcher() {
            private int index = 0;
            private int matchedIndex = random.nextInt(size());

            @Override
            public boolean matches(Channel channel) {
                return matchedIndex == index++ && channel.isWritable();
            }
        });
    }

以上&& channel.isWritable()为新添加代码,追踪一下isWritable方法的实现,最终是调用到ChannelOutboundBuffer的isWritable方法

AbstractChannel.java

@Override
    public boolean isWritable() {
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        return buf != null && buf.isWritable();
    }

ChannelOutboundBuffer.java

  public boolean isWritable() {
        return unwritable == 0;
    }

而unwritable这个field是在这里被确定

ChannelOutboundBuffer.java

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

其中channel.config().getWriteBufferHighWaterMark()返回的field是ChannelConfig里面对应的writeBufferHighWaterMark,可以看到,默认值为64K, 表示如果你在写之前调用调用isWriteable方法,netty最多给你缓存64K的数据, 否则,缓存就一直膨胀

DefaultChannelConfig.java

private volatile int writeBufferHighWaterMark = 64 * 1024;

由此可见,Channel可写至少是TCP缓冲区+netty缓冲区(默认64K)都没有写满, 我这边的做法就是当某个Channel写满之后,就放弃这条Channel,随机选择其他的Channel。

改造完之后,观察了一个多礼拜,old区域已缓慢稳定增长,达到预期效果

改造之后gc情况

可以发现,每次old区域都是1M左右的增长

另外一个问题:每次olg gc的时候重启机器,瞬间异常井喷

TransferToPushServerException 异常井喷

重启三次,三次异常,结合前面的ChannelOutboundBuffer,不难分析,这些写失败的都是之前被堵塞的buffer,重启之后,关闭与pushServer的连接,进入到如下方法

AbstractChannel.java

  public final void close(final ChannelPromise promise) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (outboundBuffer == null) {
                // Only needed if no VoidChannelPromise.
                if (!(promise instanceof VoidChannelPromise)) {
                    // This means close() was called before so we just register a listener and return
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                return;
            }

            if (closeFuture.isDone()) {
                // Closed already.
                safeSetSuccess(promise);
                return;
            }

            final boolean wasActive = isActive();
            final ChannelOutboundBuffer buffer = outboundBuffer;
            outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
            Executor closeExecutor = closeExecutor();
            if (closeExecutor != null) {
                closeExecutor.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new OneTimeTask() {
                                @Override
                                public void run() {
                                    // Fail all the queued messages
                                    buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
                                    buffer.close(CLOSED_CHANNEL_EXCEPTION);
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    // Fail all the queued messages.
                    buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
                    buffer.close(CLOSED_CHANNEL_EXCEPTION);
                }
                if (inFlush0) {
                    invokeLater(new OneTimeTask() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }

程序进入到outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
然后进入到ChannelOutboundBufferfailFlushed方法

void failFlushed(Throwable cause, boolean notify) {
        // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
        // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
        // indirectly (usually by closing the channel.)
        //
        // See https://github.com/netty/netty/issues/1501
        if (inFail) {
            return;
        }

        try {
            inFail = true;
            for (;;) {
                if (!remove0(cause, notify)) {
                    break;
                }
            }
        } finally {
            inFail = false;
        }
    }

这里的for循环导致remove0 会遍历Entry缓存对象链表

private boolean remove0(Throwable cause, boolean notifyWritability) {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        if (!e.cancelled) {
            // only release message, fail and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);

            safeFail(promise, cause);
            decrementPendingOutboundBytes(size, false, notifyWritability);
        }

        // recycle the entry
        e.recycle();

        return true;
    }
private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // processed everything
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            // 指针指向下一个待删除的缓存
            flushedEntry = e.next;
        }
    }

直到所有的缓存对象都被remove掉,remove0 每调用一次都会调用一次safeFail(promise, cause)方法,

 private static void safeFail(ChannelPromise promise, Throwable cause) {
        if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
            logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
        }
    }

然后进入到

DefaultPromise.java

 @Override
    public boolean tryFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return true;
        }
        return false;
    }

DefaultPromise.java

static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }

最终一个future回调,调回到用户方法

@Override
    protected void channelRead0(ChannelHandlerContext ctx, TransferFromSdkDataPacket msg) throws Exception {
        TransferToPushServerDataPacket dataPacket = new TransferToPushServerDataPacket();

        dataPacket.setVersion(Constants.PUSH_SERVER_VERSION);
        dataPacket.setData(msg.getData());
        dataPacket.setConnectionId(ctx.channel().attr(AttributeKeys.CONNECTION_ID).get());

        final long startTime = System.nanoTime();

        try {
            ChannelGroupFuture channelFutures = pushServerChannels.writeAndFlushRandom(dataPacket);
            channelFutures.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        CatUtil.logTransaction(startTime, null, CatTransactions.TransferToPushServer, CatTransactions.TransferToPushServer);
                    } else {
                        Channel channel = (Channel) ((ChannelGroupFuture) future).group().toArray()[0];
                        final String pushServer = channel.remoteAddress().toString();
                        TransferToPushServerException e = new TransferToPushServerException(String.format("pushServer: %s", pushServer), future.cause());

                        CatUtil.logTransaction(new CatUtil.CatTransactionCallBack() {
                            @Override
                            protected void beforeComplete() {
                                Cat.logEvent(CatEvents.WriteToPushServerError, pushServer);
                            }
                        }, startTime, e, CatTransactions.TransferToPushServer, CatTransactions.TransferToPushServer);
                    }
                }

            });
        } catch (Exception e) {
            CatUtil.logTransaction(startTime, e, CatTransactions.TransferToPushServer, CatTransactions.TransferToPushServer);
        }
    }

而在用户方法里面,我们包装了一下自定义异常,喷到cat,导致瞬间TransferToPushServerException飙高

如果你觉得看的不过瘾,想系统学习Netty原理,那么你一定不要错过我的Netty源码分析系列视频:https://coding.imooc.com/class/230.html



点赞收藏
分类:标签:
闪电侠

嗯,程序员一枚

请先登录,查看1条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

【全网首发】一次想不到的 Bootstrap 类加载器带来的 Native 内存泄露分析

【全网首发】一次想不到的 Bootstrap 类加载器带来的 Native 内存泄露分析

记一次“雪花算法”造成的生产事故的排查记录

记一次“雪花算法”造成的生产事故的排查记录

【全网首发】一次疑似 JVM Native 内存泄露的问题分析

【全网首发】一次疑似 JVM Native 内存泄露的问题分析

解读JVM级别本地缓存Caffeine青出于蓝的要诀2 —— 弄清楚Caffeine的同步、异步回源方式

解读JVM级别本地缓存Caffeine青出于蓝的要诀2 —— 弄清楚Caffeine的同步、异步回源方式

单服务并发出票实践

单服务并发出票实践

刺激,线程池的一个BUG直接把CPU干到100%了。

刺激,线程池的一个BUG直接把CPU干到100%了。

13
1