性能文章>【全网首发】APM工具Skywalking是如何进行JVM监控的>

【全网首发】APM工具Skywalking是如何进行JVM监控的原创

2周前
203511

Skywalking是如何进行JVM监控的

大家都知道Skywalking可以监控Java的JVM情况,包括垃圾回收情况等等,那么它是怎么实现的呢?今天就带大家一探究竟。

通过前几篇的文章我们知道,Skywalking启动的时候,会加载各种BootService实现类,而有关JVM的BootService实现类就是JVMService

JVMService

JVMService可以看做一个定时器,它收集JVM cpu、内存、内存池和gc 信息等等参数,并将收集到的信息通过GRPCChannelManager提供的通道发送给Collector,GRPCChannelManager这个类我们在上篇文章我们就进行了介绍,主要是用来建立连接管理通道的

JVMService实现BootService接口和Runnable接口

我们按照调用方法的顺序分析一下吧

prepare()方法

JVMService的prepare()方法:

public static int BUFFER_SIZE = 60 * 10;
public void prepare() throws Throwable {
        queue = new LinkedBlockingQueue<JVMMetric>(Config.Jvm.BUFFER_SIZE);
        sender = new Sender();
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(sender);
    }

准备阶段就是创建一个LinkedBlockingQueue类型的阻塞队列,队列大小为600,这个队列保存的是JVMMetric对象,然后创建了一个Sender对象,找到GRPCChannelManager对象,并把Sender对象加入监听类中。

boot()方法

JVMService的boot()方法:

public void boot() throws Throwable {
        collectMetricFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("JVMService produces metrics failure.", t);
                }
            }), 0, 1, TimeUnit.SECONDS);
        sendMetricFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(sender, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("JVMService consumes and upload failure.", t);
                }
            }
            ), 0, 1, TimeUnit.SECONDS);
    }

boot()方法中定义两个定时线程池每隔一秒创建一个线程,一个是生产线程,一个是消费线程,生产者的逻辑对应JVMService的run()方法,而消费者的逻辑在Sender的run()方法中

run()方法

JVMService的run()方法:

public void run() {
        if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
            && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
        ) {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                JVMMetric.Builder jvmBuilder = JVMMetric.newBuilder();
                jvmBuilder.setTime(currentTimeMillis);
                jvmBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric());
                jvmBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList());
                jvmBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricsList());
                jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList());

                JVMMetric jvmMetric = jvmBuilder.build();
                if (!queue.offer(jvmMetric)) {
                    queue.poll();
                    queue.offer(jvmMetric);
                }
            } catch (Exception e) {
                logger.error(e, "Collect JVM info fail.");
            }
        }
    }

run()中的逻辑主要是创建JVMMetric对象,创建完成后放入队列中,从准备方法中我们知道队列大小设置是600,当队列满的时候,会取出最久的那个淘汰掉,然后放入新的JVMMetric对象到队尾。

从组装JVMMetric对象的过程我们可以看到,JVMMetric对象中包含了当前时间,CPU指标、内存指标还有垃圾回收指标。这三个指标的获取都是利用枚举实现的单例模式,然后getGCList()方法中调用了GCModule抽象类的getGCList()方法,在获取新生代和老年代垃圾回收器名称的时候定义了抽象方法,具体逻辑每个垃圾回收器重写各自的抽象方法,这是模板方法的体现

Sender

run()方法:

Sender对象作为消费者,它的功能就是通过drainTo()方法来把队列中的数据转移到buffer集合中,然后发送到Collector中。

总结

这篇文章我们讲了Skywalking是怎么进行JVM参数收集的,主要涉及到的类是JVMService,它的类定义了一个LinkedBlockingQueue类型的阻塞队列,长度是600,然后启动方法中启动了两个定时线程池创建了生产者JVMService线程和消费者Sender线程,JVMService作为生产者收集CPU、内存、垃圾回收信息等放入队列,Sender作为消费者从队列获取到所有JVM信息发送给Collector

❤️ 感谢大家

如果你觉得这篇内容对你挺有有帮助的话:

  1. 欢迎关注我❤️,点赞👍🏻,评论🤤,转发🙏
  2. 关注盼盼小课堂,定期为你推送好文,还有群聊不定期抽奖活动,可以畅所欲言,与大神们一起交流,一起学习。
  3. 有不当之处欢迎批评指正。
分类:标签:
请先登录,查看1条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

大数据中台之Kafka,到底好在哪里?
Hello,大家好,今天给大家分享一个大数据里面很火的技术——Kafka,Kafka 是一个分布式的消息系统,其高性能在圈内很出名。本人阅读过多个大数据生态的开源技术的源码,个人感觉 Kafka 的源
什么?搞不定Kafka重复消费?
今天我们聊一个话题,如何保证 Kafka 消息不重复消费?在使用 Kafka 的时候一般都会设置重试的次数,但是因为网络的一些原因,设置了重试就有可能导致有些消息重复发送了(当然导致消息重复也有可能是其他原因),那么怎么解决消息重复这个问题呢?
Kafka的生产者优秀架构设计
前言 Kafka 是一个高吞吐量的分布式的发布订阅消息系统,在全世界都很流行,在大数据项目里面使用尤其频繁。笔者看过多个大数据开源产品的源码,感觉 Kafka 的源码是其中质量比较上乘的一个,这得益于
一次 Docker 容器内大量僵尸进程排查分析
前段时间线上的一个使用 Google Puppeteer 生成图片的服务炸了,每个 docker 容器内都有几千个孤儿僵死进程没有回收,如下图所示。这篇文章比较长,主要就讲了下面这几个问题。- 什么情
What?一个 Dubbo 服务启动要两个小时!
前言前几天在测试环境碰到一个非常奇怪的与 ```dubbo``` 相关的问题,事后我在网上搜索了一圈并没有发现类似的帖子或文章,于是便有了这篇。希望对还未碰到或正在碰到的朋友有所帮助。 现象现象是这样
Prometheus时序数据库-报警的计算
前言在前面的文章中,笔者详细的阐述了Prometheus的数据插入存储查询等过程。但作为一个监控神器,报警计算功能是必不可少的。自然的Prometheus也提供了灵活强大的报警规则可以让我们自由去发挥
【全网首发】APM工具Skywalking是如何进行JVM监控的
Skywalking是如何进行JVM监控的大家都知道Skywalking可以监控Java的JVM情况,包括垃圾回收情况等等,那么它是怎么实现的呢?今天就带大家一探究竟。通过前几篇的文章我们知道,Skywalking启动的时候,会加载各种BootService实现类,而有关JVM的BootServ
【全网首发】Skywalking的Trace信息可以跨线程吗
Skywalking的Trace信息可以跨线程吗Trace信息是一个重要的信息,那么Skywalking的trace可以跨线程传播吗?我们先给出答案,它是是可以的跨线程传播的,今天就带大家看一下Trace跨线程的使用和实现原理使用private static final ExecutorSe