1.优点
- 高吞吐量:Kafka 能够处理非常高的消息吞吐量,适用于需要处理大量数据的场景,比如日志收集、实时数据处理等。
- 持久性:Kafka 的消息是持久化存储的,可以保证消息不会丢失。即使消费者消费消息失败,消息也可以被重新消费。
- 水平扩展性:Kafka 的设计允许在集群中添加更多的节点来扩展容量和吞吐量,而不需要对现有系统进行太多改动。
- 分布式存储:Kafka 使用分布式的日志存储来存储消息,这意味着消息被分割成多个分区并分布在不同的节点上,提高了性能和容错性。
- 多订阅者:Kafka 支持多个消费者组,每个消费者组可以独立消费消息,这使得 Kafka 适合于广播消息给多个订阅者的场景。
- 实时数据处理:由于 Kafka 具有低延迟和高吞吐量的特点,它非常适合用于实时数据处理和流处理场景,比如事件驱动架构、实时分析等。
2.安装,java集成kafka
参考:https://cloud.tencent.com/developer/article/2393694
需要注意springboot 和 kafka 版本对应。
①依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.2</version>
</dependency>
②yml配置
spring:
kafka:
consumer:
group-id: foo
auto-offset-reset: earliest
bootstrap-servers: localhost:9092
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.type.mapping: foo:com.vanky.chat.server.kafkatest.Foo,bar:com.vanky.chat.server.kafkatest.Bar
③kafka 配置类
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
//factory.setErrorHandler(new SeekToCurrentErrorHandler(
// new DeadLetterPublishingRecoverer(template), 3));
return factory;
}
// 当传输的是个实体类时,进行消息格式转换
@Bean
public RecordMessageConverter converter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("com.vanky.chat.server.kafkatest");
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("foo", Foo.class);
mappings.put("bar", Bar.class);
typeMapper.setIdClassMapping(mappings);
converter.setTypeMapper(typeMapper);
return converter;
}
@Bean
public NewTopic foos() {
return new NewTopic("foo", 1, (short) 1);
}
@Bean
public NewTopic bars() {
return new NewTopic("bar", 1, (short) 1);
}
}
④保存消息到kafka
@RestController
@RequestMapping("/kafkatest")
@AllArgsConstructor
@Tag(name = "kafka")
@Slf4j
public class Example2Controller {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private SendService sendService;
@PostMapping("/foo")
@Operation(summary = "foo")
public void send(@RequestBody Foo foo){
KafkaCompletableFuture<SendResult<String, Object>> future = (KafkaCompletableFuture<SendResult<String, Object>>)
kafkaTemplate.send("foo", "modelOne", foo);
future.thenAccept((result) -> {
log.info("生产者成功发送消息到" + result.getProducerRecord().topic() + "-> " +
result.getProducerRecord().value().toString());
});
}
@PostMapping("/bar")
@Operation(summary = "bar")
public void send(@RequestBody Bar bar){
KafkaCompletableFuture<SendResult<String, Object>> future = (KafkaCompletableFuture<SendResult<String, Object>>)
kafkaTemplate.send("bar", bar);
future.thenAccept((result) -> {
log.info("生产者成功发送消息到" + result.getProducerRecord().topic() + "-> " +
result.getProducerRecord().value().toString());
});
}
//异步
@PostMapping("/ansyc")
@Operation(summary = "ansyc")
public void sendAnsyc(@RequestBody Bar bar){
sendService.sendAnsyc(bar);
}
//同步
@PostMapping("/sync")
@Operation(summary = "sync")
public void sendSync(@RequestBody Bar bar){
sendService.sendSync(bar);
}
}
⑤listener / handler
- @KafkaListener:用于将方法标记为 Kafka 消息侦听器。可以在方法上使用此注解来指定要侦听的主题或主题模式,以及其他配置选项,例如组ID和线程数。
- @KafkaHandler:在使用 @KafkaListener 注解时,如果类中有多个方法,可以使用 @KafkaHandler 注解标记其中一个方法,以指定作为特定消息类型的处理程序。
@Component
public class Example3Listenter {
@KafkaListener(topics = "ansyc")
public void listenAnsyc(Bar bar) {
System.out.println(bar);
}
@KafkaListener(topics = "sync")
public void listenSync(Bar bar) {
System.out.println(bar);
}
}
/**
* 或者写成handler形式
*/
@Component
@KafkaListener(id = "handler", topics = {"foo", "bar"})
public class ListenHandler {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@KafkaHandler
public void foo(@Payload Foo foo, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
System.out.println("key:" + key);
System.out.println("foo:" + foo.toString());
}
@KafkaHandler
public void foo(Bar bar) {
System.out.println("bar:" + bar.toString());
}
}