1、框架设计逻辑
核心组件
1、服务注册与发现(Redis)
- 使用Redis作为服务注册中心
- 以服务名称ApplicationName为key存储节点集群
- 基于时间戳的心跳机制(10秒间隔)
2、任务调度系统
- 数据库作为任务持久化存储
- 守护线程轮询获取新任务
- 基于负载的调度算法(选择负载最小节点)
3、双守护线程模型
- 节点监控守护线程:维护节点健康状态
- 任务发现守护线程:分配计算任务
具体简单时序图如下图所示
2、核心代码实现
2.1、框架核心实现
2.1.1、监听Spring应用启动事件
- 事件驱动:利用 Spring 应用启动事件,在合适时机启动监控功能
- 条件控制:通过配置控制功能是否启用,提高灵活性
- 功能整合:同时启动监控线程和执行类型注册,完成进程监控的初始化
1、启动守护线程
- ProcessDaemonServiceImpl 实现了 ApplicationListener 接口,监听 Spring 应用启动事件;
- 在应用启动完成后启动守护线程,监控节点存活状态和进程状态;
2、注册中心注册
- 获取应用上下文:从事件中获取 ApplicationContext
- 执行注册服务:获取 ProcessTypeRegisterService 并调用 doRegister()
@Slf4j @Component public class ProcessDaemonServiceImpl implements ApplicationListener<ApplicationStartedEvent> { @Autowired ProcessProperties processProperties; @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { if(processProperties.isEnable() && processProperties.getBusinessKeys().size()!=0){ startDaemonThread(); ApplicationContext applicationContext = applicationStartedEvent.getApplicationContext(); applicationContext.getBean(ProcessTypeRegisterService.class).doRegister(); } } // 启动守护线程 线程优先级设置为10(最高优先级) private void startDaemonThread() { Thread daemonThread2 = new Thread(processUtilServiceImpl.nodeAliveWatcher, "nodeAliveWatcher"); daemonThread2.setDaemon(true); daemonThread2.setPriority(10); // 启动线程 daemonThread2.start(); log.info("{},守护线程启动",daemonThread2.getName()); Thread daemonThread = new Thread(processUtilServiceImpl.processStatusWatcher, "processStatusWatcher"); // 设置为守护线程 daemonThread.setDaemon(true); daemonThread.setPriority(10); // 启动线程 daemonThread.start(); log.info("{},守护线程启动",daemonThread.getName()); } }2.1.2、监控器
2.1.2.1、节点保活监控器
无限循环运行的守护线程,负责监控节点的状态信息,分布式锁:使用 ALL_NODE_PROCESS_LOCK_KEY 确保集群中只有一个节点执行监控
策略:
- 定期更新:每 8 秒更新一次节点状态
- 分布式协调:通过分布式锁确保集群节点状态的一致性
- 负载信息维护:更新当前节点的负载信息
重启检测
- 重启标识:初始化时设置 isRestarted 为 true
- 状态同步:向集群其他节点通知当前节点重启状态
@Slf4j @Component public class ProcessUtilServiceImpl implements ProcessUtilService { /*** * 节点保活监视器 **/ public final Runnable nodeAliveWatcher = () -> { StatusDTO statusDTO = new StatusDTO(); statusDTO.setIsRestarted(true); statusDTO.setWeight(null); while (true) { try { ThreadSleepUtil.parkSeconds(8); String timeSlot = MyDateUtils.getTimeSlot(); ALL_NODE_PROCESS_LOCK_KEY = String.format(ALL_NODE_PROCESS_LOCK_KEY, applicationName,timeSlot); redissonDistributeLock.dealWithLock(ALL_NODE_PROCESS_LOCK_KEY, null, nodeProcessLoadServiceImpl.updateThisNodeInfoFunc, (param) -> { log.warn("节点保活监视器无法正常获取锁,无法更新节点状态"); return null; }, statusDTO); } catch (Exception e) { log.error("节点保活监视器异常"); log.error(e.getMessage(), e); } } }; }节点状态
@Data public class StatusDTO { private Random random = new Random(); private Boolean isRestarted = true; Long weight ; }节点状态更新机制
基于redis缓存去更新
- 维护节点状态:更新当前节点的存活状态和负载信息
- 权重管理:根据 StatusDTO 中的权重值调整节点负载
- 节点清理:移除长时间未更新的节点信息
public Function<StatusDTO, Void> updateThisNodeInfoFunc = (statusDTO) -> { try { Long dealingWeight = statusDTO.getWeight(); RMap<String, NodeProcessStastic> nodeDatas = redissonClient.getMap(ALL_NODE_PROCESS_KEY); String localHostIp = IpAddressUtil.getHostIp(); NodeProcessStastic nodeProcessStastic = nodeDatas.getOrDefault(localHostIp, new NodeProcessStastic()); nodeProcessStastic.setTimestamp(DateUtil.now()); nodeProcessStastic.setIpAddress(localHostIp.replaceAll("\\.", "-")); nodeProcessStastic.setSupportBusinessKeys(processProperties.getBusinessKeys()); if (dealingWeight != null) { if ((dealingWeight < 0 && nodeProcessStastic.getDealingProcessWeightSum()>0)|| dealingWeight > 0) { log.info("该机器:{},增加权重得分:{}", localHostIp, dealingWeight); nodeProcessStastic.setDealingProcessWeightSum(nodeProcessStastic.getDealingProcessWeightSum() + dealingWeight); nodeProcessStastic.setLastWeightChangeTimestamp(DateUtil.now()); } }else{ if(statusDTO.getIsRestarted()){ nodeProcessStastic.setDealingProcessWeightSum(0); nodeProcessStastic.setLastWeightChangeTimestamp(null); } } nodeDatas.put(localHostIp, nodeProcessStastic); // 移除可能已经重启了的pod int oriSize = nodeDatas.size(); nodeDatas.values().removeIf(ele -> { long btwTime = DateUtil.between(DateUtil.parse(ele.g