性能文章>测试Activemq artemis队列生产消费ack>

测试Activemq artemis队列生产消费ack原创

366266

 

 

前言

这个测试主要是测试ActiveMQ Artemis消息中间件,因为业务的特殊性,需要测试消息一来一回算一次,单次统计结果可以参考上一篇测试的结果。

架构设计

还是采用客户端和服务端再到客户端的形式,毕竟只用本地很容易卡影响结果,消息先从本地发送到服务器,然后从服务器发送回本地,完成一次消息完整的流程。也测试了服务器到服务器的场景,但是是单个服务所以相当于左手换右手的感觉。

测试代码

其余代码参考上一篇笔记

@Log4j2
@Component
public class ListenerAndSendMq {

    private static final String UPDATE_API_STRATEGY_AND_BASKET = "localhost";

    public static AtomicInteger atomicInteger = new AtomicInteger();
    public static AtomicInteger atomicIntegerSingle = new AtomicInteger();
    AtomicReference<Long> start = new AtomicReference<>(0L);
    int cores = Runtime.getRuntime().availableProcessors();

    ThreadPoolExecutor threadPoolExecutor
            = new ThreadPoolExecutor(cores + 2, cores + 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());
//            new ArrayBlockingQueue(10), new BlockPolicy());
//              new SynchronousQueue(), new BlockPolicy());
//            new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static int count = 50000;

    @PostConstruct
    public void receiveApiStrategyCache() {

        log.info("====================receiveApiStrategyCache========cores{}============", cores);
        Messenger.subscribe(UPDATE_API_STRATEGY_AND_BASKET+"/ack", new MessageConsumer() {
            @Override
            public void onMessage(final Message<?> message) {
                if (message instanceof BinaryMessage) {
                    try {
                        final BinaryMessage msg = (BinaryMessage) message;
                        final MessageHeader header = msg.getHeader();
                        final String payload = new String(msg.getPayload());
                        long timestamp = header.getTimestamp();
                        if (atomicIntegerSingle.get() == 0) {
                            start.set(System.nanoTime());
                        }
                        if (atomicIntegerSingle.incrementAndGet() >= count) {
                            log.info("offset:{}ms", System.currentTimeMillis() - timestamp);
                            log.info("消费{}数据总共耗时:{}ms", count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start.get()));
                        }
                        sendMessage(UPDATE_API_STRATEGY_AND_BASKET, payload, header);
                    } catch (Exception e) {
                        log.error(e);
                    }
                }
            }

            @Override
            public void onError(final Message<?> message) {
                log.error("from TOPIC {} error: {}", message.getHeader().getTopic(), message.getPayload());
            }
        });
        Messenger.subscribe(UPDATE_API_STRATEGY_AND_BASKET+"/ack/multiple", new MessageConsumer() {
            @Override
            public void onMessage(final Message<?> message) {
                if (message instanceof BinaryMessage) {
                    threadPoolExecutor.execute(() -> {
                        final BinaryMessage msg = (BinaryMessage) message;
                        final MessageHeader header = msg.getHeader();
                        final String payload = new String(msg.getPayload());
                        long timestamp = header.getTimestamp();
                        if (atomicInteger.get() == 0) {
                            start.set(System.nanoTime());
                        }
                        if (atomicInteger.incrementAndGet() >= count) {
                            log.info("offset:{}ms", System.currentTimeMillis() - timestamp);
                            log.info("消费{}数据总共耗时:{}ms", count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start.get()));
                        }
                        sendMessage(UPDATE_API_STRATEGY_AND_BASKET+"/multiple", payload, header);
                    });
                }
            }

            @Override
            public void onError(final Message<?> message) {
                log.error("from TOPIC {} error: {}", message.getHeader().getTopic(), message.getPayload());
            }
        });
    }

    public void sendMessage(String topic, String world, MessageHeader oldHeader) {
        final MessageHeader header = new MessageHeader(topic, oldHeader.getId(), oldHeader.getTimestamp());
        final BinaryMessage message = new BinaryMessage(header, world.getBytes());
        try {
            Messenger.send(message);
        } catch (final IOException e) {
            log.error("error", e);
        }
    }

}

5000数据测试

异步接收

只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。

数据量 是否处理消息 是否二次投递消息 客户端类型 客户端是否缓存发送消息 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
5000 mac 6 12+2 636ms centos 8 8+2 467ms 373ms centos 8 8+2 490ms 404ms
5000 mac 6 12+2 503ms centos 8 8+2 634ms 503ms centos 8 8+2 661ms 533ms
5000 mac 6 12+2 888ms centos 8 8+2 726ms 378ms centos 8 8+2 849ms 509ms
5000 mac 6 12+2 611ms centos 8 8+2 424ms 111ms mac 6 12+2 520ms 200ms
5000 mac 6 12+2 694ms centos 8 8+2 338ms 88ms mac 6 12+2 605ms 362ms
5000 mac 6 12+2 939ms centos 8 8+2 735ms 152ms mac 6 12+2 953ms 369ms

