性能文章>测试Activemq artemis队列生产消费>

测试Activemq artemis队列生产消费原创

265546

@[toc]

概要

ActiveMQ Artemis是一个开源的消息中间件,它实现了JMS规范,支持多种协议和传输方式。它提供了一个高性能、可扩展、可靠的消息传递系统,适用于各种场景,包括云计算、大数据、企业集成等。

  1. 可靠性高:ActiveMQ Artemis使用持久化存储来保证消息的可靠性,如果有任何失败的情况,系统会自动重试。此外,ActiveMQ Artemis还提供了事务支持,保证消息的原子性和一致性。

  2. 性能优秀:ActiveMQ Artemis具有高性能的消息传递能力,支持异步消息传递,可以同时处理数百万个消息。

  3. 高可用性:ActiveMQ Artemis支持主从备份、集群模式等高可用性特性,可以保证消息系统的可用性和稳定性。

  4. 可扩展性强:ActiveMQ Artemis支持水平和垂直扩展,可以根据需求进行扩容,保证系统的可扩展性。

  5. 多种协议支持:ActiveMQ Artemis支持多种协议和传输方式,包括AMQP、OpenWire、STOMP、REST等,可以更好地满足不同场景下的需求。

整体设计流程

从客户端发送到服务端进行消费,主要测试队列的传输情况和服务器消费情况的测试。在测试中,我们将主要发送以下内容:字符串“hello world”、当前时间戳以及一个唯一的UUID。这些信息将作为消息的有效载荷,通过队列进行传输,并由服务器进行接收和消费。通过这种测试,我们可以评估队列系统的传输性能以及服务器的处理能力,以确保在真实环境中能够有效地处理和传递这些信息。

技术代码

客户端

        int cores = Runtime.getRuntime().availableProcessors();
    @Test
    public void testSendMsg() {
      
// String topic = "localhost"+"/multiple";
        String topic = "localhost";
        final MessageHeader header = new MessageHeader(topic);
        String world = new String("hello world");
        final BinaryMessage message = new BinaryMessage(header, world.getBytes());
        try {
      
            Messenger.send(message);
        } catch (final IOException e) {
      
            log.error("error", e);
        }
    }

    @Test
    public void testMultipleSendMsg() throws InterruptedException {
      
        log.info("cores:{}", cores);
        ThreadPoolExecutor threadPoolExecutor
                = new ThreadPoolExecutor(cores+2, cores+2, 60, TimeUnit.SECONDS,
// new LinkedBlockingQueue());
// new ArrayBlockingQueue(10), new BlockPolicy());
                new SynchronousQueue(),new ThreadPoolExecutor.CallerRunsPolicy());
// new SynchronousQueue(), new BlockPolicy());

// String textMessage = MsgConstant.LARGE_MESSAGE;
        long start = System.nanoTime();
        int count = 50000;
        for (int i = 0; i < count; i++) {
      
            threadPoolExecutor.execute(() -> {
      
                testSendMsg();
            });
        }
        log.info("生产{}数据总共耗时:{}ms", count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        //Wait for sending to complete
// Thread.sleep(500000);
        Thread.currentThread().join();
    }

服务端

@Log4j2
@Component
public class ListenerMq {
      

    private static final String UPDATE_API_STRATEGY_AND_BASKET = "localhost";

    public static AtomicInteger atomicInteger = new AtomicInteger();
    public static AtomicInteger atomicIntegerSingle = new AtomicInteger();
    AtomicReference<Long> start = new AtomicReference<>(0L);
    int cores = Runtime.getRuntime().availableProcessors();

    ThreadPoolExecutor threadPoolExecutor
            = new ThreadPoolExecutor(cores + 2, cores + 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());
// new ArrayBlockingQueue(10), new BlockPolicy());
// new SynchronousQueue(), new BlockPolicy());
// new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static int count = 50000;

    @PostConstruct
    public void receiveApiStrategyCache() {
      

        log.info("====================receiveApiStrategyCache========cores{}============", cores);
        Messenger.subscribe(UPDATE_API_STRATEGY_AND_BASKET, new MessageConsumer() {
      
            @Override
            public void onMessage(final Message<?> message) {
      
                if (message instanceof BinaryMessage) {
      
                    try {
      
                        final BinaryMessage msg = (BinaryMessage) message;
                        final MessageHeader header = msg.getHeader();
                        final String payload = new String(msg.getPayload());
                        long timestamp = header.getTimestamp();
                        log.info("offset:{}ms", System.currentTimeMillis() - timestamp);
                        if (atomicIntegerSingle.get() == 0) {
      
                            start.set(System.nanoTime());
                        }
                        if (atomicIntegerSingle.incrementAndGet() >= count) {
      
                            log.info("消费{}数据总共耗时:{}ms", count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start.get()));
                        }
                    } catch (Exception e) {
      
                        log.error(e);
                    }
                }
            }

            @Override
            public void onError(final Message<?> message) {
      
                log.error("from TOPIC {} error: {}", message.getHeader().getTopic(), message.getPayload());
            }
        });
        Messenger.subscribe(UPDATE_API_STRATEGY_AND_BASKET+"/multiple", new MessageConsumer() {
      
            @Override
            public void onMessage(final Message<?> message) {
      
                if (message instanceof BinaryMessage) {
      
                    threadPoolExecutor.execute(() -> {
      
                        final BinaryMessage msg = (BinaryMessage) message;
                        final MessageHeader header = msg.getHeader();
// final String payload = new String(msg.getPayload());
                        long timestamp = header.getTimestamp();
                        log.info("offset:{}ms", System.currentTimeMillis() - timestamp);
                        if (atomicInteger.get() == 0) {
      
                            start.set(System.nanoTime());
                        }
                        if (atomicInteger.incrementAndGet() >= count) {
      
                            log.info("消费{}数据总共耗时:{}ms", count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start.get()));
                        }
                    });
                }
            }

            @Override
            public void onError(final Message<?> message) {
      
                log.error("from TOPIC {} error: {}", message.getHeader().getTopic(), message.getPayload());
            }
        });
    }

    public static void main(String[] args) {
      
        for (int i = 0; i < 3; i++) {
      
            atomicInteger.get();
            int i1 = atomicInteger.incrementAndGet();
            System.out.println(i1);
        }
    }

}

