news 2026/5/14 12:50:14

项目1-通过RocketMQ 将短链接统计

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
项目1-通过RocketMQ 将短链接统计

这是一份关于 “短链接访问统计系统”(基于 RocketMQ)的笔记,整合了我们之前讨论的所有核心知识点、代码逻辑、设计思想和技术细节,方便你系统复习和查阅。


短链接访问统计系统(基于 RocketMQ)笔记

一、系统核心目标
  • 核心功能:记录短链接的每一次访问,并进行多维度的统计分析(PV/UV/UIP、地域、设备、浏览器、操作系统等)。
  • 核心挑战
    1. 高性能:短链接跳转是核心入口,必须保证用户访问速度快。
    2. 高并发:可能面临瞬间大量的访问请求(如热点链接)。
    3. 数据可靠:每一次访问的统计数据都不能丢失,也不能重复计算。
    4. 系统解耦:统计功能不能影响核心的跳转功能。
二、技术架构与核心组件

整个系统采用生产者 - 消费者(Producer-Consumer)模式,核心组件如下:

  1. 生产者(Producer)ShortLinkStatsSaveProducer

    • 角色:在短链接被访问时,负责收集访问数据并发送到消息队列。
    • 核心任务:将同步的统计入库操作,转化为异步的消息发送。
  2. 消息队列(Message Queue):RocketMQ

    • 角色:作为生产者和消费者之间的 “桥梁”,存储和转发消息。
    • 核心任务:解耦、削峰填谷、保证消息可靠传输。
  3. 消费者(Consumer)ShortLinkStatsSaveConsumer

    • 角色:监听消息队列,消费统计消息,并将数据持久化到数据库。
    • 核心任务:执行耗时的统计入库操作,保证数据最终一致性。
  4. 幂等处理器(Idempotent Handler)MessageQueueIdempotentHandler

    • 角色:基于 Redis 实现,防止同一条消息被重复消费,导致统计数据重复。
    • 核心任务:保证消费的幂等性。
  5. 分布式锁(Distributed Lock):Redisson RReadWriteLock

    • 角色:在消费者入库时,保证并发场景下数据的一致性。
    • 核心任务:防止在统计过程中,短链接的 GID 被修改,导致数据归属错误。
三、核心流程详解
1. 生产者流程 (ShortLinkStatsSaveProducer)

java

运行

