训练营学员

傲来操作系统训练营报告

Arm平台上的Kafka测试

刘玥骁 发布于 # 2024 年第一期

1. 我的任务

我的任务是进行大数据组件Kafka在ARM架构Kunpeng 920服务器和RISC-V架构SG2042服务器上的功能支持与性能测试。我完成了Kafka在Kunpeng 920服务器上的性能测试,记录了表格中要求的所有性能数据,并进行了Topic相关的功能测试;然而在RISC-V架构遇见了JDK环境问题导致Kafka无法运行,没有进行性能测试和功能测试。

2. 前置知识

2.1 消息队列

kafka是一个分布式的基于发布/订阅模式的消息队列,我通过阅读博客和实践,重点了解了消息队列概念、Kafka简介和在性能测试中使用到的逻辑概念topic,内容如下:

消息队列就是将需要传输的数据存放在队列中,可用于解耦复杂系统,属于生产者-消费者模型,有以下两种模式:

  1. 点对点模式:每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;image-20240701225825908

  2. 发布/订阅模式:每个消息可以有多个订阅者;发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;image-20240701225835987

2.2 Kafka简介

我们通常将Apache Kafka用在两类程序:1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据;2. 构建实时流应用程序,以转换或响应数据流。模型如下:image-20240701225859103

  1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。

  2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。

  3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到数据库中。

  4. Stream Processors:流处理器可以Kafka拉取数据,也可以将数据写入到Kafka中。

Kafka 的 topic 是一个逻辑概念,用于对消息进行分类。在 Kafka 中,消息以流的形式组织,而 topic 就是这些消息流的名称。每个 topic 可以被视为一个消息类别或者消息队列。用途如下:

  1. 消息分类:topic 允许你将消息按照某种特定的类别进行分组。例如,一个公司的 Kafka 集群可能会有多个 topic,分别用于存储用户行为日志、系统监控数据、交易记录等。

  2. 数据持久化:Kafka 是一个持久化的消息系统,存储在 topic 中的消息会保留在磁盘上,直到它们被消费或者超过了设置的保留策略。

  3. 分区:为了能够横向扩展和并行处理,每个 Kafka topic 可以被分割成多个分区(partitions)。分区允许多个生产者和消费者并发地读写数据,提高了系统的吞吐量。

  4. 复制:为了提高数据的可靠性和可用性,每个分区可以被复制到多个 broker 上。Kafka 通过复制因子(replication factor)来定义每个分区的副本数量。

  5. 消费者组:Kafka 支持消费者组(consumer groups)的概念,允许多个消费者实例协调工作,共同消费同一个 topic 中的消息。

  6. 消息顺序:在同一个分区内,Kafka 保证消息的顺序。如果应用需要严格的顺序保证,可以通过单分区或者有序的分区键来实现。

  7. 保留策略:Kafka 允许你设置消息的保留时间或大小,超过这个时间或大小的消息将被删除。这有助于控制存储使用。

  8. 创建和删除:你可以使用 Kafka 提供的命令行工具或 API 来创建和删除 topic。

  9. 配置:topic 可以有自己的配置,如保留时间、清理策略、最小和最大消息大小等。

3. ARM架构

3.1 安装Kafka和Zookeeper

  1. 安装与解压
wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -zxvf kafka_2.12-2.2.0.tgz
#openjdk使用的毕昇自带的8u412
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
tar -zxvf zookeeper-3.4.6.tar.gz
  1. 在~/.bash_profile中添加环境变量
export KAFKA_HOME=~/kafka_2.12-2.2.0
export PATH=$KAFKA_HOME/bin:$PATH
export ZOOKEEPER_HOME=~/zookeeper-3.4.6
export PATH=$ZOOKEEPER_HOME/bin:$PATH
  1. 配置config/server.properties,修改的内容如下:

image-20240628232627644

3.2 运行Kafka

  1. 先启动Zookeeper:bin/zookeeper-server-start.sh config/zookeeper.properties

  2. 然后启动Kafka服务:bin/kafka-server-start.sh config/server.properties

image-20240628215828665

  1. 测试是否成功启动:在服务器上打开一个生产者,然后把输入的每行数据发送到Kafka中,再打开一个消费者,当有数据往Kafka的test主题发送消息,这边就会进行消费。

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    #后面光标提示数据数据,然后回车就会发送到kafka中了
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    image-20240628220021575

    image-20240628220055254

3.3 性能测试

