使用canal 同步mysql数据到redis

0. 流程说明 ①更新 mysql 数据 ②canal 获取 binlog,java 的canal客户端感知到。 ③将更新的数据 进行 封装(操作类型,数据库名,数据库表名,操作行的属性。。) ④将封装好的数据存入到 MQ 中 作为缓冲。 ⑤MQ 处理 数据:将数据更新到redis 中 1. 安装canal 根据 github 文档,将canal安装到 linux。 2. 客户端代码 @Component @Slf4j public class CanalRedisClient { @Resource @Lazy private CanalRedisProxy canalRedisProxy; @Value("${canal.address}") private String canalAddress; @Value("${canal.port}") private int canalPort; @Value("${canal.destination}") private String canalDestination; @PostConstruct public void canalExample() { // 创建链接 CompletableFuture.runAsync(() -> { CanalConnector connector = CanalConnectors .newSingleConnector(new InetSocketAddress(canalAddress, canalPort), canalDestination, "", ""); log.info("连接 canal 成功!"); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; log.info("empty count : {}", emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; //把修改消息进行打包 toJsonString(message.getEntries()); } connector.ack(batchId); // 提交确认 } log.warn("empty too many times, exit"); } finally { connector.disconnect(); } }); } /** * 一行数据封装为一个 Canal 类 * @param entries */ private void toJsonString(List<Entry> entries){ for (Entry entry : entries) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalModel canalModel = new CanalModel(); canalModel.setDbName(entry.getHeader().getSchemaName()); canalModel.setTableName(entry.getHeader().getTableName()); canalModel.setEventType(rowChage.getEventType().toString()); for (RowData rowData : rowChage.getRowDatasList()) { String dataId = getDataId(rowData.getBeforeColumnsList()); if (StringUtils.hasText(dataId)){ canalModel.setDataId(dataId); }else { canalModel.setDataId(getDataId(rowData.getAfterColumnsList())); } canalModel.setBefore(columnToJsonString(rowData.getBeforeColumnsList())); canalModel.setAfter(columnToJsonString(rowData.getAfterColumnsList())); //把jsonString 放入rabbitmq中 canalRedisProxy.putMQ(canalModel); } } } /** * 获取数据的id,用于 redis 的 存储key * @param columns * @return */ private String getDataId(List<Column> columns){ for (Column column : columns){ if ("id".equals(column.getName())){ return column.getValue(); } } return ""; } /** * 把表中的行转换为json格式,用于存储到redis * @param columns * @return */ private String columnToJsonString(List<Column> columns){ HashMap<String, String> map = new HashMap<>(); for (Column column : columns) { String columnName = column.getName(); String resultStr = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName); map.put(resultStr, column.getValue()); } String jsonString = JSONObject.toJSONString(map); return jsonString; } } 3.处理同步的数据 放入 MQ 中做缓冲。 @Component @Slf4j public class CanalRedisProxy { @Resource private RabbitTemplate rabbitTemplate; public void putMQ(CanalModel canalModel) { String eventType = canalModel.getEventType(); if (!StringUtils.hasText(eventType)){ throw new NullPointerException("内容为空!"); } String routingKey = ""; if ("DELETE".equals(eventType)){ routingKey = "canal.redis.delete"; }else if ("UPDATE".equals(eventType)){ routingKey = "canal.redis.update"; } else if ("INSERT".equals(eventType)) { routingKey = "canal.redis.insert"; } rabbitTemplate.convertAndSend("canal-redis-exchange", routingKey, canalModel); } } 更新redis数据 package com.vanky.chat.mq.service; import com.rabbitmq.client.Channel; import com.vanky.chat.common.bo.CanalModel; import com.vanky.chat.common.utils.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author vanky * @create 2024/4/21 20:35 */ @Component @Slf4j public class CanalRedisService { /** * 插入数据 */ @RabbitListener(queues = {"canal.redis.insert.queue"}) public void CanalRedisInsertListener(Message message, Channel channel, CanalModel canalModel){ String key = "im:data:" + canalModel.getTableName() + ":" + canalModel.getDataId(); log.info("redis 【插入】 数据:{}", key); RedisUtil.put(key, canalModel.getAfter()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { throw new RuntimeException(e); } } /** * 更新数据 */ @RabbitListener(queues = {"canal.redis.update.queue"}) public void CanalRedisUpdateListener(Message message, Channel channel, CanalModel canalModel){ String key = "im:data:" + canalModel.getTableName() + ":" + canalModel.getDataId(); log.info("redis 【更新】 数据:{}", key); RedisUtil.put(key, canalModel.getAfter()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { throw new RuntimeException(e); } } /** * 删除数据 */ @RabbitListener(queues = {"canal.redis.delete.queue"}) public void CanalRedisDeleteListener(Message message, Channel channel, CanalModel canalModel){ String key = "im:data:" + canalModel.getTableName() + ":" + canalModel.getDataId(); log.info("redis 【删除】 数据:{}", key); RedisUtil.del(key); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { throw new RuntimeException(e); } } }

四月 24, 2024 · 3 分钟

使用 kafka 作为消息队列 做流量削峰

1.优点 高吞吐量:Kafka 能够处理非常高的消息吞吐量,适用于需要处理大量数据的场景,比如日志收集、实时数据处理等。 持久性:Kafka 的消息是持久化存储的,可以保证消息不会丢失。即使消费者消费消息失败,消息也可以被重新消费。 水平扩展性:Kafka 的设计允许在集群中添加更多的节点来扩展容量和吞吐量,而不需要对现有系统进行太多改动。 分布式存储:Kafka 使用分布式的日志存储来存储消息,这意味着消息被分割成多个分区并分布在不同的节点上,提高了性能和容错性。 多订阅者:Kafka 支持多个消费者组,每个消费者组可以独立消费消息,这使得 Kafka 适合于广播消息给多个订阅者的场景。 实时数据处理:由于 Kafka 具有低延迟和高吞吐量的特点,它非常适合用于实时数据处理和流处理场景,比如事件驱动架构、实时分析等。 2.安装,java集成kafka 参考:https://cloud.tencent.com/developer/article/2393694 需要注意springboot 和 kafka 版本对应。 ①依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.0.12</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.3.2</version> </dependency> ②yml配置 spring: kafka: consumer: group-id: foo auto-offset-reset: earliest bootstrap-servers: localhost:9092 producer: bootstrap-servers: localhost:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: spring.json.type.mapping: foo:com.vanky.chat.server.kafkatest.Foo,bar:com.vanky.chat.server.kafkatest.Bar ③kafka 配置类 @Configuration public class KafkaConfig { @Bean public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); //factory.setErrorHandler(new SeekToCurrentErrorHandler( // new DeadLetterPublishingRecoverer(template), 3)); return factory; } // 当传输的是个实体类时,进行消息格式转换 @Bean public RecordMessageConverter converter() { StringJsonMessageConverter converter = new StringJsonMessageConverter(); DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID); typeMapper.addTrustedPackages("com.vanky.chat.server.kafkatest"); Map<String, Class<?>> mappings = new HashMap<>(); mappings.put("foo", Foo.class); mappings.put("bar", Bar.class); typeMapper.setIdClassMapping(mappings); converter.setTypeMapper(typeMapper); return converter; } @Bean public NewTopic foos() { return new NewTopic("foo", 1, (short) 1); } @Bean public NewTopic bars() { return new NewTopic("bar", 1, (short) 1); } } ④保存消息到kafka @RestController @RequestMapping("/kafkatest") @AllArgsConstructor @Tag(name = "kafka") @Slf4j public class Example2Controller { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @Autowired private SendService sendService; @PostMapping("/foo") @Operation(summary = "foo") public void send(@RequestBody Foo foo){ KafkaCompletableFuture<SendResult<String, Object>> future = (KafkaCompletableFuture<SendResult<String, Object>>) kafkaTemplate.send("foo", "modelOne", foo); future.thenAccept((result) -> { log.info("生产者成功发送消息到" + result.getProducerRecord().topic() + "-> " + result.getProducerRecord().value().toString()); }); } @PostMapping("/bar") @Operation(summary = "bar") public void send(@RequestBody Bar bar){ KafkaCompletableFuture<SendResult<String, Object>> future = (KafkaCompletableFuture<SendResult<String, Object>>) kafkaTemplate.send("bar", bar); future.thenAccept((result) -> { log.info("生产者成功发送消息到" + result.getProducerRecord().topic() + "-> " + result.getProducerRecord().value().toString()); }); } //异步 @PostMapping("/ansyc") @Operation(summary = "ansyc") public void sendAnsyc(@RequestBody Bar bar){ sendService.sendAnsyc(bar); } //同步 @PostMapping("/sync") @Operation(summary = "sync") public void sendSync(@RequestBody Bar bar){ sendService.sendSync(bar); } } ⑤listener / handler @KafkaListener:用于将方法标记为 Kafka 消息侦听器。可以在方法上使用此注解来指定要侦听的主题或主题模式,以及其他配置选项,例如组ID和线程数。 @KafkaHandler:在使用 @KafkaListener 注解时,如果类中有多个方法,可以使用 @KafkaHandler 注解标记其中一个方法,以指定作为特定消息类型的处理程序。 @Component public class Example3Listenter { @KafkaListener(topics = "ansyc") public void listenAnsyc(Bar bar) { System.out.println(bar); } @KafkaListener(topics = "sync") public void listenSync(Bar bar) { System.out.println(bar); } } /** * 或者写成handler形式 */ @Component @KafkaListener(id = "handler", topics = {"foo", "bar"}) public class ListenHandler { @Autowired private KafkaTemplate<Object, Object> kafkaTemplate; @KafkaHandler public void foo(@Payload Foo foo, @Header(KafkaHeaders.RECEIVED_KEY) String key) { System.out.println("key:" + key); System.out.println("foo:" + foo.toString()); } @KafkaHandler public void foo(Bar bar) { System.out.println("bar:" + bar.toString()); } }

四月 21, 2024 · 2 分钟

常见设计模式及代码示例

一、七大设计原则 1.开放-封闭原则 (Open-Closed Principle, OCP) 原则描述:软件实体(类、模块、函数等)应当对扩展开放,对修改关闭。也就是说,当需求变化时,应尽量通过添加新代码来实现功能增强,而不是修改现有的代码。 举例:在开发图形库时,如果需要支持多种形状的绘制,可以创建一个抽象的Shape接口,并为每种形状(如Circle、Rectangle)提供具体实现类。当要增加新的形状时,只需新增一个遵循Shape接口的类即可,无需修改已有代码。 2.单一职责原则 (Single Responsibility Principle, SRP) 原则描述:一个类或模块应该有且仅有一个改变它的原因。简单来说,每个类都应该专注于做一件事,并把它做好。 举例:在员工管理系统中,Employee类只负责处理与员工基本信息相关的操作,而薪资计算的功能则由另一个SalaryCalculator类负责,这样修改员工信息不会影响到薪资计算逻辑,反之亦然。 3.里氏替换原则 (Liskov Substitution Principle, LSP) 原则描述:子类型必须能够替换它们的基类型。即继承体系中的子类在父类出现的地方能够无二义性地替代父类工作,而不改变程序正确性。 举例:假设有一个Shape基类和派生出的Square子类,Square是一个特殊的Rectangle,但在使用Shape的地方,无论传入的是Rectangle还是Square,都能正常执行面积计算等方法,而不会因多态调用产生错误结果。 4.依赖倒置原则 (Dependency Inversion Principle, DIP) 原则描述:高层模块不应该依赖于低层模块,两者都应依赖于抽象。抽象不应依赖于细节,细节应依赖于抽象。 举例:在应用程序中,控制器类(高层模块)不直接依赖于数据库访问类(低层模块),而是共同依赖于一个数据访问接口(抽象)。这样,在更换数据库技术时,只需重新实现接口,而不用修改控制器。 5.接口隔离原则 (Interface Segregation Principle, ISP) 原则描述:客户端不应该被迫依赖它不需要的方法。接口应该小而专,每个接口代表一种独立的角色或职责。 举例:一个大的UserService接口可以拆分为更细粒度的UserAuthentication, UserProfileManagement, UserPermissionControl等接口,各个组件根据实际需求选择合适的接口进行依赖,避免接口方法的冗余和浪费。 6.合成/聚合复用原则 (Composition/Aggregate Reuse Principle, CRP) 原则描述:优先使用组合或者聚合关系而非继承来达到复用的目的。换句话说,尽量使用“拥有”关系(包含实例)而不是“是”关系(继承)来复用行为。 举例:在构建一个系统时,可以创建一个Car类,其中包含多个部件对象(如Engine、Wheel等),而不是让Car从这些部件继承。这样,部件的变化不会直接影响到Car类,增强了系统的灵活性。 7.迪米特法则 (Law of Demeter, LoD) 原则描述:也称为最少知识原则,即一个对象应当尽可能少地了解其他对象。一个对象只需要知道与其直接交互的对象即可。 举例:在一个订单系统中,Order类只需与Customer类交互获取必要的信息,而不直接访问Customer的地址信息,而是通过Customer.getAddress()间接获取,这样减少了对象之间的耦合度。 二、UML类图 UML类图(Unified Modeling Language Class Diagram)是面向对象分析与设计中最为常用的图形化建模工具之一,它主要用于描述系统的静态结构。在UML类图中,主要元素包括类、接口以及它们之间的关系。下面是一些关键概念和组成部分: 类(Class): 类名:通常以粗体表示,在顶部书写。 属性(Attributes):类的成员变量或数据字段,描述类所拥有的状态信息,一般在类名称下方列出,并可标注可见性(+公共、-私有、#保护)和其他属性如数据类型、初始值等。 操作(Operations):类的方法或行为,位于类定义的底部,同样可以标记可见性和参数列表。 关系(Relationships): 继承(Inheritance):用一个带空心三角形箭头的直线表示,箭头指向基类,用于表示子类对父类的继承关系。 实现(Realization):类实现接口时,使用虚线箭头,箭头指向接口,表示类实现了接口的所有方法。 关联(Association):用来表示类之间的连接,可以通过线条上的角色名和多重性(0.., 1.., 1, etc.)来详细说明关联关系。 聚合(Aggregation):特殊的关联关系,表示整体与部分的关系,通常通过空心菱形箭头表示(箭头从整体指向部分)。 组合(Composition):比聚合更强的关系,表示部分不能脱离整体存在,用实心菱形箭头表示。 依赖(Dependency):表示一种较弱的“使用”关系,即一个元素的变化可能会影响到另一个元素,用带箭头的虚线表示。 接口(Interface): ...

三月 26, 2024 · 23 分钟