public void send(Map<String, String> producerMap) { // 1. 生成唯一的消息Key(UUID),用于幂等性保证 String keys = UUID.randomUUID().toString(); producerMap.put("keys", keys); // 2. 构建RocketMQ消息,设置消息体和消息头 Message<Map<String, String>> build = MessageBuilder .withPayload(producerMap) // 消息体:包含统计数据 .setHeader(MessageConst.PROPERTY_KEYS, keys) // 消息头:设置消息Key .build(); try { // 3. 同步发送消息到指定的Topic SendResult sendResult = rocketMQTemplate.syncSend(statsSaveTopic, build, 2000L); log.info("消息发送成功,ID: {}, Keys: {}", sendResult.getMsgId(), keys); } catch (Throwable ex) { log.error("消息发送失败", ex); // 可扩展:发送失败后的重试或告警逻辑 } }
  • 关键操作
    • 生成消息 Key:使用UUID.randomUUID(),确保每条消息的唯一性,是实现幂等的基础。
    • 同步发送 (syncSend):最可靠的发送方式。生产者会等待 Broker 返回发送结果,确保消息至少被 Broker 接收一次。
    • 设置超时时间2000L(2 秒),防止生产者无限期阻塞。
2. 消费者流程 (ShortLinkStatsSaveConsumer)

java

运行

@Override public void onMessage(Map<String, String> producerMap) { String keys = producerMap.get("keys"); // ========== 核心步骤1:幂等校验 ========== if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) { if (messageQueueIdempotentHandler.isAccomplish(keys)) { return; // 消息已处理完成,直接返回 } throw new ServiceException("消息处理中,需要重试"); // 触发MQ重试 } try { String fullShortUrl = producerMap.get("fullShortUrl"); if (StrUtil.isNotBlank(fullShortUrl)) { String gid = producerMap.get("gid"); ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class); // ========== 核心步骤2:执行业务逻辑 ========== actualSaveShortLinkStats(fullShortUrl, gid, statsRecord); } } catch (Throwable ex) { log.error("消费异常", ex); try { // ========== 核心步骤3:异常处理 ========== messageQueueIdempotentHandler.delMessageProcessed(keys); // 删除幂等标识,允许重试 } catch (Throwable remoteEx) { log.error("删除幂等标识失败", remoteEx); } throw ex; // 抛出异常,触发RocketMQ重试 } // ========== 核心步骤4:标记完成 ========== messageQueueIdempotentHandler.setAccomplish(keys); // 标记消息处理完成 }
  • 关键操作
    • 幂等校验:通过MessageQueueIdempotentHandler确保消息只被处理一次。
    • 执行业务逻辑:调用actualSaveShortLinkStats方法,将统计数据入库。
    • 异常处理
      • 消费失败时,必须删除幂等标识,否则消息将无法被重试。
      • 抛出异常,触发 RocketMQ 的重试机制。
    • 标记完成:消费成功后,标记消息为 “已完成”,防止后续重复消费。
3. 实际入库逻辑 (actualSaveShortLinkStats)

java

运行

public void actualSaveShortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) { // ========== 核心步骤1:加分布式读锁 ========== RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl)); RLock rLock = readWriteLock.readLock(); rLock.lock(); // 加锁 try { // 1. 补全GID(如果生产者未传入) if (StrUtil.isBlank(gid)) { ShortLinkGotoDO shortLinkGotoDO = shortLinkGotoMapper.selectOne(Wrappers.lambdaQuery(ShortLinkGotoDO.class) .eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl)); gid = shortLinkGotoDO.getGid(); } // 2. 解析时间维度(小时、星期) int hour = DateUtil.hour(new Date(), true); int weekValue = DateUtil.dayOfWeekEnum(new Date()).getIso8601Value(); // ========== 核心步骤2:多维度统计入库 ========== // a. PV/UV/UIP统计 LinkAccessStatsDO linkAccessStatsDO = LinkAccessStatsDO.builder()...build(); linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO); // 自定义的增量更新方法 // b. 地域统计(调用高德API) Map<String, Object> localeParamMap = new HashMap<>(); localeParamMap.put("key", statsLocaleAmapKey); localeParamMap.put("ip", statsRecord.getRemoteAddr()); String localeResultStr = HttpUtil.get(AMAP_REMOTE_URL, localeParamMap); // ... 解析结果并入库 ... // c. 操作系统、浏览器、设备、网络等统计(类似) LinkOsStatsDO linkOsStatsDO = LinkOsStatsDO.builder()...build(); linkOsStatsMapper.shortLinkOsState(linkOsStatsDO); // ... // d. 原始访问日志 LinkAccessLogsDO linkAccessLogsDO = LinkAccessLogsDO.builder()...build(); linkAccessLogsMapper.insert(linkAccessLogsDO); // e. 更新短链接核心表的总统计 shortLinkMapper.incrementStats(gid, fullShortUrl, 1, ...); // f. 今日统计 LinkStatsTodayDO linkStatsTodayDO = LinkStatsTodayDO.builder()...build(); linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO); } catch (Throwable ex) { log.error("统计入库异常", ex); } finally { // ========== 核心步骤3:释放锁 ========== rLock.unlock(); // 最终释放锁,避免死锁 } }
  • 关键操作
    • 加分布式读锁
      • 锁的 KeyLOCK_GID_UPDATE_KEY + fullShortUrl,保证锁的粒度是单个短链接,避免全局锁。
      • 读锁 (Read Lock)
        • 允许多个读操作并发执行(多个统计线程可以同时处理同一个短链接)。
        • 阻塞写操作(修改 GID 的操作),保证在统计过程中 GID 不会被修改。
    • 多维度统计入库
      • 增量更新:大部分统计表(如link_access_stats)使用自定义的shortLinkStats方法,实现 “不存在则插入,存在则更新(累加)” 的逻辑,避免全量更新的性能问题。
      • 原始日志link_access_logs表直接插入原始访问记录,用于后续的明细查询和数据分析。
    • 释放锁:在finally块中释放锁,确保无论代码是否异常,锁都能被释放,防止死锁。
4. 幂等处理器 (MessageQueueIdempotentHandler)

基于 Redis 的SETNXsetIfAbsent)命令实现,保证分布式环境下的原子性。

方法名作用Redis KeyValue核心逻辑
isMessageProcessed判断消息是否可被处理short-link:idempotent:{keys}0使用setIfAbsent尝试设置 Key。-true:Key 不存在,消息可处理,设置 Value 为0(处理中)。-false:Key 已存在,消息不可处理。
isAccomplish判断消息是否处理完成short-link:idempotent:{keys}1检查 Key 对应的 Value 是否为1
setAccomplish标记消息处理完成short-link:idempotent:{keys}1将 Value 设置为1,并设置过期时间。
delMessageProcessed删除幂等标识short-link:idempotent:{keys}-删除 Key,允许消息被重试。
  • 核心思想先占坑,后处理
    1. 处理前,用SETNX占坑(Value=0)。
    2. 处理中,其他线程看到坑被占,要么等待要么拒绝。
    3. 处理成功,将坑标记为完成(Value=1)。
    4. 处理失败,把坑让出来(删除 Key)。
四、为什么选择 RocketMQ?
  1. 异步解耦:将耗时的统计入库操作从同步的跳转流程中剥离出来,极大提升了核心接口的响应速度。
  2. 削峰填谷:面对突发的高并发访问,RocketMQ 可以缓冲大量消息,避免直接冲击数据库,保证系统稳定。
  3. 消息可靠性
    • 生产者同步发送:确保消息至少被 Broker 接收一次。
    • Broker 持久化:消息存储在磁盘,即使 Broker 宕机,消息也不会丢失。
    • 消费者重试机制:消费失败时,RocketMQ 会自动重试,保证消息最终被处理。
  4. 负载均衡:RocketMQ 的消费者组(Consumer Group)机制,可以轻松实现多个消费者实例共同消费一个 Topic 的消息,提高处理能力。
  5. 可扩展性
    • 水平扩展:可以通过增加 Broker 节点和消费者实例来提升系统的吞吐量。
    • 功能扩展:RocketMQ 支持定时消息、事务消息等高级特性,便于未来功能扩展。
五、核心技术亮点与设计模式
  1. 读写锁分离
    • 使用 Redisson 的RReadWriteLock,统计入库时加读锁,修改 GID 时加写锁
    • 好处:允许多个统计操作并发执行,同时保证 GID 不被并发修改,兼顾了性能和数据一致性。
  2. 幂等性设计
    • 基于 Redis 的SETNX命令,是分布式系统中实现幂等的经典方案。
    • 好处:有效防止了因网络抖动或 MQ 重试导致的重复消费问题,保证了统计数据的准确性。
  3. 增量更新
    • 统计表的INSERT OR UPDATE操作(如linkAccessStatsMapper.shortLinkStats)。
    • 好处:相比先查询后更新,减少了一次数据库交互,提升了入库性能。
  4. 最小锁粒度
    • 锁的 Key 是fullShortUrl,而不是全局锁。
    • 好处:只对同一个短链接的操作进行同步,不同短链接的操作互不影响,最大化了并发性能。
  5. 异常处理与重试
    • 消费失败时,删除幂等标识并抛出异常,触发 MQ 重试。
    • 好处:保证了消息的最终一致性,即使中间环节出错,数据也不会丢失。
六、总结

这套短链接统计系统是一个高性能、高可用、高并发的分布式系统设计典范。它巧妙地运用了 RocketMQ 实现异步解耦和削峰填谷,通过 Redis 实现了分布式锁和幂等性保证,最终达到了 “用户访问快、统计数据准、系统运行稳” 的目标。

核心设计思想可以概括为:将同步操作异步化,将串行操作并行化,在性能和数据一致性之间找到最佳平衡点。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/10 7:14:46

KMeans、DBSCAN聚类与TF-IDF文本特征提取

在机器学习领域&#xff0c;聚类算法是无监督学习的核心应用&#xff0c;而文本特征提取则是自然语言处理的基础步骤。本文将结合实战代码&#xff0c;详细拆解KMeans、DBSCAN两种经典聚类算法及TF-IDF文本特征提取方法&#xff0c;帮你快速掌握核心原理与落地技巧。一、KMeans…

作者头像 李华
网站建设 2026/5/9 14:02:46

Wincc报表模板:功能与实现深度剖析

Wincc报表模板 1、数据库存储全局脚本。 存储时间自由设置 2、报表查询VBS脚本&#xff0c;带下拉框&#xff0c;组合框&#xff0c;查询内容自由选择&#xff0c;时间自由选择。 3、导出到本地EXECL。 并打印。 4、各类控件&#xff0c;语音报警控件&#xff0c;动画控件。 实…

作者头像 李华
网站建设 2026/5/10 7:03:28

PyTorch-CUDA-v2.6镜像支持哪些NVIDIA显卡型号?兼容性列表公布

PyTorch-CUDA-v2.6镜像支持哪些NVIDIA显卡型号&#xff1f;兼容性列表公布 在深度学习项目开发中&#xff0c;最让人头疼的往往不是模型设计本身&#xff0c;而是环境配置——明明代码没问题&#xff0c;却因为CUDA版本不匹配、驱动太旧或cuDNN缺失导致torch.cuda.is_available…

作者头像 李华
网站建设 2026/5/9 13:20:11

MyBatis 入门到实践:一篇文章带你掌握核心用法

一、什么是 MyBatis&#xff1f;MyBatis 是一款优秀的 Java 持久层框架&#xff0c;它通过 XML 或注解 的方式&#xff0c;将 SQL 语句与 Java 对象进行映射&#xff0c;简化了 JDBC 操作&#xff0c;同时又保留了 SQL 的灵活性。一句话总结&#xff1a;MyBatis JDBC 封装 SQ…

作者头像 李华
网站建设 2026/5/9 10:20:37

【开题答辩全过程】以 基于微信小程序的医院自助挂号系统为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

作者头像 李华
网站建设 2026/5/10 5:58:28

自动化测试脚本调试:Chrome Driver实用技巧

如何真正用好 Chrome Driver&#xff1f;一个测试工程师的实战手记 最近在做 CI/CD 流水线中的自动化回归测试&#xff0c;又一次被“元素找不到”、“页面加载超时”、“莫名其妙失败”这些老问题缠上。翻日志、截图、重跑脚本……一顿操作下来&#xff0c;时间过去了两小时&…

作者头像 李华