---
title: Kafka Getting Started Guide
categories: Tools
---

# Kafka Getting Started Guide

## Installation

We run the service with Kafka's built-in KRaft mode, which removes the dependency on ZooKeeper for cluster management and metadata storage.

The docker-compose.yaml file is shown below. Before running it, create the ${PWD}/data directory and grant it 777 permissions.

```yaml
version: '3'
services:
  kafka:
    image: 'bitnami/kafka:3.7.0'
    container_name: kafka
    volumes:
      - ${PWD}/data:/bitnami/kafka/data
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.132.131:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    ports:
      - '9092:9092'
      - '9093:9093'
```
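
With the file in place, a minimal startup sequence looks like this, run from the directory containing docker-compose.yaml (assuming Docker Compose v2; with the standalone binary, use docker-compose instead):

```bash
# create the data directory the broker will write to, with open permissions
mkdir -p ./data && chmod 777 ./data

# start the broker in the background
docker compose up -d

# follow the logs until the broker reports it has started
docker logs -f kafka
```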

## Basic usage

### Using the command line

Enter the kafka/bin directory inside the container:

```bash
docker exec -it kafka bash
cd /opt/bitnami/kafka/bin
```

List the current topics:

```bash
sh kafka-topics.sh --bootstrap-server 192.168.132.131:9092 --list
```

Create a topic:

```bash
sh kafka-topics.sh --bootstrap-server 192.168.132.131:9092 --create --replication-factor 1 --partitions 1 --topic DEMO_01
```

- `replication-factor`: how many replicas of each partition are kept across the Kafka cluster.
- `partitions`: how many partitions the topic has; partitioning is Kafka's mechanism for spreading a topic's data across multiple nodes.

Both are set to 1 here; for production use, choose values that fit your workload.
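
To confirm these settings, describe the topic; a quick round trip with the built-in console producer and consumer is also a handy sanity check (type a message into the producer, then read it back; Ctrl+C exits):

```bash
# show partition count, replication factor, and leader for the topic
sh kafka-topics.sh --bootstrap-server 192.168.132.131:9092 --describe --topic DEMO_01

# send a few messages, then read them back from the beginning
sh kafka-console-producer.sh --bootstrap-server 192.168.132.131:9092 --topic DEMO_01
sh kafka-console-consumer.sh --bootstrap-server 192.168.132.131:9092 --topic DEMO_01 --from-beginning
```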

### Using code

Maven dependencies (pom.xml):

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```

application.yaml

```yaml
# web port of the application
server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 192.168.132.131:9092
    producer:
      acks: 1 # 0 = no acknowledgment; 1 = leader acknowledges; all = leader plus all in-sync replicas acknowledge
      retries: 3 # number of retries on send failure
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # serialize message values as JSON
    consumer:
      auto-offset-reset: earliest # new consumer groups start from the earliest offset
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # deserialize message values from JSON
      properties:
        spring.json.trusted.packages: top.sintang.kafkademo.domain
    listener:
      # ack-mode: manual_immediate # acknowledge message consumption manually
      missing-topics-fatal: false # whether to fail when a listened-to topic does not exist
```
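
The sending and consuming code itself lives in the linked repo; below is a minimal sketch (not from the original article) of how this configuration is typically used. The DemoMessaging class and the DemoMessage POJO (assumed to live in the trusted package top.sintang.kafkademo.domain, with a no-args constructor for Jackson) are hypothetical names:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import top.sintang.kafkademo.domain.DemoMessage; // hypothetical POJO

@Component
public class DemoMessaging {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // the configured JsonSerializer turns the POJO into a JSON payload
    public void send(String text) {
        kafkaTemplate.send("DEMO_01", new DemoMessage(text));
    }

    // groupId is set here because spring.kafka.consumer.group-id is not configured;
    // the JsonDeserializer rebuilds the POJO thanks to spring.json.trusted.packages
    @KafkaListener(topics = "DEMO_01", groupId = "demo-group")
    public void onMessage(DemoMessage message) {
        System.out.println("received: " + message);
    }
}
```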

KafkaConfig.java:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // admin bean used below to list and create topics
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(config);
    }
}
```

List topics from code:

```java
    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Test
    public void showTopicList() {
        // build the admin client from the KafkaAdmin config and close it when done
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            ListTopicsResult listTopicsResult = adminClient.listTopics();
            listTopicsResult.names().get().forEach(System.out::println);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
```

Create a topic from code:

```java
    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Test
    public void createTopic() {
        // NewTopic(name, numPartitions, replicationFactor)
        kafkaAdmin.createOrModifyTopics(new NewTopic("DEMO_02", 1, (short) 1));
    }
```
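
As a side note (a standard spring-kafka feature rather than something from the original article): declaring a NewTopic bean is enough on its own, since KafkaAdmin scans for such beans at application startup and creates any missing topics. A sketch with a hypothetical DEMO_03 topic:

```java
    // KafkaAdmin picks up NewTopic beans at startup and creates missing topics
    @Bean
    public NewTopic demoTopic() {
        return TopicBuilder.name("DEMO_03")
                .partitions(1)
                .replicas(1)
                .build();
    }
```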

Code for this article: https://github.com/javajianghu/javajianghu-code/tree/main/kafka/kafka-demo