测试结果

1万数据

异步接收
只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。

数据量 是否处理消息 客户端类型 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
10000 mac 6 12+2 971ms centos 8 1 1046ms 471ms
10000 mac 6 12+2 911ms centos 8 1 735ms 199ms
10000 mac 6 12+2 902ms centos 8 1 674ms 162ms
10000 mac 6 12+2 810ms centos 8 8+2 542ms  
10000 mac 6 12+2 832ms centos 8 8+2 628ms  
10000 mac 6 12+2 888ms centos 8 8+2 674ms  

异步消费
这里主要是边处理消息和边统计的情况,假定按照正常的消费的耗时情况。

数据量 是否处理消息 客户端类型 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
10000 mac 6 12+2 922ms centos 8 1 758ms 244ms
10000 mac 6 12+2 922ms centos 8 1 625ms 96ms
10000 mac 6 12+2 1101ms centos 8 1 742ms 182ms
10000 mac 6 12+2 1134ms centos 8 8+2 694ms 90ms
10000 mac 6 12+2 1055ms centos 8 8+2 773ms 216ms
10000 mac 6 12+2 956ms centos 8 8+2 811ms 206ms
10000 mac 6 10 1057ms mac 6 20 1224ms 659ms
10000 mac 6 10 965ms mac 6 20 1020ms 438ms
10000 mac 6 10 996ms mac 6 20 1359ms 792ms

2万数据

数据量 是否处理消息 客户端类型 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
20000 mac 6 12+2 1355ms centos 8 8+2 2093ms 1139ms
20000 mac 6 12+2 1442ms centos 8 8+2 2821ms 1728ms

3万数据

异步接收
只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。

