SpringBoot_Kafka
第一步
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
第二步:在 application.yml 中配置 Kafka 集群地址、生产者、消费者及监听器参数
spring:
  application:
    name: springboot-kafka
  kafka:
    bootstrap-servers: 192.168.234.128:9092,192.168.234.128:9093,192.168.234.128:9094
    # 自定义属性,供 @KafkaListener 通过 SpEL 读取(注意与代码中使用的主题名保持一致)
    topics: topic-1,topic-2,topic-3
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: springboot-kafka
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 50
    listener:
      concurrency: 3
      ack-mode: manual_immediate
      type: batch
第三步:编写消息生产者 KafkaProvider,通过 KafkaTemplate 异步发送消息并在回调中记录结果
@Component
@AllArgsConstructor
@Slf4j
public class KafkaProvider {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessageData(String topic, String data) {
try {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("消息发送失败:" +ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("消息发送成功 :" + result);
}
});
System.out.println("生产消息至Kafka: " + data);
} catch (Exception e) {
log.error("出错!!!!!!!!!!!" + e.getMessage());
e.printStackTrace();
}
System.out.println("线程: {} " + Thread.currentThread().getName());
}
}
第四步:编写消息消费者 KafkaConsumer,配置死信重试的容器工厂并演示单条/批量消费与手动提交
@Component
@Slf4j
public class KafkaConsumer {
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setCommonErrorHandler(new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(0, 3)));
return factory;
}
@KafkaListener(topics = {"topic_1"})
public void consumer_1(ConsumerRecord<?, ?> record){
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
@KafkaListener(topics = {"#{'${spring.kafka.topics}'.split(',')}"}, groupId = "${spring.kafka.consumer.group-id}")
public void consumer_2(ConsumerRecord<?, ?> records, Acknowledgment ack) {
System.out.println("消费:"+records.topic()+"-"+records.partition()+"-"+records.value());
Optional<?> kafkaMessage = Optional.ofNullable(records);
kafkaMessage.ifPresent(u -> {
try {
System.out.println("消费的消息为:" + u);
} catch (Exception e) {
e.printStackTrace();
log.error("消费异常,错误消息:{},异常信息:{}", u, e.getMessage());
}
});
ack.acknowledge();
}
@KafkaListener(id = "consumer_3", topics = {"#{'${spring.kafka.topics}'.split(',')}"}, groupId = "${spring.kafka.consumer.group-id}")
public void consumer_3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
long startTime = System.currentTimeMillis();
records.forEach(record -> {
log.info(String.format("消费:topic:%s-partition:%s-offset:%s-value:%s", record.topic(), record.partition(),
record.offset(), record.value()));
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
kafkaMessage.ifPresent(u -> {
try {
log.info(Thread.currentThread().getName() + ":" + u);
System.out.println("消费的消息为:" + u);
} catch (Exception e) {
log.error("消费异常,错误消息:{},异常信息:{}", u, e.getMessage());
}
});
});
log.info("data size:{} kafka消费耗时 {}", records.size(), System.currentTimeMillis() - startTime);
ack.acknowledge();
}
}