Big data operations and maintenance, part 4: How to speed up taking nodes offline and avoid network storms when nodes go offline?

Background: the old cluster has thousands of nodes and stores 60 PB of data. For historical reasons the node configurations differ. To save cost, the first batch of nodes mounted 32 disks each, giving about 250 TB of disk space per node, of which roughly 150 TB is in use, with about 2.4 million stored blocks per node; the cluster bandwidth is capped at 480 GB/s. The standard node configuration is 12*8T, currently storing 50-60 TB of data and about 800,000 blocks per node.

1. How to avoid network storms caused by nodes dropping offline or being decommissioned?

1.1 What is the difference between a node dropping offline and a node being decommissioned?

Node decommissioning: the node is first added to the decommissioning (exclude) list, which tells the NameNode and YARN not to submit new tasks to it or write new data to it; then the data blocks on the node are re-replicated across the cluster. During this phase the decommissioning node is preferred as the replication source (srcNode), because it has no write requests and a low load, so other nodes copy blocks from it and its load will therefore be very high. After all blocks have been copied, the node's status becomes Decommissioned, which can be checked from the NameNode web UI. Note that replication for a decommissioning node also starts only after the 10-minute-30-second delay, not the moment decommissioning is triggered, because the NameNode and DataNode always maintain a simple master-slave relationship: the NameNode never actively initiates IPC calls to a DataNode, and everything a DataNode has to do for the NameNode is returned as a DatanodeCommand carried in the heartbeat response.
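For reference, a minimal sketch of how decommissioning is normally triggered; the exclude-file path below is only an illustrative assumption, not a value taken from the cluster described here:

```xml
<!-- hdfs-site.xml: point the NameNode at an exclude file (path is an example) -->
<property>
  <name>dfs.hosts.exclude</name>
  <value>/etc/hadoop/conf/dfs.exclude</value>
</property>
<!-- Add the hostname of the node to decommission to that file, then run:
       hdfs dfsadmin -refreshNodes
     The node's state changes to "Decommission In Progress" and, once all of its
     blocks are re-replicated, to "Decommissioned" (visible in the NameNode web UI). -->
```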

Node offline (dropped): for example the DataNode process is killed, or the physical machine goes down (high load, sudden network failure, hardware failure, etc.). By default, after 10 minutes 30 seconds (controlled mainly by two parameters) the NameNode detects that communication with the node has failed and marks it dead. The NameNode then looks up, based on the node's IP, all block IDs stored on that node and the machines holding the corresponding replicas, and schedules re-replication through the heartbeat mechanism. In this case the replication source is not the offline node but one of the surviving replicas, and the target nodes are still chosen according to rack awareness and the replica placement policy.

Screaming reminder: the difference between a node going offline and being decommissioned is not only the replication source, but also the NameNode's replication strategy for under-replicated blocks (blocks waiting for replication are divided into 5 priority levels). An extreme example: decommissioning a node that holds single-replica data will not lose that data, while the same node dropping offline will.

1.2 How to deal with the storms that occur when a node drops offline

When a node holding dozens or even hundreds of terabytes of data and millions of blocks goes offline, it triggers a large RPC storm. For a large, heavily loaded cluster this is a big challenge for the NameNode: it not only hurts production performance but is also a serious hidden risk, especially for clusters whose bandwidth is already a bottleneck. In general, the time the NameNode takes to decide a DataNode is dead is 10 * 3 s (heartbeat interval) + 2 * 5 min (NameNode recheck interval, parameter dfs.namenode.heartbeat.recheck-interval) = 10 minutes 30 seconds. If the bandwidth stays saturated during and after this window, RPC requests are delayed and DataNode-NameNode communication becomes sluggish, so other nodes can easily be marked dead in turn, forming a vicious circle. How can this be avoided?
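For reference, a sketch of the two parameters behind that 10-minute-30-second figure, shown with their usual default values:

```xml
<!-- hdfs-site.xml: dead-node detection, default values shown -->
<property>
  <name>dfs.heartbeat.interval</name>
  <value>3</value>          <!-- DataNode heartbeat interval, in seconds -->
</property>
<property>
  <name>dfs.namenode.heartbeat.recheck-interval</name>
  <value>300000</value>     <!-- NameNode recheck interval, in milliseconds (5 min) -->
</property>
<!-- timeout = 2 * recheck-interval + 10 * heartbeat-interval
             = 2 * 5 min + 10 * 3 s = 10 min 30 s -->
```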

1.2.1 First, understand how DataNode block replication is scheduled:

The NameNode maintains a replication priority queue: blocks with insufficient replicas are sorted by priority, and blocks with only one remaining replica have the highest priority. Seen from this angle, in a cluster with a replication factor of 2, as soon as one replica of a block is lost only one copy remains, so it is replicated at the highest priority, which easily turns into storm-style replication; if not controlled well it can degrade cluster performance or even bring the cluster down. That is why a replication factor of 2 is generally not recommended.

The five priority queues for blocks waiting to be replicated are listed below; they are determined in the private method getPriority of UnderReplicatedBlocks:

  1. L1 (highest): blocks at immediate risk of data loss, for example blocks with only one live replica (typical when one node of a 2-replica block goes offline) or with zero live replicas, and blocks whose single replica sits on a node that is being decommissioned.
  2. L2: blocks whose actual replica count is far below the configured value (for example, 3 replicas expected and 2 missing), i.e. fewer than 1/3 of the expected replicas remain; these are replicated at the second priority. For example, a block configured with 4 replicas that has lost 3 of them is replicated before a block with 4 replicas that has lost only 2.
  3. L3: under-replicated blocks that do not qualify for the higher priorities above; they are replicated at the third priority.
  4. L4: blocks that still satisfy the minimum required number of replicas; their replication demand is lower than L2-L3.
  5. L5: corrupted blocks for which a non-corrupted replica is currently available.

1.2.2 Parameters that control the RPC storm when a node drops offline

The three parameters below all live in hdfs-site.xml; see the official Apache Hadoop documentation for details. Block replication speed is determined by two things: how fast the NameNode dispatches replication tasks, and how fast DataNodes copy blocks between themselves. The former can be understood as the entrance, the latter as the exit.

1. Entry parameter: controls task dispatch at the NameNode level; changing it requires restarting the NameNode but not the DataNodes.

dfs.namenode.replication.work.multiplier.per.iteration: the Apache Hadoop default is 2, the CDH default is 10.

This parameter determines how many block replication tasks the NameNode can hand to each DataNode per heartbeat (3 s) when it sends out the task list. For example, in a 500-node cluster with this value set to 10, one heartbeat round from the NameNode can dispatch 10 * 500 = 5000 block replication tasks. If a node with 800,000 blocks goes offline or is decommissioned, how long does it take the NameNode to dispatch all of the pending replication tasks to the DataNodes?

In the limiting case: dispatch time = total blocks to replicate / (active DataNodes * parameter value) * heartbeat interval = 800,000 / (500 * 10) = 160 heartbeats * 3 s per heartbeat = 480 s = 8 minutes. So the more nodes there are, the faster tasks are dispatched; dispatch speed is proportional to both the number of nodes and this parameter.
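A sketch of the entry-side setting as it would look in hdfs-site.xml, using the example figure of 10 from the calculation above (not a recommendation; a NameNode restart is required for the change to take effect):

```xml
<!-- hdfs-site.xml: blocks dispatched per DataNode per heartbeat -->
<property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>10</value>
  <!-- Per heartbeat (3 s) the NameNode can hand out up to
       value * active DataNodes replication tasks:
       10 * 500 nodes = 5000 blocks per heartbeat,
       800,000 blocks / 5000 = 160 heartbeats = 480 s, about 8 min of dispatching. -->
</property>
```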


2. Exit parameters: in contrast to the NameNode-level task dispatch control above, the following two parameters act at the DataNode level; changing them also requires restarting the NameNode.

1. dfs.namenode.replication.max-streams: the Apache Hadoop default is 2, the CDH default is 20.

This parameter controls the maximum number of replication threads on a DataNode for blocks that are not at the highest priority. As noted above, blocks are replicated at five priority levels; this limit applies to every priority except the highest.

2. dfs.namenode.replication.max-streams-hard-limit: the Apache Hadoop default is 2, the CDH default is 40.

This parameter controls the number of replication streams on a DataNode across all priorities, including the highest; it is generally used together with the previous parameter.
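A sketch of the two exit-side limits in hdfs-site.xml, shown with the CDH defaults quoted above (values must be adapted to the actual cluster; a NameNode restart is required):

```xml
<!-- hdfs-site.xml: per-DataNode replication stream limits -->
<property>
  <name>dfs.namenode.replication.max-streams</name>
  <value>20</value>   <!-- cap for all priorities except the highest -->
</property>
<property>
  <name>dfs.namenode.replication.max-streams-hard-limit</name>
  <value>40</value>   <!-- hard cap including highest-priority blocks -->
</property>
```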

Screaming summary: the first parameter controls how quickly the DataNodes are handed replication tasks, while the latter two further cap the number of parallel replication threads (network transfers) a single DataNode may run at once. The concrete values depend on the scale and hardware of the cluster and cannot be copied blindly. Generally it is easier to throttle at the entrance. For example, in a 500-node cluster with dfs.namenode.replication.work.multiplier.per.iteration=10, the cluster dispatches 5000 blocks per heartbeat. If the files are spread evenly across all 500 nodes, each node replicates about 10 blocks at a time (in practice, because of the replica placement policy, rack awareness and so on, not every node takes part in the replication). With a block size of 128 MB, the network load per node is roughly 128 * 10 / 3 ≈ 427 MB/s. At that point you have to judge, against your actual situation, whether this hits a bandwidth bottleneck and whether such heavy network IO affects normal jobs; if it does, lower the value.

2. How to take a node offline quickly

The essence of taking a node offline quickly is simply to speed up replica replication, which is controlled mainly by the three parameters above: the first controls NameNode task dispatch, the other two control the DataNode replication rate, all on the premise of not affecting normal production jobs. The smaller the cluster, the slower the offlining, for example because far fewer tasks are dispatched per heartbeat in total.

For example, if a 500-node cluster uses dfs.namenode.replication.work.multiplier.per.iteration=10, a 50-node cluster would need to set it to 100 for the NameNode to dispatch tasks at the same speed. Set it according to the actual cluster size.

Based on actual experience with the parameter values below: for a node with 800,000 blocks and about 50 TB of data, in a cluster of about 650 DataNodes, data replication took roughly ten minutes and the entire node-offline process only about half an hour, which is already very fast. It did not affect the scheduling of normal jobs and did not hit the cluster bandwidth limit, using only about 75% of the total bandwidth; the node went offline smoothly without affecting tasks.

dfs.namenode.replication.work.multiplier.per.iteration=4
dfs.namenode.replication.max-streams=20
dfs.namenode.replication.max-streams-hard-limit=40
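Written out as an hdfs-site.xml snippet, these are the same values; treat them as a starting point for a cluster of comparable size rather than universal settings:

```xml
<!-- hdfs-site.xml: values used for the ~650-DataNode cluster described above -->
<property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>4</value>
</property>
<property>
  <name>dfs.namenode.replication.max-streams</name>
  <value>20</value>
</property>
<property>
  <name>dfs.namenode.replication.max-streams-hard-limit</name>
  <value>40</value>
</property>
```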