news 2026/3/10 4:42:09

供应链计划系统架构实战(七):轻量级分布式计算框架设计与实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
供应链计划系统架构实战(七):轻量级分布式计算框架设计与实现

1、框架设计逻辑

核心组件

1、服务注册与发现(Redis)

  • 使用Redis作为服务注册中心
  • 以服务名称ApplicationName为key存储节点集群
  • 基于时间戳的心跳机制(10秒间隔)

2、任务调度系统

  • 数据库作为任务持久化存储
  • 守护线程轮询获取新任务
  • 基于负载的调度算法(选择负载最小节点)

3、双守护线程模型

  • 节点监控守护线程:维护节点健康状态
  • 任务发现守护线程:分配计算任务

具体简单时序图如下图所示

2、核心代码实现

2.1、框架核心实现

2.1.1、监听Spring应用启动事件

  • 事件驱动:利用 Spring 应用启动事件,在合适时机启动监控功能
  • 条件控制:通过配置控制功能是否启用,提高灵活性
  • 功能整合:同时启动监控线程和执行类型注册,完成进程监控的初始化

1、启动守护线程

    • ProcessDaemonServiceImpl 实现了 ApplicationListener 接口,监听 Spring 应用启动事件;
    • 在应用启动完成后启动守护线程,监控节点存活状态和进程状态;

2、注册中心注册

    • 获取应用上下文:从事件中获取 ApplicationContext
    • 执行注册服务:获取 ProcessTypeRegisterService 并调用 doRegister()
@Slf4j @Component public class ProcessDaemonServiceImpl implements ApplicationListener<ApplicationStartedEvent> { @Autowired ProcessProperties processProperties; @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { if(processProperties.isEnable() && processProperties.getBusinessKeys().size()!=0){ startDaemonThread(); ApplicationContext applicationContext = applicationStartedEvent.getApplicationContext(); applicationContext.getBean(ProcessTypeRegisterService.class).doRegister(); } } // 启动守护线程 线程优先级设置为10(最高优先级) private void startDaemonThread() { Thread daemonThread2 = new Thread(processUtilServiceImpl.nodeAliveWatcher, "nodeAliveWatcher"); daemonThread2.setDaemon(true); daemonThread2.setPriority(10); // 启动线程 daemonThread2.start(); log.info("{},守护线程启动",daemonThread2.getName()); Thread daemonThread = new Thread(processUtilServiceImpl.processStatusWatcher, "processStatusWatcher"); // 设置为守护线程 daemonThread.setDaemon(true); daemonThread.setPriority(10); // 启动线程 daemonThread.start(); log.info("{},守护线程启动",daemonThread.getName()); } }

2.1.2、监控器

2.1.2.1、节点保活监控器

无限循环运行的守护线程,负责监控节点的状态信息,分布式锁:使用 ALL_NODE_PROCESS_LOCK_KEY 确保集群中只有一个节点执行监控

策略:

    • 定期更新:每 8 秒更新一次节点状态
    • 分布式协调:通过分布式锁确保集群节点状态的一致性
    • 负载信息维护:更新当前节点的负载信息

重启检测

    • 重启标识:初始化时设置 isRestarted 为 true
    • 状态同步:向集群其他节点通知当前节点重启状态
@Slf4j @Component public class ProcessUtilServiceImpl implements ProcessUtilService { /*** * 节点保活监视器 **/ public final Runnable nodeAliveWatcher = () -> { StatusDTO statusDTO = new StatusDTO(); statusDTO.setIsRestarted(true); statusDTO.setWeight(null); while (true) { try { ThreadSleepUtil.parkSeconds(8); String timeSlot = MyDateUtils.getTimeSlot(); ALL_NODE_PROCESS_LOCK_KEY = String.format(ALL_NODE_PROCESS_LOCK_KEY, applicationName,timeSlot); redissonDistributeLock.dealWithLock(ALL_NODE_PROCESS_LOCK_KEY, null, nodeProcessLoadServiceImpl.updateThisNodeInfoFunc, (param) -> { log.warn("节点保活监视器无法正常获取锁,无法更新节点状态"); return null; }, statusDTO); } catch (Exception e) { log.error("节点保活监视器异常"); log.error(e.getMessage(), e); } } }; }

节点状态

@Data public class StatusDTO { private Random random = new Random(); private Boolean isRestarted = true; Long weight ; }

节点状态更新机制

基于redis缓存去更新

