序
本文主要研究一下elasticsearch的LagDetector
LagDetector
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java
/** * A publication can succeed and complete before all nodes have applied the published state and acknowledged it; however we need every node * eventually either to apply the published state (or a later state) or be removed from the cluster. This component achieves this by * removing any lagging nodes from the cluster after a timeout. */public class LagDetector { private static final Logger logger = LogManager.getLogger(LagDetector.class); // the timeout for each node to apply a cluster state update after the leader has applied it, before being removed from the cluster public static final SettingCLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING = Setting.timeSetting("cluster.follower_lag.timeout", TimeValue.timeValueMillis(90000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); private final TimeValue clusterStateApplicationTimeout; private final Consumer onLagDetected; private final Supplier localNodeSupplier; private final ThreadPool threadPool; private final Map appliedStateTrackersByNode = newConcurrentMap(); public LagDetector(final Settings settings, final ThreadPool threadPool, final Consumer onLagDetected, final Supplier localNodeSupplier) { this.threadPool = threadPool; this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings); this.onLagDetected = onLagDetected; this.localNodeSupplier = localNodeSupplier; } public void setTrackedNodes(final Iterable discoveryNodes) { final Set discoveryNodeSet = new HashSet<>(); discoveryNodes.forEach(discoveryNodeSet::add); discoveryNodeSet.remove(localNodeSupplier.get()); appliedStateTrackersByNode.keySet().retainAll(discoveryNodeSet); discoveryNodeSet.forEach(node -> appliedStateTrackersByNode.putIfAbsent(node, new NodeAppliedStateTracker(node))); } public void clearTrackedNodes() { appliedStateTrackersByNode.clear(); } public void setAppliedVersion(final DiscoveryNode discoveryNode, final long appliedVersion) { final NodeAppliedStateTracker nodeAppliedStateTracker = 
appliedStateTrackersByNode.get(discoveryNode); if (nodeAppliedStateTracker == null) { // Received an ack from a node that a later publication has removed (or we are no longer master). No big deal. logger.trace("node {} applied version {} but this node's version is not being tracked", discoveryNode, appliedVersion); } else { nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion); } } public void startLagDetector(final long version) { final List laggingTrackers = appliedStateTrackersByNode.values().stream().filter(t -> t.appliedVersionLessThan(version)).collect(Collectors.toList()); if (laggingTrackers.isEmpty()) { logger.trace("lag detection for version {} is unnecessary: {}", version, appliedStateTrackersByNode.values()); } else { logger.debug("starting lag detector for version {}: {}", version, laggingTrackers); threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.GENERIC, new Runnable() { @Override public void run() { laggingTrackers.forEach(t -> t.checkForLag(version)); } @Override public String toString() { return "lag detector for version " + version + " on " + laggingTrackers; } }); } } @Override public String toString() { return "LagDetector{" + "clusterStateApplicationTimeout=" + clusterStateApplicationTimeout + ", appliedStateTrackersByNode=" + appliedStateTrackersByNode.values() + '}'; } // for assertions Set getTrackedNodes() { return Collections.unmodifiableSet(appliedStateTrackersByNode.keySet()); } private class NodeAppliedStateTracker { private final DiscoveryNode discoveryNode; private final AtomicLong appliedVersion = new AtomicLong(); NodeAppliedStateTracker(final DiscoveryNode discoveryNode) { this.discoveryNode = discoveryNode; } void increaseAppliedVersion(long appliedVersion) { long maxAppliedVersion = this.appliedVersion.updateAndGet(v -> Math.max(v, appliedVersion)); logger.trace("{} applied version {}, max now {}", this, appliedVersion, maxAppliedVersion); } boolean appliedVersionLessThan(final long version) { 
return appliedVersion.get() < version; } @Override public String toString() { return "NodeAppliedStateTracker{" + "discoveryNode=" + discoveryNode + ", appliedVersion=" + appliedVersion + '}'; } void checkForLag(final long version) { if (appliedStateTrackersByNode.get(discoveryNode) != this) { logger.trace("{} no longer active when checking version {}", this, version); return; } long appliedVersion = this.appliedVersion.get(); if (version <= appliedVersion) { logger.trace("{} satisfied when checking version {}, node applied version {}", this, version, appliedVersion); return; } logger.debug("{}, detected lag at version {}, node has only applied version {}", this, version, appliedVersion); onLagDetected.accept(discoveryNode); } }}
- LagDetector用于检测并移除lagging nodes,其构造器读取cluster.follower_lag.timeout配置,默认为90000ms,最小值为1ms
- startLagDetector首先从appliedStateTrackersByNode中过滤出appliedVersion小于指定version的laggingTrackers;如果laggingTrackers不为空,则延时clusterStateApplicationTimeout后执行检测,其run方法会遍历laggingTrackers,挨个执行其NodeAppliedStateTracker.checkForLag方法
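- NodeAppliedStateTracker的checkForLag方法首先判断当前tracker是否仍是appliedStateTrackersByNode中该节点对应的tracker,不是则直接返回;接着进行version判断,若version小于等于appliedVersion则直接返回;否则最后调用onLagDetected.accept(discoveryNode)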
Coordinator
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
public class Coordinator extends AbstractLifecycleComponent implements Discovery { //...... public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService, SupplierpersistedStateSupplier, SeedHostsProvider seedHostsProvider, ClusterApplier clusterApplier, Collection > onJoinValidators, Random random) { this.settings = settings; this.transportService = transportService; this.masterService = masterService; this.allocationService = allocationService; this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings)); this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators); this.persistedStateSupplier = persistedStateSupplier; this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings); this.lastKnownLeader = Optional.empty(); this.lastJoin = Optional.empty(); this.joinAccumulator = new InitialJoinAccumulator(); this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings); this.random = random; this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool()); this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen); configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider); this.peerFinder = new CoordinatorPeerFinder(settings, transportService, new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver); this.publicationHandler = new PublicationTransportHandler(transportService, 
namedWriteableRegistry, this::handlePublishRequest, this::handleApplyCommit); this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure()); this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; masterService.setClusterStateSupplier(this::getStateForMasterService); this.reconfigurator = new Reconfigurator(settings, clusterSettings); this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers, this::isInitialConfigurationSet, this::setInitialConfiguration); this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService, this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration); this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState, transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt); } private void removeNode(DiscoveryNode discoveryNode, String reason) { synchronized (mutex) { if (mode == Mode.LEADER) { masterService.submitStateUpdateTask("node-left", new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason), ClusterStateTaskConfig.build(Priority.IMMEDIATE), nodeRemovalExecutor, nodeRemovalExecutor); } } } //......}
- Coordinator的构造器创建了LagDetector,其Consumer<DiscoveryNode>执行的是removeNode(n, "lagging")方法,该方法在当前mode为LEADER的时候会向masterService提交一个名为node-left、优先级为IMMEDIATE的NodeRemovalClusterStateTaskExecutor.Task
小结
- LagDetector用于检测并移除lagging nodes,其构造器读取cluster.follower_lag.timeout配置,默认为90000ms,最小值为1ms
- startLagDetector首先从appliedStateTrackersByNode中过滤出appliedVersion小于指定version的laggingTrackers;如果laggingTrackers不为空,则延时clusterStateApplicationTimeout后执行检测,其run方法会遍历laggingTrackers,挨个执行其NodeAppliedStateTracker.checkForLag方法;NodeAppliedStateTracker的checkForLag方法首先判断当前tracker是否仍是appliedStateTrackersByNode中该节点对应的tracker,接着进行version判断,最后调用onLagDetected.accept(discoveryNode)
- Coordinator的构造器创建了LagDetector,其Consumer<DiscoveryNode>执行的是removeNode(n, "lagging")方法,该方法在当前mode为LEADER的时候会向masterService提交一个名为node-left、优先级为IMMEDIATE的NodeRemovalClusterStateTaskExecutor.Task