Skip to content

Commit

Permalink
Refactor ClusterContextManagerBuilder (#34031)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 12, 2024
1 parent 36ac4f5 commit 4c03a80
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ class ModeConfigurationTest {

@Test
void assertIsCluster() {
ModeConfiguration standaloneModeConfiguration = new ModeConfiguration("Standalone", null);
assertFalse(standaloneModeConfiguration.isCluster());
ModeConfiguration clusterModeConfiguration = new ModeConfiguration("Cluster", null);
assertTrue(clusterModeConfiguration.isCluster());
ModeConfiguration standaloneModeConfig = new ModeConfiguration("Standalone", null);
assertFalse(standaloneModeConfig.isCluster());
ModeConfiguration clusterModeConfig = new ModeConfiguration("Cluster", null);
assertTrue(clusterModeConfig.isCluster());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,30 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev
ModeConfiguration modeConfig = param.getModeConfiguration();
ClusterPersistRepositoryConfiguration config = (ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(param.getInstanceMetaData(), param.getLabels()), modeConfig, eventBusContext);
ClusterPersistRepository repository = getClusterPersistRepository(config);
repository.init(config, computeNodeInstanceContext);
ClusterPersistRepository repository = getClusterPersistRepository(config, computeNodeInstanceContext);
LockContext<?> lockContext = new GlobalLockContext(new GlobalLockPersistService(repository));
computeNodeInstanceContext.init(new ClusterWorkerIdGenerator(repository, param.getInstanceMetaData().getId()), lockContext);
MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository);
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, computeNodeInstanceContext);
ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, repository);
registerOnline(computeNodeInstanceContext, param, result);
registerOnline(computeNodeInstanceContext, param, result, repository);
return result;
}

private ClusterPersistRepository getClusterPersistRepository(final ClusterPersistRepositoryConfiguration config) {
private ClusterPersistRepository getClusterPersistRepository(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
ShardingSpherePreconditions.checkNotNull(config, MissingRequiredClusterRepositoryConfigurationException::new);
return TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps());
ClusterPersistRepository result = TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(), config.getProps());
result.init(config, computeNodeInstanceContext);
return result;
}

private void registerOnline(final ComputeNodeInstanceContext computeNodeInstanceContext, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
private void registerOnline(final ComputeNodeInstanceContext computeNodeInstanceContext, final ContextManagerBuilderParameter param, final ContextManager contextManager,
final ClusterPersistRepository repository) {
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
EventSubscriberRegistry eventSubscriberRegistry = new EventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
eventSubscriberRegistry.register(new ClusterDeliverEventSubscriberRegistry(contextManager).getSubscribers());
eventSubscriberRegistry.register(new ClusterDeliverEventSubscriberRegistry(repository).getSubscribers());
eventSubscriberRegistry.register(new ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import lombok.Getter;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

import java.util.Collection;
Expand All @@ -33,7 +32,7 @@ public final class ClusterDeliverEventSubscriberRegistry {

private final Collection<EventSubscriber> subscribers;

public ClusterDeliverEventSubscriberRegistry(final ContextManager contextManager) {
subscribers = Collections.singleton(new DeliverQualifiedDataSourceSubscriber((ClusterPersistRepository) contextManager.getPersistServiceFacade().getRepository()));
public ClusterDeliverEventSubscriberRegistry(final ClusterPersistRepository repository) {
subscribers = Collections.singleton(new DeliverQualifiedDataSourceSubscriber(repository));
}
}

0 comments on commit 4c03a80

Please sign in to comment.