LocalGrainDirectory 详解 LocalGrainDirectory是Orleans分布式系统中负责本地Grain目录管理的核心组件,它实现了分布式哈希表(DHT)风格的Grain定位服务。
类图
LocalGrainDirectory
-ILogger log
-SiloAddress? seed
-ISiloStatusOracle siloStatusOracle
-IInternalGrainFactory grainFactory
-object writeLock
-IServiceProvider _serviceProvider
-DirectoryMembership directoryMembership
-bool Running
-Catalog? _catalog
+SiloAddress MyAddress
+IGrainDirectoryCache DirectoryCache
+LocalGrainDirectoryPartition DirectoryPartition
+RemoteGrainDirectory RemoteGrainDirectory
+RemoteGrainDirectory CacheValidator
+GrainDirectoryHandoffManager HandoffManager
+Start() : void
+StopAsync() : Task
+SiloStatusChangeNotification(SiloAddress, SiloStatus) : void
+RegisterAsync(GrainAddress, int) : Task<AddressAndTag>
+UnregisterAsync(GrainAddress, UnregistrationCause, int) : Task
+LookupAsync(GrainId, int) : Task<AddressAndTag>
+LocalLookup(GrainId, out AddressAndTag) : bool
+CalculateGrainDirectoryPartition(GrainId)
«interface»
ILocalGrainDirectory
+Start() : void
+StopAsync() : Task
+RegisterAsync(GrainAddress, int) : Task<AddressAndTag>
+UnregisterAsync(GrainAddress, UnregistrationCause, int) : Task
+LookupAsync(GrainId, int) : Task<AddressAndTag>
+LocalLookup(GrainId, out AddressAndTag) : bool
«interface»
ISiloStatusListener
+SiloStatusChangeNotification(SiloAddress, SiloStatus) : void
«interface»
ILifecycleParticipant<ISiloLifecycle>
+Participate(ISiloLifecycle) : void
LocalGrainDirectoryPartition
+AddSingleActivation(GrainAddress, GrainAddress?) : AddressAndTag
+RemoveActivation(GrainId, ActivationId, UnregistrationCause) : void
+LookUpActivation(GrainId) : AddressAndTag
+GetItems()
«interface»
IGrainDirectoryCache
+AddOrUpdate(GrainAddress, int) : void
+LookUp(GrainId, out GrainAddress) : bool
+Remove(GrainId) : bool
+KeyValues IEnumerable~(GrainAddress, int)
协作图 Client LocalGrainDirectory LocalGrainDirectoryPartition DirectoryCache RemoteGrainDirectory SiloStatusOracle Grain注册流程 RegisterAsync(grainAddress, hopCount) CalculateGrainDirectoryPartition(grainId) AddSingleActivation(address, previousAddress) AddressAndTag result AddOrUpdate(result.Address, result.VersionTag) 返回注册结果 RegisterAsync(address, previousAddress, hopCount+1) AddressAndTag result AddOrUpdate(result.Address, result.VersionTag) 返回注册结果 alt [当前Silo是Owner] [需要转发到其他Silo] Grain查找流程 LookupAsync(grainId, hopCount) LookUp(grainId, out address) 返回缓存地址 返回查找结果 CalculateGrainDirectoryPartition(grainId) LookUpActivation(grainId) AddressAndTag result 返回查找结果 LookupAsync(grainId, hopCount+1) AddressAndTag result AddOrUpdate(result.Address, result.VersionTag) 返回查找结果 alt [当前Silo是Owner] [需要转发到其他Silo] alt [缓存命中且Silo有效] [缓存未命中或无效] Silo状态变化处理 SiloStatusChangeNotification(updatedSilo, status) AddServer(updatedSilo) AdjustLocalDirectory(silo, dead: false) AdjustLocalCache(silo, dead: false) RemoveServer(updatedSilo, status) AdjustLocalDirectory(silo, dead: true) AdjustLocalCache(silo, dead: true) alt [Silo变为Active状态] [Silo变为Terminating状态] Client LocalGrainDirectory LocalGrainDirectoryPartition DirectoryCache RemoteGrainDirectory SiloStatusOracle
核心功能详解 1. 分布式哈希表(DHT)分区算法 LocalGrainDirectory使用一致性哈希算法来确定每个Grain的目录分区所有者:
public SiloAddress? CalculateGrainDirectoryPartition ( GrainId grainId) { int hash= unchecked ( ( int ) grainId. GetUniformHashCode ( ) ) ; // 在排序的Silo列表中查找第一个哈希值小于等于目标哈希的Silo for ( var index= existing. MembershipRingList. Count- 1 ; index>= 0 ; -- index) { var item= existing. MembershipRingList[ index] ; if ( IsSiloNextInTheRing ( item, hash, excludeMySelf) ) { return item; } } } 2. 请求转发机制 当当前Silo不是Grain的目录所有者时,会进行请求转发:
public SiloAddress? CheckIfShouldForward ( GrainId grainId, int hopCount, string operationDescription) { var owner= CalculateGrainDirectoryPartition ( grainId) ; if ( owneris null || owner. Equals ( MyAddress) ) return null ; if ( hopCount>= HOP_LIMIT) // 跳数限制为6 throw new OrleansException ( $"Hop limit reached" ) ; return owner; // 转发到目标Silo } 3. 缓存管理 LocalGrainDirectory维护本地缓存以提高查找性能:
public bool LocalLookup ( GrainId grain, out AddressAndTag result) { // 首先检查缓存 var address= GetLocalCacheData ( grain) ; if ( address!= default ) { result= new ( address, 0 ) ; return true ; } // 如果是本地分区,检查本地目录 if ( silo. Equals ( MyAddress) ) { result= GetLocalDirectoryData ( grain) ; return result. Address!= null ; } return false ; } 4. 集群成员变化处理 LocalGrainDirectory监听Silo状态变化并相应调整目录和缓存:
public void SiloStatusChangeNotification ( SiloAddress updatedSilo, SiloStatus status) { if ( status. IsTerminating ( ) ) { CacheValidator. WorkItemGroup. QueueAction ( ( ) => RemoveServer ( updatedSilo, status) ) ; } else if ( status== SiloStatus. Active) { CacheValidator. WorkItemGroup. QueueAction ( ( ) => AddServer ( updatedSilo) ) ; } } 关键设计特点 线程安全 :使用writeLock对象确保目录操作的线程安全容错性 :支持Silo故障时的目录分区重新分配性能优化 :本地缓存减少远程查找开销可扩展性 :支持动态集群成员变化一致性 :确保目录信息在集群中的一致性LocalGrainDirectory是Orleans分布式系统的核心组件,它通过分布式哈希表算法实现了高效的Grain定位服务,为整个系统的可扩展性和可靠性提供了基础支撑。