数据量 是否处理消息 客户端类型 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
30000 mac 6 12+2 1841ms centos 8 1 6181ms 4757ms
30000 mac 6 12+2 1874ms centos 8 1 6707ms 5356ms
30000 mac 6 12+2 1849ms centos 8 1 5548ms 4087ms
30000 mac 6 12+2 2028ms centos 8 1 6852ms 5238ms
30000 mac 6 12+2 1617ms centos 8 1 5644ms 4526ms
30000 mac 6 12+2 2049ms centos 8 1 6872ms 5289ms
30000 mac 6 12+2 1925ms centos 8 8+2 1685ms  
30000 mac 6 12+2 1639ms centos 8 8+2 1442ms  
30000 mac 6 12+2 1639ms centos 8 8+2 1489ms  

异步消费
这里主要是边处理消息和边统计的情况,假定按照正常的消费的耗时情况。

数据量 是否处理消息 客户端类型 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
30000 mac 6 12+2 1937ms centos 8 1 5592ms 4088ms
30000 mac 6 12+2 2243ms centos 8 1 6245ms 4563ms
30000 mac 6 12+2 1953ms centos 8 1 6874ms 5409ms
30000 mac 6 12+2 1905ms centos 8 8+2 6490ms 4975ms
30000 mac 6 12+2 2020ms centos 8 8+2 5307ms 3857ms
30000 mac 6 12+2 1811ms centos 8 8+2 3847ms 5329ms
30000 mac 6 10 1980ms centos 8 20 7113ms 5523ms
30000 mac 6 10 2059ms centos 8 20 7730ms 6289ms
30000 mac 6 10 2286ms centos 8 20 8035ms 6376ms
30000 mac 6 10 1539ms mac 6 20 5623ms 4551ms
30000 mac 6 10 1678ms mac 6 20 3520ms 2264ms
30000 mac 6 10 2258ms mac 6 20 3440ms 1643ms

5万数据

异步接收
只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。

数据量 是否处理消息 客户端类型 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
50000 mac 6 12+2 2752ms centos 8 1 12130ms 9964ms
50000 mac 6 12+2 2690ms centos 8 1 13970ms 11699ms
50000 mac 6 12+2 2460ms centos 8 1 13399ms 11294ms
50000 mac 6 12+2 2285ms centos 8 8+2 2345ms  
50000 mac 6 12+2 2420ms centos 8 8+2 2475ms  
50000 mac 6 12+2 2014ms centos 8 8+2 2382ms  
80000 mac 6 12+2 2891ms centos 8 8+2 4542ms  
60000 mac 6 12+2 3179ms centos 8 8+2 3241ms  
70000 mac 6 12+2 2610ms centos 8 8+2 3179ms  

异步消费
这里主要是边处理消息和边统计的情况,假定按照正常的消费的耗时情况。

数据量 是否处理消息 客户端类型 cpu 线程数 发送耗时 服务器端类型 cpu 线程数 接收耗时 最后消息延时
50000 mac 6 12+2 2681ms centos 8 1 12319ms 10197ms
50000 mac 6 12+2 2577ms centos 8 1 13705ms 11529ms
50000 mac 6 12+2 2752ms centos 8 1 14943ms 12713ms
50000 mac 6 12+2 2750ms centos 8 8+2 13109ms 10797ms
50000 mac 6 12+2 2730ms centos 8 8+2 13906ms 10681ms
50000 mac 6 12+2 2102ms centos 8 8+2 11829ms 8540ms
50000 mac 6 10 3343ms centos 8 20 17352ms 14520ms
50000 mac 6 10 2802ms centos 8 20 15144ms 12736ms
50000 mac 6 10 2836ms mac 6 20 6517ms 4089ms
50000 mac 6 12 3403ms mac 6 12 6027ms 3183ms

总结

根据测试结果,我们可以得出以下结论:

  • 在考虑消费情况时,1万数据量可以轻松处理,没有任何压力,但这只是单机测试的结果,只能作为参考。
  • 如果消费没有耗时,3万数据量也可以处理,但前提是所有消费都堆积在缓存中。

因此,在实际应用中,需要根据具体场景和需求来确定数据量的大小和处理方式。同时,还需要考虑系统的其他性能指标,如响应时间、错误率等来综合评估系统的性能表现。

点赞收藏
查拉图斯特拉说

让世界因你而不同

请先登录,查看4条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

日常Bug排查-连接突然全部关闭

日常Bug排查-连接突然全部关闭

6
4