@[toc]
概要
ActiveMQ Artemis是一个开源的消息中间件,它实现了JMS规范,支持多种协议和传输方式。它提供了一个高性能、可扩展、可靠的消息传递系统,适用于各种场景,包括云计算、大数据、企业集成等。
-
可靠性高:ActiveMQ Artemis使用持久化存储来保证消息的可靠性,如果有任何失败的情况,系统会自动重试。此外,ActiveMQ Artemis还提供了事务支持,保证消息的原子性和一致性。
-
性能优秀:ActiveMQ Artemis具有高性能的消息传递能力,支持异步消息传递,可以同时处理数百万个消息。
-
高可用性:ActiveMQ Artemis支持主从备份、集群模式等高可用性特性,可以保证消息系统的可用性和稳定性。
-
可扩展性强:ActiveMQ Artemis支持水平和垂直扩展,可以根据需求进行扩容,保证系统的可扩展性。
-
多种协议支持:ActiveMQ Artemis支持多种协议和传输方式,包括AMQP、OpenWire、STOMP、REST等,可以更好地满足不同场景下的需求。
整体设计流程
从客户端发送到服务端进行消费,主要测试队列的传输情况和服务器消费情况的测试。在测试中,我们将主要发送以下内容:字符串“hello world”、当前时间戳以及一个唯一的UUID。这些信息将作为消息的有效载荷,通过队列进行传输,并由服务器进行接收和消费。通过这种测试,我们可以评估队列系统的传输性能以及服务器的处理能力,以确保在真实环境中能够有效地处理和传递这些信息。
技术代码
客户端
int cores = Runtime.getRuntime().availableProcessors();
@Test
public void testSendMsg() {
// String topic = "localhost"+"/multiple";
String topic = "localhost";
final MessageHeader header = new MessageHeader(topic);
String world = new String("hello world");
final BinaryMessage message = new BinaryMessage(header, world.getBytes());
try {
Messenger.send(message);
} catch (final IOException e) {
log.error("error", e);
}
}
@Test
public void testMultipleSendMsg() throws InterruptedException {
log.info("cores:{}", cores);
ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(cores+2, cores+2, 60, TimeUnit.SECONDS,
// new LinkedBlockingQueue());
// new ArrayBlockingQueue(10), new BlockPolicy());
new SynchronousQueue(),new ThreadPoolExecutor.CallerRunsPolicy());
// new SynchronousQueue(), new BlockPolicy());
// String textMessage = MsgConstant.LARGE_MESSAGE;
long start = System.nanoTime();
int count = 50000;
for (int i = 0; i < count; i++) {
threadPoolExecutor.execute(() -> {
testSendMsg();
});
}
log.info("生产{}数据总共耗时:{}ms", count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
//Wait for sending to complete
// Thread.sleep(500000);
Thread.currentThread().join();
}
服务端
@Log4j2
@Component
public class ListenerMq {
private static final String UPDATE_API_STRATEGY_AND_BASKET = "localhost";
public static AtomicInteger atomicInteger = new AtomicInteger();
public static AtomicInteger atomicIntegerSingle = new AtomicInteger();
AtomicReference<Long> start = new AtomicReference<>(0L);
int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(cores + 2, cores + 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
// new ArrayBlockingQueue(10), new BlockPolicy());
// new SynchronousQueue(), new BlockPolicy());
// new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy());
public static int count = 50000;
@PostConstruct
public void receiveApiStrategyCache() {
log.info("====================receiveApiStrategyCache========cores{}============", cores);
Messenger.subscribe(UPDATE_API_STRATEGY_AND_BASKET, new MessageConsumer() {
@Override
public void onMessage(final Message<?> message) {
if (message instanceof BinaryMessage) {
try {
final BinaryMessage msg = (BinaryMessage) message;
final MessageHeader header = msg.getHeader();
final String payload = new String(msg.getPayload());
long timestamp = header.getTimestamp();
log.info("offset:{}ms", System.currentTimeMillis() - timestamp);
if (atomicIntegerSingle.get() == 0) {
start.set(System.nanoTime());
}
if (atomicIntegerSingle.incrementAndGet() >= count) {
log.info("消费{}数据总共耗时:{}ms", count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start.get()));
}
} catch (Exception e) {
log.error(e);
}
}
}
@Override
public void onError(final Message<?> message) {
log.error("from TOPIC {} error: {}", message.getHeader().getTopic(), message.getPayload());
}
});
Messenger.subscribe(UPDATE_API_STRATEGY_AND_BASKET+"/multiple", new MessageConsumer() {
@Override
public void onMessage(final Message<?> message) {
if (message instanceof BinaryMessage) {
threadPoolExecutor.execute(() -> {
final BinaryMessage msg = (BinaryMessage) message;
final MessageHeader header = msg.getHeader();
// final String payload = new String(msg.getPayload());
long timestamp = header.getTimestamp();
log.info("offset:{}ms", System.currentTimeMillis() - timestamp);
if (atomicInteger.get() == 0) {
start.set(System.nanoTime());
}
if (atomicInteger.incrementAndGet() >= count) {
log.info("消费{}数据总共耗时:{}ms", count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start.get()));
}
});
}
}
@Override
public void onError(final Message<?> message) {
log.error("from TOPIC {} error: {}", message.getHeader().getTopic(), message.getPayload());
}
});
}
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
atomicInteger.get();
int i1 = atomicInteger.incrementAndGet();
System.out.println(i1);
}
}
}
测试结果
1万数据
异步接收
只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。
数据量 | 是否处理消息 | 客户端类型 | cpu | 线程数 | 发送耗时 | 服务器端类型 | cpu | 线程数 | 接收耗时 | 最后消息延时 |
---|---|---|---|---|---|---|---|---|---|---|
10000 | 否 | mac | 6 | 12+2 | 971ms | centos | 8 | 1 | 1046ms | 471ms |
10000 | 否 | mac | 6 | 12+2 | 911ms | centos | 8 | 1 | 735ms | 199ms |
10000 | 否 | mac | 6 | 12+2 | 902ms | centos | 8 | 1 | 674ms | 162ms |
10000 | 否 | mac | 6 | 12+2 | 810ms | centos | 8 | 8+2 | 542ms | |
10000 | 否 | mac | 6 | 12+2 | 832ms | centos | 8 | 8+2 | 628ms | |
10000 | 否 | mac | 6 | 12+2 | 888ms | centos | 8 | 8+2 | 674ms |
异步消费
这里主要是边处理消息和边统计的情况,假定按照正常的消费的耗时情况。
数据量 | 是否处理消息 | 客户端类型 | cpu | 线程数 | 发送耗时 | 服务器端类型 | cpu | 线程数 | 接收耗时 | 最后消息延时 |
---|---|---|---|---|---|---|---|---|---|---|
10000 | 是 | mac | 6 | 12+2 | 922ms | centos | 8 | 1 | 758ms | 244ms |
10000 | 是 | mac | 6 | 12+2 | 922ms | centos | 8 | 1 | 625ms | 96ms |
10000 | 是 | mac | 6 | 12+2 | 1101ms | centos | 8 | 1 | 742ms | 182ms |
10000 | 是 | mac | 6 | 12+2 | 1134ms | centos | 8 | 8+2 | 694ms | 90ms |
10000 | 是 | mac | 6 | 12+2 | 1055ms | centos | 8 | 8+2 | 773ms | 216ms |
10000 | 是 | mac | 6 | 12+2 | 956ms | centos | 8 | 8+2 | 811ms | 206ms |
10000 | 是 | mac | 6 | 10 | 1057ms | mac | 6 | 20 | 1224ms | 659ms |
10000 | 是 | mac | 6 | 10 | 965ms | mac | 6 | 20 | 1020ms | 438ms |
10000 | 是 | mac | 6 | 10 | 996ms | mac | 6 | 20 | 1359ms | 792ms |
2万数据
数据量 | 是否处理消息 | 客户端类型 | cpu | 线程数 | 发送耗时 | 服务器端类型 | cpu | 线程数 | 接收耗时 | 最后消息延时 |
---|---|---|---|---|---|---|---|---|---|---|
20000 | 是 | mac | 6 | 12+2 | 1355ms | centos | 8 | 8+2 | 2093ms | 1139ms |
20000 | 是 | mac | 6 | 12+2 | 1442ms | centos | 8 | 8+2 | 2821ms | 1728ms |
3万数据
异步接收
只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。
数据量 | 是否处理消息 | 客户端类型 | cpu | 线程数 | 发送耗时 | 服务器端类型 | cpu | 线程数 | 接收耗时 | 最后消息延时 |
---|---|---|---|---|---|---|---|---|---|---|
30000 | 否 | mac | 6 | 12+2 | 1841ms | centos | 8 | 1 | 6181ms | 4757ms |
30000 | 否 | mac | 6 | 12+2 | 1874ms | centos | 8 | 1 | 6707ms | 5356ms |
30000 | 否 | mac | 6 | 12+2 | 1849ms | centos | 8 | 1 | 5548ms | 4087ms |
30000 | 否 | mac | 6 | 12+2 | 2028ms | centos | 8 | 1 | 6852ms | 5238ms |
30000 | 否 | mac | 6 | 12+2 | 1617ms | centos | 8 | 1 | 5644ms | 4526ms |
30000 | 否 | mac | 6 | 12+2 | 2049ms | centos | 8 | 1 | 6872ms | 5289ms |
30000 | 否 | mac | 6 | 12+2 | 1925ms | centos | 8 | 8+2 | 1685ms | |
30000 | 否 | mac | 6 | 12+2 | 1639ms | centos | 8 | 8+2 | 1442ms | |
30000 | 否 | mac | 6 | 12+2 | 1639ms | centos | 8 | 8+2 | 1489ms |
异步消费
这里主要是边处理消息和边统计的情况,假定按照正常的消费的耗时情况。
数据量 | 是否处理消息 | 客户端类型 | cpu | 线程数 | 发送耗时 | 服务器端类型 | cpu | 线程数 | 接收耗时 | 最后消息延时 |
---|---|---|---|---|---|---|---|---|---|---|
30000 | 是 | mac | 6 | 12+2 | 1937ms | centos | 8 | 1 | 5592ms | 4088ms |
30000 | 是 | mac | 6 | 12+2 | 2243ms | centos | 8 | 1 | 6245ms | 4563ms |
30000 | 是 | mac | 6 | 12+2 | 1953ms | centos | 8 | 1 | 6874ms | 5409ms |
30000 | 是 | mac | 6 | 12+2 | 1905ms | centos | 8 | 8+2 | 6490ms | 4975ms |
30000 | 是 | mac | 6 | 12+2 | 2020ms | centos | 8 | 8+2 | 5307ms | 3857ms |
30000 | 是 | mac | 6 | 12+2 | 1811ms | centos | 8 | 8+2 | 3847ms | 5329ms |
30000 | 是 | mac | 6 | 10 | 1980ms | centos | 8 | 20 | 7113ms | 5523ms |
30000 | 是 | mac | 6 | 10 | 2059ms | centos | 8 | 20 | 7730ms | 6289ms |
30000 | 是 | mac | 6 | 10 | 2286ms | centos | 8 | 20 | 8035ms | 6376ms |
30000 | 是 | mac | 6 | 10 | 1539ms | mac | 6 | 20 | 5623ms | 4551ms |
30000 | 是 | mac | 6 | 10 | 1678ms | mac | 6 | 20 | 3520ms | 2264ms |
30000 | 是 | mac | 6 | 10 | 2258ms | mac | 6 | 20 | 3440ms | 1643ms |
5万数据
异步接收
只考虑接收的情况,假定队列消费很顺畅的情况,不存在处理消息导致消费耗时的情况。
数据量 | 是否处理消息 | 客户端类型 | cpu | 线程数 | 发送耗时 | 服务器端类型 | cpu | 线程数 | 接收耗时 | 最后消息延时 |
---|---|---|---|---|---|---|---|---|---|---|
50000 | 否 | mac | 6 | 12+2 | 2752ms | centos | 8 | 1 | 12130ms | 9964ms |
50000 | 否 | mac | 6 | 12+2 | 2690ms | centos | 8 | 1 | 13970ms | 11699ms |
50000 | 否 | mac | 6 | 12+2 | 2460ms | centos | 8 | 1 | 13399ms | 11294ms |
50000 | 否 | mac | 6 | 12+2 | 2285ms | centos | 8 | 8+2 | 2345ms | |
50000 | 否 | mac | 6 | 12+2 | 2420ms | centos | 8 | 8+2 | 2475ms | |
50000 | 否 | mac | 6 | 12+2 | 2014ms | centos | 8 | 8+2 | 2382ms | |
80000 | 否 | mac | 6 | 12+2 | 2891ms | centos | 8 | 8+2 | 4542ms | |
60000 | 否 | mac | 6 | 12+2 | 3179ms | centos | 8 | 8+2 | 3241ms | |
70000 | 否 | mac | 6 | 12+2 | 2610ms | centos | 8 | 8+2 | 3179ms |
异步消费
这里主要是边处理消息和边统计的情况,假定按照正常的消费的耗时情况。
数据量 | 是否处理消息 | 客户端类型 | cpu | 线程数 | 发送耗时 | 服务器端类型 | cpu | 线程数 | 接收耗时 | 最后消息延时 |
---|---|---|---|---|---|---|---|---|---|---|
50000 | 是 | mac | 6 | 12+2 | 2681ms | centos | 8 | 1 | 12319ms | 10197ms |
50000 | 是 | mac | 6 | 12+2 | 2577ms | centos | 8 | 1 | 13705ms | 11529ms |
50000 | 是 | mac | 6 | 12+2 | 2752ms | centos | 8 | 1 | 14943ms | 12713ms |
50000 | 是 | mac | 6 | 12+2 | 2750ms | centos | 8 | 8+2 | 13109ms | 10797ms |
50000 | 是 | mac | 6 | 12+2 | 2730ms | centos | 8 | 8+2 | 13906ms | 10681ms |
50000 | 是 | mac | 6 | 12+2 | 2102ms | centos | 8 | 8+2 | 11829ms | 8540ms |
50000 | 是 | mac | 6 | 10 | 3343ms | centos | 8 | 20 | 17352ms | 14520ms |
50000 | 是 | mac | 6 | 10 | 2802ms | centos | 8 | 20 | 15144ms | 12736ms |
50000 | 是 | mac | 6 | 10 | 2836ms | mac | 6 | 20 | 6517ms | 4089ms |
50000 | 是 | mac | 6 | 12 | 3403ms | mac | 6 | 12 | 6027ms | 3183ms |
总结
根据测试结果,我们可以得出以下结论:
- 在考虑消费情况时,1万数据量可以轻松处理,没有任何压力,但这只是单机测试的结果,只能作为参考。
- 如果消费没有耗时,3万数据量也可以处理,但前提是所有消费都堆积在缓存中。
因此,在实际应用中,需要根据具体场景和需求来确定数据量的大小和处理方式。同时,还需要考虑系统的其他性能指标,如响应时间、错误率等来综合评估系统的性能表现。