参考教程

  1. 创建topic:创建名为test-rep-one,分区为6,复制因子为1的主题。数据的保留时间为1小时。

    $ bin/kafka-topics.sh \
    --create \
    --topic test-rep-one \
    --partitions 6 \
    --replication-factor 1 \
    --config retention.ms=86400000 \
    --bootstrap-server localhost:9092
    
  2. 使用不同的生产者性能测试脚本。以下示例将使用上面创建的主题来存储300万条数据,每条消息的大小为1KB。-throughput设为-1时表示消息会尽快生成。

    $ bin/kafka-producer-perf-test.sh \
    --topic test-rep-one \
    --throughput -1 \
    --num-records 3000000 \
    --record-size 1024 \
    --producer-props acks=all bootstrap.servers=localhost:9092
    

    image-20240628221218147

  3. 测试消费者性能。这样根据第2、3步,得到了非公共的性能指标(黄色部分)。

    $ bin/kafka-consumer-perf-test.sh \
    --topic test-rep-one \
    --broker-list localhost:9092 \
    --messages 3000000
    

image-20240628222317161image-20240701230418479image-20240701230426540

  1. 加入公共指标:对于kafka-producer-perf-test.sh和kafka-consumer-perf-test.sh,将第2、3步的命令添加在脚本后即可;而对于kafka-topics.sh,我选择了列举topics的命令进行测试:performance_counter_920.sh "bin/kafka-topics.sh --list --bootstrap-server localhost:9092" ./。因为每次测试的结果都不同,我未记录第2、3步中的结果,公共和非公共性能指标均记录自本次结果,运行截图如下:

    1. kafka-topics.sh

      image-20240628232228635

    2. kafka-producer-perf-test.sh:image-20240628230657507

    3. kafka-consumer-perf-test.sh:image-20240628223551680

因为组长在bash_profile配了这个,所以系统 会在该路径找sh脚本,就无需进入performance目录下执行了image-20240629002535910

3.3.1 测试结果

比较类目CPUARM(鲲鹏920-7260)
kafka-topics.shduration_time1780170866
kafka-topics.shtask-clock3267.100
kafka-topics.shcycles7202976617
kafka-topics.shinstructions7892467747
kafka-topics.shcache-references2260246244
kafka-topics.shcache-misses35076202
kafka-topics.shbranches<not
kafka-topics.shbranch-misses55787680
kafka-topics.shL1-dcache-loads2260246244
kafka-topics.shL1-dcache-load-misses35076202
kafka-topics.shLLC load misses1992446
kafka-topics.shLLC load31061877
kafka-topics.shIPC1.095
kafka-producer-perf-test.shduration_time20618526199
kafka-producer-perf-test.shtask-clock65506.01
kafka-producer-perf-test.shcycles150698855761
kafka-producer-perf-test.shinstructions151115983122
kafka-producer-perf-test.shcache-references40041625736
kafka-producer-perf-test.shcache-misses590104492
kafka-producer-perf-test.shbranches<not
kafka-producer-perf-test.shbranch-misses373990050
kafka-producer-perf-test.shL1-dcache-loads40041625736
kafka-producer-perf-test.shL1-dcache-load-misses590104492
kafka-producer-perf-test.shLLC load misses146267539
kafka-producer-perf-test.shLLC load482275224
kafka-producer-perf-test.shIPC1.002
kafka-producer-perf-test.shrecords sent3000000 records sent
kafka-producer-perf-test.shaverage throughput155771.327691 records/sec
kafka-producer-perf-test.shthroughput rate152.12MB/sec
kafka-producer-perf-test.shavg latency191.20ms
kafka-producer-perf-test.shmax latency350.00ms
kafka-producer-perf-test.sh50th latency182ms
kafka-producer-perf-test.sh95th latency272ms
kafka-producer-perf-test.sh99th latency316ms
kafka-producer-perf-test.sh99.9th latency348ms
kafka-consumer-perf-test.shduration_time8571797759
kafka-consumer-perf-test.shtask-clock36013.13
kafka-consumer-perf-test.shcycles78698107896
kafka-consumer-perf-test.shinstructions45077685143
kafka-consumer-perf-test.shcache-references13658220619
kafka-consumer-perf-test.shcache-misses26480514
kafka-consumer-perf-test.shbranches<not
kafka-consumer-perf-test.shbranch-misses235871806
kafka-consumer-perf-test.shL1-dcache-loads13658220619
kafka-consumer-perf-test.shL1-dcache-load-misses264580514
kafka-consumer-perf-test.shLLC load misses59076086
kafka-consumer-perf-test.shLLC load300456781
kafka-consumer-perf-test.shIPC0.572
kafka-consumer-perf-test.shaverage throughput428877.7698 records/sec
kafka-consumer-perf-test.shthroughput rate418.8259MB/sec