  • 维护节点状态:更新当前节点的存活状态和负载信息
  • 权重管理:根据 StatusDTO 中的权重值调整节点负载
  • 节点清理:移除长时间未更新的节点信息
public Function<StatusDTO, Void> updateThisNodeInfoFunc = (statusDTO) -> { try { Long dealingWeight = statusDTO.getWeight(); RMap<String, NodeProcessStastic> nodeDatas = redissonClient.getMap(ALL_NODE_PROCESS_KEY); String localHostIp = IpAddressUtil.getHostIp(); NodeProcessStastic nodeProcessStastic = nodeDatas.getOrDefault(localHostIp, new NodeProcessStastic()); nodeProcessStastic.setTimestamp(DateUtil.now()); nodeProcessStastic.setIpAddress(localHostIp.replaceAll("\\.", "-")); nodeProcessStastic.setSupportBusinessKeys(processProperties.getBusinessKeys()); if (dealingWeight != null) { if ((dealingWeight < 0 && nodeProcessStastic.getDealingProcessWeightSum()>0)|| dealingWeight > 0) { log.info("该机器:{},增加权重得分:{}", localHostIp, dealingWeight); nodeProcessStastic.setDealingProcessWeightSum(nodeProcessStastic.getDealingProcessWeightSum() + dealingWeight); nodeProcessStastic.setLastWeightChangeTimestamp(DateUtil.now()); } }else{ if(statusDTO.getIsRestarted()){ nodeProcessStastic.setDealingProcessWeightSum(0); nodeProcessStastic.setLastWeightChangeTimestamp(null); } } nodeDatas.put(localHostIp, nodeProcessStastic); // 移除可能已经重启了的pod int oriSize = nodeDatas.size(); nodeDatas.values().removeIf(ele -> { long btwTime = DateUtil.between(DateUtil.parse(ele.g
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/8 17:59:58

PyTorch镜像中如何安装特定版本的CUDA驱动?

PyTorch镜像中如何安装特定版本的CUDA驱动&#xff1f; 在深度学习项目开发中&#xff0c;最让人头疼的往往不是模型设计&#xff0c;而是环境配置——尤其是当团队成员各自报告“我这边能跑&#xff0c;你那边报错”时。GPU资源明明存在&#xff0c;torch.cuda.is_available(…

作者头像 李华
网站建设 2026/3/10 0:28:02

PyTorch镜像运行Jupyter时密码如何设置?安全指南

PyTorch镜像运行Jupyter时密码如何设置&#xff1f;安全指南 在现代AI开发中&#xff0c;一个常见的场景是&#xff1a;你刚刚拉取了最新的 pytorch-cuda:v2.8 镜像&#xff0c;准备开始训练模型。执行 docker run -p 8888:8888 --gpus all pytorch-cuda:v2.8 jupyter noteboo…

作者头像 李华
网站建设 2026/3/5 10:53:31

cnn特征图可视化方法:在PyTorch-CUDA-v2.8中绘制中间层输出

CNN特征图可视化方法&#xff1a;在PyTorch-CUDA环境中高效绘制中间层输出 在深度学习模型日益复杂的今天&#xff0c;我们常常面临一个根本性问题&#xff1a;模型到底“看到”了什么&#xff1f; 尤其是在图像分类、目标检测等任务中&#xff0c;尽管卷积神经网络&#xff08…

作者头像 李华
网站建设 2026/3/10 1:54:24

基于COMSOL模拟的双重介质注浆模型研究:浆液在裂隙与多孔介质中的流动与扩散特性分析

用COMSOL 模拟双重介质注浆模型&#xff0c;浆液在多孔介质和裂隙中流动。 裂隙为浆液流动的优势通道&#xff0c;明显快与无裂隙的基质通道。 裂隙为随机均匀分布。 注&#xff1a;本算例考虑浆液的渗滤效应。 浆液粘度随扩散距离增加而降低在模拟地下工程注浆过程时&#xff…

作者头像 李华
网站建设 2026/3/8 21:52:59

ssh隧道加密传输:保障PyTorch-CUDA-v2.8数据安全

SSH隧道加密传输&#xff1a;保障PyTorch-CUDA-v2.8数据安全 在深度学习项目日益依赖远程GPU服务器的今天&#xff0c;一个常见的场景是&#xff1a;你正在云上运行一个搭载了 PyTorch 与 CUDA 的 Docker 容器&#xff0c;准备调试模型训练代码。你启动了 Jupyter Notebook&…

作者头像 李华
网站建设 2026/3/4 3:56:22

cuda安装后无法识别GPU?检查驱动版本与PyTorch-CUDA-v2.8匹配

CUDA安装后无法识别GPU&#xff1f;检查驱动版本与PyTorch-CUDA-v2.8匹配 在深度学习项目中&#xff0c;你是否经历过这样的场景&#xff1a;明明装好了CUDA、配置了环境变量&#xff0c;运行 torch.cuda.is_available() 却返回 False&#xff1f;更令人抓狂的是&#xff0c;nv…

作者头像 李华