异步消费

这里主要是边处理消息和边统计的情况。

数据量 是否处理消息 是否二次投递消息 客户端类型 客户端是否缓存发送消息 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
5000 mac 6 12+2 821ms centos 8 8+2 742ms 402ms centos 8 8+2 768ms 433ms
5000 mac 6 12+2 664ms centos 8 8+2 834ms 614ms centos 8 8+2 884ms 673ms
5000 mac 6 12+2 759ms centos 8 8+2 1105ms 783ms centos 8 8+2 1200ms 882ms
5000 mac 6 12+2 600ms centos 8 8+2 680ms 561ms mac 6 12+2 666ms 435ms
5000 mac 6 12+2 577ms centos 8 8+2 674ms 528ms mac 6 12+2 688ms 369ms
5000 mac 6 12+2 720ms centos 8 8+2 963ms 718ms mac 6 12+2 992ms 578ms

1万数据测试

异步接收

只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。

数据量 是否处理消息 是否二次投递消息 客户端类型 客户端是否缓存发送消息 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
10000 mac 6 12+2 929ms centos 8 8+2 841ms 460ms centos 8 8+2 894ms 532ms
10000 mac 6 12+2 805ms centos 8 8+2 767ms 482ms centos 8 8+2 883ms 623ms
10000 mac 6 12+2 1013ms centos 8 8+2 1005ms 692ms centos 8 8+2 1047ms 761ms
10000 mac 6 12+2 831ms centos 8 8+2 913ms 622ms centos 8 8+2 1077ms 788ms
10000 mac 6 12+2 877ms centos 8 8+2 725ms 133ms mac 6 12+2 1122ms 527ms
10000 mac 6 12+2 860ms centos 8 8+2 767ms 245ms mac 6 12+2 1277ms 747ms
10000 mac 6 12+2 857ms centos 8 8+2 854ms 340ms mac 6 12+2 1058ms 552ms

异步消费

这里主要是边处理消息和边统计的情况。

注意这里第一阶段1w的数据已经超过1秒了,是因为包含了转发消息的时间,也就是ack

数据量 是否处理消息 是否二次投递消息 客户端类型 客户端是否缓存发送消息 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
10000 mac 6 12+2 953ms centos 8 8+2 1336ms 832ms centos 8 8+2 1418ms 915ms
10000 mac 6 12+2 847ms centos 8 8+2 1322ms 1031ms centos 8 8+2 1411ms 1130ms
10000 mac 6 12+2 1038ms centos 8 8+2 1419ms 862ms centos 8 8+2 1524ms 975ms
10000 mac 6 12+2 820ms centos 8 8+2 1665ms 1297ms mac 6 12+2 1736ms 1224ms
10000 mac 6 12+2 945ms centos 8 8+2 1812ms 1399ms mac 6 12+2 1802ms 1222ms
10000 mac 6 12+2 1241ms centos 8 8+2 1812ms 1043ms mac 6 12+2 1246ms 878ms

3万数据测试

异步接收

只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。

数据量 是否处理消息 是否二次投递消息 客户端类型 客户端是否缓存发送消息 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
30000 mac 6 12+2 1534ms centos 8 8+2 2313ms 1298ms centos 8 8+2 2576ms 1572ms
30000 mac 6 12+2 1623ms centos 8 8+2 2606ms 1671ms centos 8 8+2 2656ms 1725ms
30000 mac 6 12+2 1781ms centos 8 8+2 2599ms 1369ms centos 8 8+2 2732ms 1514ms
30000 mac 6 12+2 1656ms centos 8 8+2 2769ms 1630ms centos 8 8+2 3150ms 2018ms
30000 mac 6 12+2 1716ms centos 8 8+2 2445ms 1041ms mac 6 12+2 3600ms 2194ms
30000 mac 6 12+2 1561ms centos 8 8+2 1829ms 775ms mac 6 12+2 3377ms 2327ms
30000 mac 6 12+2 1447ms centos 8 8+2 2164ms 1032ms mac 6 12+2 3246ms 2144ms

异步消费

这里主要是边处理消息和边统计的情况。

数据量 是否处理消息 是否二次投递消息 客户端类型 客户端是否缓存发送消息 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
30000 mac 6 12+2 2128ms centos 8 8+2 4104ms 2520ms centos 8 8+2 4440ms 2861ms
30000 mac 6 12+2 1596ms centos 8 8+2 4268ms 3256ms centos 8 8+2 4678ms 3667ms
30000 mac 6 12+2 1618ms centos 8 8+2 3452ms 2299ms mac 6 12+2 3699ms 2374ms
30000 mac 6 12+2 1933ms centos 8 8+2 4345ms 3046ms mac 6 12+2 6470ms 5017ms
30000 mac 6 12+2 1502ms centos 8 8+2 3489ms 2647ms mac 6 12+2 3940ms 2925ms