3.4 功能测试

因为时间的原因,我只测试了topic相关的部分功能,运行截图如下:

image-20240629110237598

image-20240629110257968

image-20240629110422825

image-20240629111347623

3.4.1 测试结果

CPU功能测试项introductionnotecommandPASSARM-result
topics
topics alter修改topic修改分区数时,仅能增加分区个数。若是用其减少 partition 个数,则会报错bin/kafka-topics.sh —alter —bootstrap-server localhost:9092 —topic test-rep-one —partitions 8Y没有返回值,则说明修改成功
topics config创建topic的覆盖配置bin/kafka-topics.sh —create —topic test-rep-one —partitions 6 —replication-factor 1 —config retention.ms=86400000 —bootstrap-server localhost:9092Y
topics create创建topicbin/kafka-topics.sh —create —topic test-rep-one —partitions 6 —replication-factor 1 —config retention.ms=86400000 —bootstrap-server localhost:9092Y没有返回值,则说明创建成功
topics delete删除topicbin/kafka-topics.sh —delete —bootstrap-server localhost:9092 —topic test-rep-twoY没有返回值,则说明删除成功
topics delete-config删除topic的覆盖配置bin/kafka-topics.sh —zookeeper localhost:9092 —alter —topic test-rep-one —delete-config retention.msY
topics describe查看指定topic明细bin/kafka-topics.sh —describe —bootstrap-server localhost:9092 —topic test-rep-oneYTopic:test-rep-one PartitionCount:6 ReplicationFactor:1 Configs:segment.bytes=1073741824,retention.ms=86400000 Topic: test-rep-one Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: test-rep-one Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: test-rep-one Partition: 2 Leader: 1 Replicas: 1 Isr: 1 Topic: test-rep-one Partition: 3 Leader: 1 Replicas: 1 Isr: 1 Topic: test-rep-one Partition: 4 Leader: 1 Replicas: 1 Isr: 1 Topic: test-rep-one Partition: 5 Leader: 1 Replicas: 1 Isr: 1
topics help显示kafka-topics.sh的帮助信息bin/kafka-topics.sh —helpY太长了,为了格式已删除
topics list查看 Topic 列表bin/kafka-topics.sh —list —bootstrap-server localhost:9092Y__consumer_offsets test test-rep-one
topics partitions指定分区数bin/kafka-topics.sh —create —topic test-rep-one —partitions 6 —replication-factor 1 —config retention.ms=86400000 —bootstrap-server localhost:9092Y没有返回值,则说明创建成功
topics replica-assignment
topics replication-factor指定复制因子bin/kafka-topics.sh —create —topic test-rep-one —partitions 6 —replication-factor 1 —config retention.ms=86400000 —bootstrap-server localhost:9092Y没有返回值,则说明创建成功
topics topic
topics topics-with-overrides
topics unavailable-partitions
topics under-replicated-partitions
topics zookeeperbin/kafka-topics.sh —zookeeper localhost:9092 —alter —topic test-rep-one —delete-config retention.msY

3.5 问题与解决

  1. 安装完kafka集群后,程序创建topic时报错could not be established. Broker may not be available:https://www.cnblogs.com/charkey/p/16143422.html

  2. kafka启动出错:原因是没有杀死进程https://blog.csdn.net/qq_38612955/article/details/81206284

4. RISC-V架构

4.1 问题

环境配置上除了config/server.properties中的IP地址不同,其他都一样。

在运行时首先遇见了”egrep is obsolescent using grep -e”的报错信息,将kafka-run-class.sh中的egrep替换成了grep -E后解决。

然后就一直有cannot execute binary file: Exec format error的报错,使用su root无法解决。应该是使用了其他架构编译好的二进制包的问题,试图下载gradle编译源代码,发现gradle也有环境不兼容的问题。可以看到是因为RV上的jdk环境有问题。

image-20240629005025167

发现虽然有自带的jdk环境,但是使用不了:image-20240629125404861

试图安装欧拉镜像的jdk,然后替换环境变量,但是安装rpm有问题,最后失败,未配置好Kafka环境。

image-20240629103345529

4.2 部分解决

4.1 问题与解决

  1. rv环境下用不了ls等命令:直接执行:export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin

  2. 使用grep查找文件内容,定位问题:

image-20240629004728952

5. 实验总结

我的心得是在不同架构下部署同一框架是困难的,不同指令集与体系架构都有不同的特性,这也是做对比实验的意义所在。

我的不足是对RISC-V环境下软件的部署与运行依然较陌生,比如交叉编译;以及时间管理有问题,导致功能测试只做了一小部分,也没有写自动化脚本。