【IM】如何保证消息有序性?
即时通讯系统引入保证消息有序的机制,务必会影响性能,增加开发复杂度。 引入 IM 大佬的一句话: 1. 理想状态 服务端接收到消息的顺序与用户点击发送消息的按钮顺序完全一致,用户接收到的消息的顺序也和发送时的顺序完全一致。 这是几乎不可能实现的!为什么? 1. 网络延迟 不可能按下发送消息的按钮后,消息就会马上到达服务器。每个用户的网速可能不一样。 OK,那我按发送时的时间戳来排序不就行了? 2. 时钟不一致 其实每个人本地的时钟不一定都是同一个,任何物理时钟都有漂移,任何时钟同步协议都受制于网络延迟。这导致我们对时间的测量永远存在一个不确定的误差区间,区间内的事件顺序无法判定。 既然如此,那我们退一步,一个即时通讯系统,以微信举例,是不是真的需要保证消息的全局有序? 答案是不用。我们只需要保证每个会话内的消息有序就行,也就是用户1和用户2的这个聊天会话中,我们看到的消息是一样的,保证两个人发送的消息在这个会话内有序! 再退一步,那我只保证每个人发送的消息有序行不行? 答案是不行!比如用户A发送了两条消息MSG1和MSG2,用户B发送了MSG3,如果在页面上显示了MSG3 -> MSG1 -> MSG2,这样也算是乱序。 2. 如何实现会话内有序? 路径1:客户端 -> 服务端 1. 建立 TCP 长连接 我们都知道,TCP 长连接保证消息可靠性、有序性。 但是,实际并不能只依靠 TCP 连接来保证消息的发送顺序!在弱网环境,比如火车、电梯,可能会快速断网、切网,原来的 TCP 连接断开,连上一条新的TCP 连接,我发送消息MSG1在旧连接上没发出去,消息MSG2 在新的TCP 连接上发送成功了,这就导致了消息乱序。 2. 客户端生成序列号 学习 TCP 保证消息有序的方案,发送消息前,客户端生成本地递增序列号localSeq,这个序列号在本地存储的,服务端也会记录上一条接收成功了localSeq,然后在服务端判断当前消息是不是有序的。 如果乱序,可以通知客户端重发消息。 也就是:客户端发消息 ->服务端接收并校验localSeq -> 服务端发送ack或要求重发 如果一来一回严重影响性能,可以参考使用批量ack等。 路径2:服务端处理入库 因为我们的目标是保证会话有序,所以消息到达服务端时的第一件事应该是分发消息,按照会话的 ID 进行分发,在我的项目中,私聊会话格式:private_{user_id1}_{user_id2},user_id1 小于 user_id2。 分发最简单的就是把相同会话的消息放到一个队列,按顺序处理。 这里可以引入消息队列这个中间件,像 Kafka 的 Partition 分区机制 和 RocketMQ Queue 机制。 ...
如何自定义一个注解,并在代码中应用
1)通用自定义注解 1.创建自定义注解 首先,定义一个自定义注解。注解可以用于类、方法、字段或参数等,具体取决于注解的使用场景。注解本质上是一个普通的Java接口,只是它带有 @interface 关键字。 例如,创建一个自定义注解 @MyAnnotation: import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.METHOD) // 注解作用的目标,比如方法、字段、类等 @Retention(RetentionPolicy.RUNTIME) // 注解的保留策略,决定注解在哪个阶段可用 public @interface MyAnnotation { String value() default "default value"; // 注解的属性,允许提供默认值 } 解释: @Target:指定注解可以应用于哪些元素,例如 ElementType.METHOD 表示该注解可用于方法上。 @Retention:指定注解的保留策略。RetentionPolicy.RUNTIME 表示该注解在运行时可用(通常Spring的处理器需要在运行时使用它)。 value:这是注解的一个属性,可以自定义多个属性,并提供默认值。 2. 创建注解处理器 为了让Spring能够识别并处理这个注解,你可以通过 @Component 或者其他方式将其注册为Spring Bean。以下是一个基于 AOP(面向切面编程)的处理器示例,处理自定义注解。 例如,使用 AspectJ 实现一个切面,处理带有 @MyAnnotation 的方法: import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.springframework.stereotype.Component; @Aspect @Component public class MyAnnotationAspect { @Before("@annotation(myAnnotation)") // 这里的 @annotation 表示匹配有这个注解的方法 public void beforeMethod(MyAnnotation myAnnotation) { // 通过myAnnotation获取注解信息并进行相应处理 System.out.println("执行带有MyAnnotation注解的方法,注解的值:" + myAnnotation.value()); } } 3. 使用自定义注解 现在,你可以在项目的某个方法上使用 @MyAnnotation: ...
SPI + 工厂设计模式
1. 什么是 SPI ?有什么用? SPI(Service Provider Interface)是Java提供的一种用于实现可扩展性和插件机制的接口规范。它允许开发者定义或使用服务提供者(Service Providers),并且可以在运行时动态地加载这些服务,而不需要在编译时将服务的实现绑定到应用程序中。这种机制特别适合那些需要根据配置或条件加载不同服务实现的场景,例如数据库驱动、加密算法等。 1.1. SPI 的核心概念: 服务接口(Service Interface):定义服务提供者需要实现的接口。例如,java.sql.Driver就是一个常见的服务接口,所有的数据库驱动都必须实现这个接口。 服务提供者(Service Provider):服务接口的具体实现。例如,不同数据库厂商的驱动程序就是服务提供者。 服务加载器(Service Loader):用于发现和加载服务提供者的机制。Java 提供了 java.util.ServiceLoader 来查找服务提供者。 配置文件:SPI 使用一个特殊的配置文件来声明服务提供者的实现类。配置文件放置在 META-INF/services/ 目录下,文件名为服务接口的全限定名,文件内容为实现类的全限定名。例如,META-INF/services/java.sql.Driver 这个文件可能包含多个 JDBC 驱动实现类。 1.2. SPI的工作原理: 应用程序通过 ServiceLoader 查找指定服务接口的实现。 JVM在 META-INF/services/ 目录下查找对应的服务配置文件。 服务加载器解析配置文件并实例化指定的服务提供者类。 2. 实现 具体实现方法如下: 指定 SPI 的配置目录,并且将配置再分为系统内置 SPI 和用户自定义 SPI,便于区分优先级和维护。 编写 SpiLoader 加载器,实现读取配置、加载实现类的方法。 用 Map 来存储已加载的配置信息 键名 => 实现类。 通过 Hutool 工具库提供的 ResourceUtil.getResources 扫描指定路径,读取每个配置文件,获取到 键名 => 实现类 信息并存储在 Map 中。 定义获取实例方法,根据用户传入的接口和键名,从 Map 中找到对应的实现类,然后通过反射获取到实现类对象。可以维护一个对象实例缓存,创建过一次的对象从缓存中读取即可。 重构序列化器工厂,改为从 SPI 加载指定的序列化器对象。 使用静态代码块调用 SPI 的加载方法,在工厂首次加载时,就会调用 SpiLoader 的 load 方法加载序列化器接口的所有实现类,之后就可以通过调用 getInstance 方法获取指定的实现类对象了。 ...
im:私聊消息端到端加密
概括:通过非对称加密算法获取共享密钥,通过对称加密算法对消息进行加密解密。 一、(非对称算法)获取共享密钥 采用 ECDH 算法获取共享密钥。 1. 公钥私钥什么时候生成? 当前设备第一次登录的时候,会生成公钥私钥,公钥保存到服务端,私钥保存在本地。 注册的时候 2. 共享密钥什么时候生成? 刚加好友的时候/刚进一个群的时候。 发现本地没有与一个好友/群的共享密钥的时候。 1. 私聊 发送消息时,需要对消息加密,加密用的是共享密钥。 共享密钥生成过程: A和B 都会生成各自的公钥和私钥,公钥存在服务端数据库中,私钥保存在本地。 根据 ECDH 算法,A 只需要获取到 B 的公钥,可以算得一个共享密钥,这个共享密钥和 B 用 A 的公钥算到的共享密钥是一样的。 共享密钥生成后,转为 AES 算法加密的密钥格式 2. 群聊 用户加入一个群的时候,会与这个群交换共享密钥,而不是与群内的所有用户交换共享密钥。服务端保存着 群与每个群用户的共享密钥,所以说,一个群有多少个用户,就会生成多少条共享密钥。 创建群聊的时候,群聊的公钥和私钥都存在服务端数据库。 用户A给群G发送消息时,用A与G的共享密钥加密;服务端获取到消息后,用 A与G 的共享密钥解密,然后把原文存到数据库;转发给群内用户的时候,用G与其他用户的共享密钥加密后再发送。 二、(对称算法)消息加密 采用对称加密算法 AES 进行加密,即用什么加密,就用什么解密。 服务端获取到私聊消息的时候,不用加密解密,因为服务端没有用户的共享密钥,直接存储消息加密后的二进制格式。 服务端收到群聊消息的时候,需要加密解密,存的是消息原文。
IM 系统:RabbitMQ + Redis 实现消息重发机制
【消息未收到 ack 的情况】 发送一条消息后,将消息体存入到 redis 中,并将消息的【uniqueId 和 重发次数】放入 RabbitMQ 消息队列中。 延迟队列中的 【uniqueId 和 重发次数】在规定客户端ack时间内, 会被放入到死信队列,用于检查是否 收到ack消息。 死信队列收到 【uniqueId 和 重发次数】 到 redis 中检查这条消息体是否还存在,存在则说明还没有收到ack 再检查 重发次数,如果重发次数已达上限,报异常【重试发送多次消息失败】 如果还没到重发次数上限,就进入消息重发。 【消息收到 ack 后的处理】 (私信) 删除 redis 中的等待ack 的消息体,这样死信队列在检查的时候没有查到这条消息体,说明消息已经ack 修改消息状态 为【已送达】。 (群聊) 修改用户的 last_ack_id 删除消息体缓存
双 Token 认证,无感更新
一、流程 用户首次登录后,服务端给客户端发两个 token ,分别为 accessToken 和 refreshToken。 accessToken 的生存时间较短,用于后续客户访问接口时使用。 refreshToken 则用于更新 accessToken 并返回给客户端, 更新后 refreshToken 也需要刷新。 更新或生成accessToken时,需要更新用户所有的 refreshToken 的过期时间。 二、token 说明 过期时间【测试】 accessToken :2天 refreshToken :15天 存储信息: accessToken: 用户基本信息:用户Id,用户的权限 refreshToken: 过期时间 用户id redis 缓存内容: 使用 hash 数据类型保存用户的token信息,因为一个用户可能会多地登录。 Key:【user_token : [userId]】 Value【hash】: key : accessToken value: refreshToken expireTime(refreshToken的过期时间) 三、社交登录 社交登录简单基本流程:【以gitee为例】 在第三方应用【gitee】中注册应用,获取 client_id 和 client_secret 引导用户到第三方【gitee】认证页面,此时请求带上 client_id 用户授权后,带着第三方【gitee】生成的 code 到回调地址。 服务端带着 code,client_id,client_secret,redirect_uri 信息,请求第三方【gitee】获取access_token 服务端带着 access_token 可以获取用户的信息 可以使用用户信息进行 注册/登录 四、问题及解决 1、token 过期后无法解析 使用以下 jwt 工具进行生成jwt和解析jwt。 ...
XXL-JOB 发布定时文章
一、提交定时文章,待审核 (ArticleController —-> publishScheduled)接收参数 ScheduledTask4ArticleTo,里面包含两个属性。 ScheduledTaskTo(定时任务信息):保存发布时间、发布用户id、文章id等信息 ArticleSubmitDTO(文章信息):需要发布的文章具体信息。 (ArticleController —-> publishScheduled)调用 ArticleService 的 submitArticle 方法,提交文章,也就是存到数据库中,需要人工审核。 (ArticleController —-> publishScheduled)保存文章到数据库后,文章有了id,赋值给定时任务中文章id。调用 ScheduledTaskFeignClient 的 saveScheduledTask 方法,保存定时任务信息。 二、审核 (ArticleService —–> auditArticle)如果文章审核通过且为定时发布文章,就把定时任务信息放入 RabbitMQ,并把文章状态设置为定时文章待发布状态。 三、发布定时任务 案例中发现:发布定时任务到xxl-job 服务器中注意用到以下几个参数 public class XxlJobInfoBo { private int id = 0; // 0 private int jobGroup; // 2 private String jobDesc; //任务名 private String author = "Vanky"; // 负责人 private String scheduleType = "CRON"; // CRON private String scheduleConf; // cron表达式 private String misfireStrategy = "DO_NOTHING"; private String executorRouteStrategy = "FIRST"; // FIRST private String executorHandler; // 执行器,任务Handler名称 private String executorBlockStrategy = "SERIAL_EXECUTION"; // SERIAL_EXECUTION private String glueType = "BEAN"; // BEAN private String glueRemark = "GLUE代码初始化"; // GLUE代码初始化 } 需要赋值的有: ...
ElasticSearch(2)使用篇
一、创建对象映射 ①创建ElasticSearch中对象 不参与索引的可以加上 "index": false, "doc_values": false PUT article { "mappings": { "properties": { "id":{ "type": "long", "index": false, "doc_values": false }, "title":{ "type": "text", "analyzer": "ik_smart" }, "createTime":{ "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }, "images":{ "type": "keyword", "index": false, "doc_values": false } } } } ②java对象映射 注意LocalDateTime的处理。 @Data public class ArticleEsModel { /** * id */ private Long id; /** * 标题 */ private String title; /** * 时间 */ @Field(type = FieldType.Date,format = DateFormat.basic_date_time_no_millis,pattern = "yyyy-MM-dd HH:mm:ss") @JsonSerialize(using = LocalDateTimeSerializer.class) @JsonDeserialize(using = LocalDateTimeDeserializer.class) @JsonFormat(shape = JsonFormat.Shape.STRING,timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") private LocalDateTime createTime; /** * 图片 */ private List<String> images; } 二、批量插入 import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import com.vanky.community.api.search.to.ArticleEsModel; import com.vanky.community.search.constant.EsContent; import com.vanky.community.search.service.ArticleSearchService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; /** * @author vanky */ @Slf4j @Service public class ArticleSearchServiceImpl implements ArticleSearchService { @Resource private ElasticsearchClient elasticsearchClient; @Override public boolean saveArticleBatch(List<ArticleEsModel> articleEsModels) throws IOException { BulkRequest.Builder br = new BulkRequest.Builder(); for (ArticleEsModel model : articleEsModels) { br.operations(op -> op .index(idx -> idx .index(EsContent.ARTICLE_INDEX) .id(model.getId().toString()) .document(model) ) ); } //执行操作,并返回响应结果 BulkResponse bulkResponse = elasticsearchClient.bulk(br.build()); //处理错误 if(bulkResponse.errors()){ List<String> collect = Arrays.stream(bulkResponse.items().toArray(new BulkResponseItem[0])) .map(item -> item.id()) .collect(Collectors.toList()); log.error("文章保存到es出错:{}", collect); return false; } return true; } } 三、查询 ElasticSearch —- DSL查询语句 ...
ElasticSearch(1)安装与基础篇
一、安装es ①拉取镜像 ②启动 docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \ -e "discovery.type=single-node" \ -e ES_JAVA_OPTS="-Xms64m -Xmx128m" \ -v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \ -v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \ -v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \ -d elasticsearch:8.11.3 ③编写 elasticsearch.yml cluster.name: "docker-cluster" network.host: 0.0.0.0 xpack.security.enabled: false cluster.name: "docker-cluster" 这个配置参数定义了Elasticsearch集群的名称。在分布式环境下,所有希望加入同一集群的节点都必须配置相同的集群名称。这里将集群命名为 “docker-cluster”,意味着所有带有这个配置的Elasticsearch节点将会组成一个名为“docker-cluster”的集群。 network.host: 0.0.0.0 此配置指定了Elasticsearch节点监听请求的网络接口地址。设置为 “0.0.0.0” 表示节点将在所有可用网络接口上监听请求,这意味着该节点将对来自任何IP地址的客户端请求做出响应。这对于Docker容器环境尤其常见,因为它允许从宿主机或其他容器内部访问此Elasticsearch服务。 xpack.security.enabled: false X-Pack是Elasticsearch提供的一个安全、监控、警报和图形化界面等扩展功能集合。xpack.security.enabled 参数控制X-Pack安全模块是否启用。当设置为 false 时,表示Elasticsearch节点启动时不启用任何安全认证和授权机制。这样做的结果是Elasticsearch服务将以无安全验证的方式运行,这在开发或测试环境中可能比较方便,但在生产环境中强烈建议启用并正确配置安全性以保护数据和系统不受未经授权的访问。 二、安装kibana ①拉取镜像 ②启动 docker run --name kibana -e ELASTICSEARCH_HOSTS=http://192.168.200.134:9200 -p 5601:5601 \ -d kibana:8.11.3 三、使用elasticsearch index 相当于数据库,type 相当于表 Elasticsearch 是一个分布式、开源的搜索和分析引擎,主要用于全文搜索、日志数据分析、实时数据分析等应用。它是由 Elasticsearch N.V. 公司维护的开源项目,建立在 Apache Lucene 基础之上。以下是 Elasticsearch 的一些主要特性和用途: ...
使用Guava 工具实现驼峰命名相互转换
1.引入依赖 <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>33.1.0-jre</version> </dependency> 2.下划线 -> 驼峰 String columnName = "student_name"; String resultStr = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName); // resultStr输出 ---> studentName 3.驼峰 -> 下划线 String resultStr = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, "studentName"); // resultStr输出 ---> student_name