A look at Elasticsearch's LagDetector
Published: 2019-06-20

This article takes a look at Elasticsearch's LagDetector.

LagDetector

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java

/**
 * A publication can succeed and complete before all nodes have applied the published state and acknowledged it; however we need every node
 * eventually either to apply the published state (or a later state) or be removed from the cluster. This component achieves this by
 * removing any lagging nodes from the cluster after a timeout.
 */
public class LagDetector {

    private static final Logger logger = LogManager.getLogger(LagDetector.class);

    // the timeout for each node to apply a cluster state update after the leader has applied it, before being removed from the cluster
    public static final Setting<TimeValue> CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING =
        Setting.timeSetting("cluster.follower_lag.timeout",
            TimeValue.timeValueMillis(90000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

    private final TimeValue clusterStateApplicationTimeout;
    private final Consumer<DiscoveryNode> onLagDetected;
    private final Supplier<DiscoveryNode> localNodeSupplier;
    private final ThreadPool threadPool;
    private final Map<DiscoveryNode, NodeAppliedStateTracker> appliedStateTrackersByNode = newConcurrentMap();

    public LagDetector(final Settings settings, final ThreadPool threadPool, final Consumer<DiscoveryNode> onLagDetected,
                       final Supplier<DiscoveryNode> localNodeSupplier) {
        this.threadPool = threadPool;
        this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings);
        this.onLagDetected = onLagDetected;
        this.localNodeSupplier = localNodeSupplier;
    }

    public void setTrackedNodes(final Iterable<DiscoveryNode> discoveryNodes) {
        final Set<DiscoveryNode> discoveryNodeSet = new HashSet<>();
        discoveryNodes.forEach(discoveryNodeSet::add);
        discoveryNodeSet.remove(localNodeSupplier.get());
        appliedStateTrackersByNode.keySet().retainAll(discoveryNodeSet);
        discoveryNodeSet.forEach(node -> appliedStateTrackersByNode.putIfAbsent(node, new NodeAppliedStateTracker(node)));
    }

    public void clearTrackedNodes() {
        appliedStateTrackersByNode.clear();
    }

    public void setAppliedVersion(final DiscoveryNode discoveryNode, final long appliedVersion) {
        final NodeAppliedStateTracker nodeAppliedStateTracker = appliedStateTrackersByNode.get(discoveryNode);
        if (nodeAppliedStateTracker == null) {
            // Received an ack from a node that a later publication has removed (or we are no longer master). No big deal.
            logger.trace("node {} applied version {} but this node's version is not being tracked", discoveryNode, appliedVersion);
        } else {
            nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion);
        }
    }

    public void startLagDetector(final long version) {
        final List<NodeAppliedStateTracker> laggingTrackers
            = appliedStateTrackersByNode.values().stream().filter(t -> t.appliedVersionLessThan(version)).collect(Collectors.toList());

        if (laggingTrackers.isEmpty()) {
            logger.trace("lag detection for version {} is unnecessary: {}", version, appliedStateTrackersByNode.values());
        } else {
            logger.debug("starting lag detector for version {}: {}", version, laggingTrackers);

            threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.GENERIC, new Runnable() {
                @Override
                public void run() {
                    laggingTrackers.forEach(t -> t.checkForLag(version));
                }

                @Override
                public String toString() {
                    return "lag detector for version " + version + " on " + laggingTrackers;
                }
            });
        }
    }

    @Override
    public String toString() {
        return "LagDetector{" +
            "clusterStateApplicationTimeout=" + clusterStateApplicationTimeout +
            ", appliedStateTrackersByNode=" + appliedStateTrackersByNode.values() +
            '}';
    }

    // for assertions
    Set<DiscoveryNode> getTrackedNodes() {
        return Collections.unmodifiableSet(appliedStateTrackersByNode.keySet());
    }

    private class NodeAppliedStateTracker {
        private final DiscoveryNode discoveryNode;
        private final AtomicLong appliedVersion = new AtomicLong();

        NodeAppliedStateTracker(final DiscoveryNode discoveryNode) {
            this.discoveryNode = discoveryNode;
        }

        void increaseAppliedVersion(long appliedVersion) {
            long maxAppliedVersion = this.appliedVersion.updateAndGet(v -> Math.max(v, appliedVersion));
            logger.trace("{} applied version {}, max now {}", this, appliedVersion, maxAppliedVersion);
        }

        boolean appliedVersionLessThan(final long version) {
            return appliedVersion.get() < version;
        }

        @Override
        public String toString() {
            return "NodeAppliedStateTracker{" +
                "discoveryNode=" + discoveryNode +
                ", appliedVersion=" + appliedVersion +
                '}';
        }

        void checkForLag(final long version) {
            if (appliedStateTrackersByNode.get(discoveryNode) != this) {
                logger.trace("{} no longer active when checking version {}", this, version);
                return;
            }

            long appliedVersion = this.appliedVersion.get();
            if (version <= appliedVersion) {
                logger.trace("{} satisfied when checking version {}, node applied version {}", this, version, appliedVersion);
                return;
            }

            logger.debug("{}, detected lag at version {}, node has only applied version {}", this, version, appliedVersion);
            onLagDetected.accept(discoveryNode);
        }
    }
}
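  • LagDetector detects lagging nodes and removes them from the cluster. Its constructor reads the cluster.follower_lag.timeout setting, which defaults to 90000ms (90 seconds) and has a minimum of 1ms.
  • startLagDetector first filters appliedStateTrackersByNode down to the trackers whose appliedVersion is less than the given version (laggingTrackers). If that list is empty it only logs at trace level; otherwise it schedules a check to run after clusterStateApplicationTimeout, whose run method iterates over laggingTrackers and calls NodeAppliedStateTracker.checkForLag on each one.
  • NodeAppliedStateTracker.checkForLag returns early if the tracker is no longer the active one for its node, or if the node's applied version has already reached version; only when both checks fail does it call onLagDetected.accept(discoveryNode).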
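The detection flow described above can be modeled in a few dozen lines of plain Java. This is a hypothetical, simplified sketch, not the Elasticsearch class: `SimpleLagDetector` and `Tracker` are illustrative names, nodes are plain `String` ids instead of `DiscoveryNode`, a JDK `ScheduledExecutorService` stands in for Elasticsearch's `ThreadPool.scheduleUnlessShuttingDown`, and logging is omitted. It keeps the same core ideas: one tracker per remote node, an applied version that only moves forward, and a delayed check that fires the callback only for nodes that are still tracked and still behind.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical, simplified model of LagDetector (not the Elasticsearch class):
// nodes are String ids and a JDK ScheduledExecutorService replaces ThreadPool.
class SimpleLagDetector {
    private final long timeoutMillis;
    private final Consumer<String> onLagDetected;
    private final ScheduledExecutorService scheduler;
    private final Map<String, Tracker> trackersByNode = new ConcurrentHashMap<>();

    SimpleLagDetector(long timeoutMillis, Consumer<String> onLagDetected, ScheduledExecutorService scheduler) {
        this.timeoutMillis = timeoutMillis;
        this.onLagDetected = onLagDetected;
        this.scheduler = scheduler;
    }

    // Track exactly the given nodes minus the local one; existing trackers are kept.
    void setTrackedNodes(Set<String> nodes, String localNode) {
        Set<String> remoteNodes = new HashSet<>(nodes);
        remoteNodes.remove(localNode);
        trackersByNode.keySet().retainAll(remoteNodes);
        remoteNodes.forEach(n -> trackersByNode.putIfAbsent(n, new Tracker(n)));
    }

    // Record an ack; acks from untracked nodes are ignored.
    void setAppliedVersion(String node, long appliedVersion) {
        Tracker tracker = trackersByNode.get(node);
        if (tracker != null) {
            tracker.increaseAppliedVersion(appliedVersion);
        }
    }

    // Returns the scheduled check, or null if no tracked node is behind `version`.
    ScheduledFuture<?> startLagDetector(long version) {
        List<Tracker> lagging = trackersByNode.values().stream()
            .filter(t -> t.appliedVersion.get() < version)
            .collect(Collectors.toList());
        if (lagging.isEmpty()) {
            return null;
        }
        return scheduler.schedule(() -> lagging.forEach(t -> t.checkForLag(version)),
            timeoutMillis, TimeUnit.MILLISECONDS);
    }

    private class Tracker {
        private final String node;
        private final AtomicLong appliedVersion = new AtomicLong();

        Tracker(String node) {
            this.node = node;
        }

        // Versions only move forward, even if acks arrive out of order.
        void increaseAppliedVersion(long version) {
            appliedVersion.updateAndGet(v -> Math.max(v, version));
        }

        void checkForLag(long version) {
            if (trackersByNode.get(node) != this) {
                return; // node was untracked or re-tracked since the check was scheduled
            }
            if (version <= appliedVersion.get()) {
                return; // node caught up within the timeout
            }
            onLagDetected.accept(node);
        }
    }
}
```

In this sketch the caller owns the scheduler and should call `shutdown()` on it when finished. Returning the `ScheduledFuture` is a convenience for tests; the real `startLagDetector` returns `void`.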

Coordinator

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

public class Coordinator extends AbstractLifecycleComponent implements Discovery {

    //......

    public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
                       NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
                       Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
                       ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random) {
        this.settings = settings;
        this.transportService = transportService;
        this.masterService = masterService;
        this.allocationService = allocationService;
        this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
        this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
        this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
            this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
        this.persistedStateSupplier = persistedStateSupplier;
        this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
        this.lastKnownLeader = Optional.empty();
        this.lastJoin = Optional.empty();
        this.joinAccumulator = new InitialJoinAccumulator();
        this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
        this.random = random;
        this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
        this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen);
        configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);
        this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
            new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
        this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
            this::handlePublishRequest, this::handleApplyCommit);
        this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
        this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
        this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
        this.clusterApplier = clusterApplier;
        masterService.setClusterStateSupplier(this::getStateForMasterService);
        this.reconfigurator = new Reconfigurator(settings, clusterSettings);
        this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers,
            this::isInitialConfigurationSet, this::setInitialConfiguration);
        this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService,
            this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);
        this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
            transportService::getLocalNode);
        this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
            transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
    }

    private void removeNode(DiscoveryNode discoveryNode, String reason) {
        synchronized (mutex) {
            if (mode == Mode.LEADER) {
                masterService.submitStateUpdateTask("node-left",
                    new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
                    ClusterStateTaskConfig.build(Priority.IMMEDIATE),
                    nodeRemovalExecutor,
                    nodeRemovalExecutor);
            }
        }
    }

    //......
}
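  • The Coordinator constructor creates the LagDetector with the Consumer<DiscoveryNode> n -> removeNode(n, "lagging"). While holding the mutex, removeNode checks whether the current mode is LEADER and, only if so, submits a "node-left" state update task (NodeRemovalClusterStateTaskExecutor.Task) with Priority.IMMEDIATE.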
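The callback wiring above can be illustrated with a small, hypothetical model. `SimpleCoordinator` is an illustrative name, not Elasticsearch code: nodes are `String` ids, and instead of calling `masterService.submitStateUpdateTask` it simply records a description of each "node-left" task in a list. The `Mode` values mirror the CANDIDATE/LEADER/FOLLOWER modes of the real Coordinator. The point it shows is that the lag callback is harmless on a non-leader: the removal is only submitted while the node is LEADER, and the mode check happens under the same mutex that guards mode changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the Coordinator wiring, not Elasticsearch code:
// the MasterService task queue is replaced by a list of task descriptions.
class SimpleCoordinator {
    enum Mode { CANDIDATE, LEADER, FOLLOWER }

    private final Object mutex = new Object();
    private final List<String> submittedTasks = new ArrayList<>();
    private Mode mode = Mode.CANDIDATE;

    // Counterpart of `n -> removeNode(n, "lagging")` passed to the LagDetector constructor.
    final Consumer<String> onLagDetected = n -> removeNode(n, "lagging");

    void setMode(Mode newMode) {
        synchronized (mutex) {
            mode = newMode;
        }
    }

    private void removeNode(String node, String reason) {
        synchronized (mutex) {
            // Only a leader submits the removal; on any other node the callback is a no-op.
            if (mode == Mode.LEADER) {
                submittedTasks.add("node-left[" + node + ", reason=" + reason + "]");
            }
        }
    }

    // Returns a copy of the recorded tasks so callers cannot mutate internal state.
    List<String> submittedTasks() {
        synchronized (mutex) {
            return new ArrayList<>(submittedTasks);
        }
    }
}
```

For example, calling `onLagDetected.accept("node-1")` while in CANDIDATE mode records nothing; after `setMode(Mode.LEADER)` the same call records one "node-left" task with reason "lagging".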

Summary

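  • LagDetector detects lagging nodes and removes them from the cluster. Its constructor reads the cluster.follower_lag.timeout setting, which defaults to 90000ms (90 seconds) and has a minimum of 1ms.
  • startLagDetector first filters appliedStateTrackersByNode down to the trackers whose appliedVersion is less than the given version (laggingTrackers), then schedules a check to run after clusterStateApplicationTimeout; its run method iterates over laggingTrackers and calls NodeAppliedStateTracker.checkForLag on each one. checkForLag first verifies that the tracker is still active and that the node has not yet applied version, and only then calls onLagDetected.accept(discoveryNode).
  • The Coordinator constructor creates the LagDetector with a Consumer<DiscoveryNode> that calls removeNode; when the current mode is LEADER, removeNode submits a NodeRemovalClusterStateTaskExecutor.Task to remove the node.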

Reprinted from: https://my.oschina.net/go4it/blog/3049151