SpringBoot_Kafka


SpringBoot_Kafka

第一步

  • pom.xml 引入对应的架包
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

第二步

  • 配置文件添加配置
spring:
  application:
    name: springboot-kafka

  kafka:
    # 集群地址,多个使用逗号隔开
    bootstrap-servers: 192.168.234.128:9092,192.168.234.128:9093,192.168.234.128:9094
    # 消费的主题,通过配置项来实现,多个逗号隔开,中间不要有空格
    topics: topic-1,topic-2,topic-3
    producer:
      # 重试次数,producer 两次重试之间会停顿一段时间,以防止频繁地重试对系统带来冲击。这段时间是可以配置的,由参数 retry.backoff.ms 指定,默认是 100 毫秒
      # 重试可能造成消息的重复发送,为了应对这一风险, Kafka 要求用户在 consumer 端必须执行去重处理
      # 重试可能造成消息的乱序,producer 提供了 max.in.flight.requets.per.connection 参数 一旦用户将此参数设置成 1, producer 将确保某一时刻只能发送一个请求
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      # acks=0 : producer 不等kafka是否接收成功,立即开始其他工作 -> 提高吞吐量,单会丢失数据
      # acks=1 : producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果 producer ,而无须等待 ISR 中其他副本写入该消息。能保证吞吐量,也能保证一定的持久性。
      # acks=all/-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待 ISR 中所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给producer。吞吐量极低,但不会丢失数据。
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:
      group-id: springboot-kafka
      # 是否自动提交 consumer offset,批量的时候要改为 false,代码手动维护 offset 的也需要设置成 false
      enable-auto-commit: false
      # 假设你首次运行一个 consumer group 并且指定从头消费。显然该 group 会从头消费所有数据,因为此时该 group 还没有任何位移信息。一旦该 group 成功提交位移后,你重启了 group ,依然指定从头消费。此时你会发现该 group 并不会真的从头消费,因为 Kafka 己经保存了该 group位移信息,因此它会无视 auto.offset.reset 的设置。
      # latest(默认值)指定从最新处位移开始消费
      # earliest :指定从最早的位移开始消费,注意这里最早的位移不一定就是0
      # none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量消费的条数
      max-poll-records: 50

    listener:
      # 设置消费的并发数
      concurrency: 3
      # RECORD
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # MANUAL
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL_IMMEDIATE
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      ack-mode: manual_immediate
      # 批量消费
      type: batch

第三步

  • 定义消息发送接口
@Component
@AllArgsConstructor
@Slf4j
public class KafkaProvider {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessageData(String topic, String data) {
        try {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("消息发送失败:" +ex.getMessage());
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    System.out.println("消息发送成功 :" + result);
                }
            });
            System.out.println("生产消息至Kafka: " + data);
        } catch (Exception e) {
            log.error("出错!!!!!!!!!!!" + e.getMessage());
            e.printStackTrace();
        }
        System.out.println("线程: {} " + Thread.currentThread().getName());
    }
}

第四步

  • 定义消息的消费者
@Component
@Slf4j
public class KafkaConsumer {

    /**
     * 引入 Kafka 的死信队列
     * 这里使用的是 Kafka 默认的监听容器类,也可以使用自定义的,使用自定义的只需要在 @KafkaListener 注解之中引入 containerFactory = “” 指定即可
     * 使用自定义的时候,对应的 Bean 最好做成配置类注入到 Spring 中
     * @param configurer
     * @param kafkaConsumerFactory
     * @param template
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setCommonErrorHandler(new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(0, 3)));
        return factory;
    }

    /**
     * 指定对应的主题消费
     * @param record
     */
    @KafkaListener(topics = {"topic_1"})
    public void consumer_1(ConsumerRecord<?, ?> record){
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }

    /**
     * 消费单条信息
     * 主题通过配置类指定消费,并且手动维护数据偏移量
     * @param records
     * @param ack
     */
    @KafkaListener(topics = {"#{'${spring.kafka.topics}'.split(',')}"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer_2(ConsumerRecord<?, ?> records, Acknowledgment ack) {
        System.out.println("消费:"+records.topic()+"-"+records.partition()+"-"+records.value());
            Optional<?> kafkaMessage = Optional.ofNullable(records);
            kafkaMessage.ifPresent(u -> {
                try {
                    System.out.println("消费的消息为:" + u);
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("消费异常,错误消息:{},异常信息:{}", u, e.getMessage());
                }
            });
        ack.acknowledge();
    }

    /**
     * 批量消费信息,批量消费的时候需要在配置信息里面添加批量消费的配置
     * 主题通过配置类指定消费,并且手动维护数据偏移量
     * @param records
     * @param ack
     */
    @KafkaListener(id = "consumer_3", topics = {"#{'${spring.kafka.topics}'.split(',')}"}, groupId = "${spring.kafka.consumer.group-id}")
    public void consumer_3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        long startTime = System.currentTimeMillis();
        records.forEach(record -> {
            log.info(String.format("消费:topic:%s-partition:%s-offset:%s-value:%s", record.topic(), record.partition(),
                    record.offset(), record.value()));
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            kafkaMessage.ifPresent(u -> {
                try {
                    log.info(Thread.currentThread().getName() + ":" + u);
                    System.out.println("消费的消息为:" + u);
                } catch (Exception e) {
                    log.error("消费异常,错误消息:{},异常信息:{}", u, e.getMessage());
                }
            });
        });
        log.info("data size:{} kafka消费耗时 {}", records.size(), System.currentTimeMillis() - startTime);
        ack.acknowledge();
    }
}

文章作者: L Q
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 L Q !
  目录