5万数据测试

异步接收

只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。

数据量 是否处理消息 是否二次投递消息 客户端类型 客户端是否缓存发送消息 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时 服务器端类型 cpu 线程数 接收耗时 最后消息延时  
50000 mac 6 12+2 25ms centos 8 8+2 4247ms 2704ms centos 8 8+2 4448ms 2928ms  
50000 mac 6 12+2 49ms centos 8 8+2 4101ms 2654ms centos 8 8+2 4253ms 2851ms  
50000 mac 6 12+2 60ms centos 8 8+2 5105ms 3003ms centos 8 8+2 6157ms 4064ms  
50000 mac 6 12+2 2050ms centos 8 8+2 4397ms 2885ms centos 8 8+2 4555ms 3048ms  
50000 mac 6 12+2 2461ms centos 8 8+2 6208ms 4479ms centos 8 8+2 6368ms 4715ms  
50000 mac 6 12+2 2777ms centos 8 8+2 3866ms 1618ms centos 8 8+2 4008ms 1774ms  
50000 mac 6 12+2 2506ms centos 8 8+2 4335ms 2378ms centos 8 8+2 4597ms 2696ms  
50000 mac 6 12+2 2474ms centos 8 8+2 5352ms 3353ms   mac 6 12+2 7933ms 5852ms
50000 mac 6 12+2 2649ms centos 8 8+2 4247ms 1957ms   mac 6 12+2 6064ms 3777ms
50000 mac 6 12+2 2285ms centos 8 8+2 3399ms 1566ms   mac 6 12+2 6006ms 4165ms
50000 mac 6 12+2 2120ms centos 8 8+2 3445ms 1657ms   mac 6 12+2 5289ms 3501ms

异步消费

这里主要是边处理消息和边统计的情况。

数据量 是否处理消息 是否二次投递消息 客户端类型 客户端是否缓存发送消息 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
50000 mac 6 12+2 2083ms centos 8 8+2 9263ms 7614ms centos 8 8+2 9634ms 8340ms
50000 mac 6 12+2 3216ms centos 8 8+2 7933ms 5177ms centos 8 8+2 8586ms 5840ms
50000 mac 6 12+2 2086ms centos 8 8+2 8101ms 6373ms centos 8 8+2 8794ms 7069ms
50000 mac 6 12+2 1857ms mac 6 12+2 12410ms 10930ms mac 6 12+2 12622ms 277ms
50000 mac 6 12+2 1905ms centos 8 8+2 7354ms 5958ms mac 6 12+2 7950ms 6399ms
50000 mac 6 12+2 1984ms centos 8 8+2 7316ms 5931ms mac 6 12+2 7567ms 6023ms
50000 mac 6 12+2 1866ms centos 8 8+2 6693ms 5338ms mac 6 12+2 6946ms 5436ms

发送大小

1KB
生产10000数据总共耗时:937ms
offset:244ms
消费10000数据总共耗时:717ms
2KB
生产10000数据总共耗时:1552ms
offset:106ms
消费10000数据总共耗时:1248ms
3KB
生产10000数据总共耗时:1929ms
offset:170ms
消费10000数据总共耗时:1642ms
5KB
生产10000数据总共耗时:3000ms
offset:203ms
消费10000数据总共耗时:2753ms
8KB
生产10000数据总共耗时:4484ms
offset:187ms
消费10000数据总共耗时:4220ms
10KB
生产10000数据总共耗时:5501ms
offset:190ms
消费10000数据总共耗时:5239ms
20KB
生产10000数据总共耗时:11443ms
offset:207ms
消费10000数据总共耗时:11190ms
100KB
30KB
生产10000数据总共耗时:17048ms
offset:228ms
消费10000数据总共耗时:16810ms

总结

之前的一篇测试结果发现单个消息1秒一万多条消息是没有压力的,超过了可能产生堆积的情况,最明显的区别就是发送时间和处理时间。

这次主要模拟处理消息然后回复ack的场景,发现大打折扣,消息处理只能达到5千多点。毕竟是一来一回这样的情况,在原来的基础上又发了一次。

另外发现当消息的大小不超过1kb的时候是符合上面的测试结果的,但是超过了1kb,发送也会很耗时。

点赞收藏
分类:标签:
查拉图斯特拉说

让世界因你而不同

请先登录,查看6条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

Redis stream 用做消息队列完美吗?

Redis stream 用做消息队列完美吗?

Netty源码解析:writeAndFlush

Netty源码解析:writeAndFlush

6
6