
Dark Horse Big Data Development-Course Notes

1. Hadoop


1.1. Introduction to Hadoop

  • Broadly speaking, Hadoop usually refers to a broader concept: the Hadoop ecosystem.
  • In a narrow sense, Hadoop refers to the Apache open-source framework. Its core components are:
  • HDFS (distributed file system): solves massive data storage
  • YARN (job scheduling and cluster resource management framework): solves resource and task scheduling
  • MAPREDUCE (distributed computing programming framework): solves massive data computation

1.2. Advantages of Hadoop

  • Scalable: Hadoop distributes data and computing tasks across clusters of available computers, and these clusters can easily be expanded to thousands of nodes.
  • Economical: Hadoop distributes and processes data across clusters of ordinary, inexpensive machines, which keeps costs very low.
  • Efficient: by moving data between nodes and processing it in parallel, Hadoop is very fast.
  • Reliable: Hadoop automatically maintains multiple copies of data and automatically redeploys computing tasks after failures, so its storage and processing of data can be trusted.

1.3. Which processes are started in a Hadoop cluster, and what are their roles?

  • namenode => the HDFS daemon responsible for maintaining the entire file system; it stores the file system's metadata (fsimage + edit log)
  • datanode => the worker node of the file system. When a client needs data, the namenode tells it which DataNode holds the data; the client then communicates directly with that DataNode's daemon, which performs the actual read/write operations
  • secondarynamenode => a daemon that acts as a backup mechanism for the namenode's metadata. It periodically communicates with the namenode and merges the fsimage and edit logs on its behalf; it can serve as a backup for the namenode
  • resourcemanager => the daemon of the YARN platform, responsible for allocating and scheduling all cluster resources, handling client requests, and monitoring nodemanagers
  • nodemanager => manages the resources of a single node and executes the concrete tasks and commands sent by the resourcemanager
  • DFSZKFailoverController => in high-availability setups, responsible for monitoring the state of the NameNode and writing state information to ZooKeeper in a timely manner. It uses an independent thread to periodically call an interface on the NN to obtain its health status. The FC also has the right to elect the Active NN; since there are at most two NN nodes, the current election strategy is relatively simple (first come first served, rotation).
  • JournalNode => stores the namenode's edit log files in high-availability setups

1.4. Hadoop main configuration files

hadoop-env.sh

  • This file sets the environment variables Hadoop needs at runtime. JAVA_HOME must be set here: even when JAVA_HOME is set in the current shell, Hadoop does not pick it up, because Hadoop treats the execution environment (even the local machine) as a remote server.

core-site.xml

  • Sets Hadoop's default file system address (fs.defaultFS).

hdfs-site.xml

  • Specifies the number of HDFS replicas.
  • Specifies the IP and port of the host where the secondary namenode runs.

mapred-site.xml

  • Specifies the MapReduce runtime framework; here it is set to yarn, while the default is local.

yarn-site.xml

  • Specifies the address of YARN's master role, the ResourceManager.
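A hedged sketch of the key properties these files typically carry (host names, ports, and values here are placeholder assumptions, not taken from this course's cluster):

```xml
<!-- core-site.xml: default file system address (node-1:9000 is a placeholder) -->
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://node-1:9000</value>
</property>

<!-- hdfs-site.xml: replica count and secondary namenode address -->
<property>
  <name>dfs.replication</name>
  <value>3</value>
</property>
<property>
  <name>dfs.namenode.secondary.http-address</name>
  <value>node-2:50090</value>
</property>

<!-- mapred-site.xml: run MapReduce on YARN instead of the local runner -->
<property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
</property>

<!-- yarn-site.xml: ResourceManager host -->
<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>node-1</value>
</property>
```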


1.5. Important Hadoop cluster commands

Format the namenode (first start only)

  • hadoop namenode -format

Start dfs

  • start-dfs.sh

Start yarn

  • start-yarn.sh

Start the task history server

  • mr-jobhistory-daemon.sh start historyserver

One-click start of the whole cluster

  • start-all.sh

After successful startup, the web UIs are available at:

  • NameNode: http://nn_host:port/ (default port 50070)
  • ResourceManager: http://rm_host:port/ (default port 8088)
| Option | Usage | Meaning |
| --- | --- | --- |
| -ls | -ls <path> | List the directory structure of the specified path |
| -lsr | -lsr <path> | Recursively list the directory structure of the specified path |
| -du | -du <path> | Show the size of each file in the directory |
| -dus | -dus <path> | Show the summarized size of the directory (folder) |
| -count | -count [-q] <path> | Count the number of files (folders) |
| -mv | -mv <source path> <destination path> | Move |
| -cp | -cp <source path> <destination path> | Copy |
| -rm | -rm [-skipTrash] <path> | Delete a file / empty folder |
| -rmr | -rmr [-skipTrash] <path> | Delete recursively |
| -put | -put <local files> <hdfs path> | Upload files |
| -copyFromLocal | -copyFromLocal <local files> <hdfs path> | Copy from local |
| -moveFromLocal | -moveFromLocal <local files> <hdfs path> | Move from local |
| -getmerge | -getmerge <source path> <local path> | Merge to local |
| -cat | -cat <hdfs path> | View file content |
| -text | -text <hdfs path> | View file content |
| -copyToLocal | -copyToLocal [-ignoreCrc] [-crc] <hdfs source path> <local destination path> | Copy to local |
| -moveToLocal | -moveToLocal [-crc] <hdfs source path> <local destination path> | Move to local |
| -mkdir | -mkdir <hdfs path> | Create an empty folder |
| -setrep | -setrep [-R] [-w] <replica count> <path> | Change the number of replicas |
| -touchz | -touchz <file path> | Create an empty file |
| -stat | -stat [format] <path> | Show file statistics |
| -tail | -tail [-f] <file> | View the end of the file |
| -chmod | -chmod [-R] <permission mode> [path] | Change permissions |
| -chown | -chown [-R] [owner][:[group]] <path> | Change owner |
| -chgrp | -chgrp [-R] <group name> <path> | Change group |
| -help | -help [command option] | Help |

1.6. HDFS trash can mechanism

Modify core-site.xml


The time is in minutes; for example, 1440 = 24 h = 1 day. The default value of the HDFS trash property is 0, which means the trash is disabled: if you accidentally delete something, the operation is unrecoverable.
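A sketch of the corresponding core-site.xml snippet (fs.trash.interval is the standard property name; 1440 is the example value from the text):

```xml
<property>
  <!-- minutes a deleted file is kept in the trash; 0 disables the trash -->
  <name>fs.trash.interval</name>
  <value>1440</value>
</property>
```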

1.7. HDFS write data process


Detailed steps:

  • 1) The client requests the namenode, through the DistributedFileSystem module, to upload a file; the namenode checks whether the target file already exists and whether the parent directory exists.
  • 2) The namenode returns whether the file can be uploaded.
  • 3) The client asks which datanode servers the first block should be uploaded to.
  • 4) The namenode returns 3 datanode nodes: dn1, dn2, and dn3.
  • 5) The client requests dn1 to upload data through the FSDataOutputStream module. When dn1 receives the request it calls dn2, and dn2 in turn calls dn3, establishing the communication pipeline.
  • 6) dn1, dn2, and dn3 acknowledge back to the client, level by level.
  • 7) The client starts uploading the first block to dn1 (first reading data from disk into a local memory cache), in units of packets (64 KB each); dn1 receives a packet and passes it to dn2, and dn2 passes it to dn3. Each packet dn1 sends is placed in an ack queue to await acknowledgement.
  • 8) After the transfer of one block completes, the client again asks the namenode where to upload the second block, and the process repeats.
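The block-by-block, packet-by-packet transfer above can be checked with a little arithmetic. An illustrative Python sketch (the 128 MB block size is an assumed common default; the 64 KB packet size is from the text):

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024   # assumed default HDFS block size (128 MB)
PACKET_SIZE = 64 * 1024          # packet size from the text (64 KB)

def upload_plan(file_size):
    """Return (number of blocks, packets per full block) for a file of the given size."""
    blocks = math.ceil(file_size / BLOCK_SIZE)
    packets_per_block = BLOCK_SIZE // PACKET_SIZE
    return blocks, packets_per_block

blocks, packets = upload_plan(300 * 1024 * 1024)  # a 300 MB file
print(blocks, packets)  # 3 2048
```

So a 300 MB file travels through the pipeline as 3 blocks, and each full block is streamed as 2048 packets.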

1.8. HDFS read data process


Detailed steps:

  • 1) The client requests the namenode, through the DistributedFileSystem module, to download a file; the namenode looks up the metadata to find the addresses of the datanodes holding the file's blocks.
  • 2) The client picks a datanode (nearest first, then random) and requests to read the data.
  • 3) The datanode starts transmitting data to the client (reading from an input stream on disk, checksummed in units of packets, 64 KB each).
  • 4) The client receives the packets, caches them locally first, and then writes them to the target file.

1.9. The role of SecondaryNameNode

The NameNode's responsibility is to manage metadata, and the DataNode's responsibility is to store the actual data, so what does the SecondaryNameNode do?

Answer: its responsibility is to merge the NameNode's edit logs into the fsimage file.

Whenever a trigger condition is reached [at most one hour has passed, or the number of transactions reaches 1 million], the secondarynamenode downloads the accumulated edits and the latest fsimage from the namenode, loads them into memory, and merges them; this process is called a checkpoint.

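The two trigger conditions correspond to hdfs-site.xml properties; a hedged sketch using the default values mentioned above:

```xml
<property>
  <!-- seconds between checkpoints: one hour -->
  <name>dfs.namenode.checkpoint.period</name>
  <value>3600</value>
</property>
<property>
  <!-- checkpoint early once this many uncheckpointed transactions accumulate -->
  <name>dfs.namenode.checkpoint.txns</name>
  <value>1000000</value>
</property>
```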

1.10. HDFS expansion and decommissioning (interview)

1. Dynamic expansion

As the company's business grows, the data volume keeps increasing, and the capacity of the original datanode nodes can no longer meet storage needs; new data nodes must be added dynamically on top of the existing cluster. This is commonly known as dynamic expansion.

Sometimes old servers need to be retired and replaced, suspending their service. This may require stopping the Hadoop services on some machines in the current cluster, commonly known as dynamic decommissioning (shrinking).

1.1. Basic preparation

The basic preparation mainly sets up the system environment for running Hadoop:

Modify the hostname of the new machine system (modify through /etc/sysconfig/network)


Modify the hosts file and configure the hosts of all nodes in the cluster (all nodes in the cluster maintain the unified hosts file)


Set up password-free login from NameNode to DataNode (implemented by ssh-copy-id command)

Modify the slaves file of the master node and add the ip information of the new node (used with the one-click startup script when the cluster is restarted)


Upload and decompress a new Hadoop installation package on the new machine, and scp all Hadoop configuration files from the master node machine to the new node.

1.2. Add datanode

  • On the machine where the namenode is located

Create a dfs.hosts file in the /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop directory

cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

vim dfs.hosts






  • Add the dfs.hosts property to the hdfs-site.xml configuration file of the namenode machine

cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

vim hdfs-site.xml
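The snippet itself is omitted in the source; a hedged sketch of how the property might look (the path mirrors the directory used above):

```xml
<property>
  <name>dfs.hosts</name>
  <value>/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/dfs.hosts</value>
</property>
```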


The meaning of the dfs.hosts property: it names a file containing a list of hosts allowed to connect to the namenode. The full path of the file must be specified. If the value is empty, all hosts are allowed. It acts as a whitelist and is optional (not configured by default).

Start the datanode separately on the new machine: hadoop-daemon.sh start datanode


Refresh the page and you can see that new nodes have joined in


1.3. datanode load-balancing service

The newly added nodes store no data blocks yet, so the cluster's overall load is unbalanced and HDFS load balancing should be triggered. Because the balancer's default data-transfer bandwidth is relatively low, it can be raised to 64 MB/s: hdfs dfsadmin -setBalancerBandwidth 67108864.

The balancer's default threshold is 10%, meaning each node's storage usage may differ from the cluster average by no more than 10%; we can set it to 5%. Then start the balancer with

sbin/start-balancer.sh -threshold 5, and wait for the cluster to rebalance.
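The bandwidth value 67108864 is just 64 MB expressed in bytes; a quick illustrative check:

```python
def mb_to_bytes(mb):
    """Convert megabytes to the byte value passed to hdfs dfsadmin -setBalancerBandwidth."""
    return mb * 1024 * 1024

print(mb_to_bytes(64))  # 67108864
```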

1.4. Add nodemanager

Start nodemanager separately on the new machine:

yarn-daemon.sh start nodemanager


In ResourceManager, check the cluster status through yarn node -list


2. Dynamic decommissioning (shrinking)

2.1. Add decommissioned node

Create a dfs.hosts.exclude file in the hadoop configuration directory etc/hadoop on the server where the namenode runs, and add the names of the hosts to be decommissioned.

Note: write the real host name or IP address in this file, not an alias such as node-4.


Add the dfs.hosts.exclude property to the hdfs-site.xml configuration file of the namenode machine

cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

vim hdfs-site.xml
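The snippet itself is omitted in the source; a hedged sketch of how the property might look (the path mirrors the directory used above):

```xml
<property>
  <name>dfs.hosts.exclude</name>
  <value>/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/dfs.hosts.exclude</value>
</property>
```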


The meaning of the dfs.hosts.exclude property: it names a file containing a list of hosts not allowed to connect to the namenode. The full path of the file must be specified. If the value is empty, no host is excluded.

2.2. Refresh cluster

Execute the following commands on the machine where the namenode runs to refresh the namenode and the resourcemanager:

hdfs dfsadmin -refreshNodes

yarn rmadmin -refreshNodes


Wait for the decommissioning node's status to become Decommissioned (all its blocks have been copied elsewhere), then stop the node's processes. Note: if the replication factor is 3 and the number of in-service nodes is less than or equal to 3, decommissioning cannot succeed; the replication factor must be reduced first.

On node-4, execute the following commands to stop the node's processes:

cd /export/servers/hadoop-2.6.0-cdh5.14.0

sbin/hadoop-daemon.sh stop datanode

sbin/yarn-daemon.sh stop nodemanager

On the node where the namenode runs, execute the following commands to refresh the namenode and resourcemanager:

hdfs dfsadmin -refreshNodes

yarn rmadmin -refreshNodes

On the node where the namenode runs, execute the following command to rebalance the load:

cd /export/servers/hadoop-2.6.0-cdh5.14.0/


1.11. HDFS safe mode

Safe mode is a special state of HDFS in which the file system accepts only read requests and rejects changes such as deletions and modifications. It is a protection mechanism that ensures the safety of the cluster's data blocks.

When the NameNode starts, HDFS first enters safe mode and the cluster checks the integrity of its data blocks. As each DataNode starts, it reports its available block information to the namenode. When the whole system reaches the safety threshold, HDFS automatically leaves safe mode.

Manually enter safe mode

hdfs dfsadmin -safemode enter

Manually leave safe mode

hdfs dfsadmin -safemode leave

1.12. Rack awareness

Hadoop itself cannot sense rack topology; this must be configured manually. One way is to configure a script that performs the mapping; the other is to implement the resolve() method of the DNSToSwitchMapping interface to complete the network-location mapping.

  • 1. Write a script, reference it in Hadoop's core-site.xml configuration file, and let the namenode and jobtracker call it.
#-*-coding:UTF-8 -*-
import sys

# host-to-rack mapping; add one entry per host (only one shown here)
rack = {"hadoop-node-31": "rack1"}

if __name__=="__main__":
        print "/" + rack.get(sys.argv[1], "rack0")

2. Give the script executable permission (chmod +x RackAware.py) and put it in the bin/ directory.

3. Then open conf/core-site.xml
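The configuration added in this step is omitted in the source; a hedged sketch using the classic topology.script.file.name property (the script path is an assumption):

```xml
<property>
  <name>topology.script.file.name</name>
  <value>/home/hadoop/bin/RackAware.py</value>
</property>
```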


4. Restart the Hadoop cluster

namenode log:

2012-06-08 14:42:19,174 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.registerDatanode: node registration from storage DS-1155827498-
2012-06-08 14:42:19,204 INFO org.apache.hadoop.net.NetworkTopology: Adding a new node: /rack2/
2012-06-08 14:42:19,205 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.registerDatanode: node registration from storage DS-1773813988-
2012-06-08 14:42:19,226 INFO org.apache.hadoop.net.NetworkTopology: Adding a new node: /rack2/
2012-06-08 14:42:19,226 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.registerDatanode: node registration from storage DS-2024494948-
2012-06-08 14:42:19,242 INFO org.apache.hadoop.net.NetworkTopology: Adding a new node: /rack1/
2012-06-08 14:42:19,242 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.registerDatanode: node registration from storage DS-767528606-
  2012-06-08 14:42:49,492 INFO org.apache.hadoop.hdfs.StateChange: STATE* Network topology has 2 racks and 10 datanodes
  2012-06-08 14:42:49,492 INFO org.apache.hadoop.hdfs.StateChange: STATE* UnderReplicatedBlocks has 0 blocks
  2012-06-08 14:42:49,642 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: ReplicateQueue QueueProcessingStatistics: First cycle completed 0 blocks in 0 msec
  2012-06-08 14:42:49,642 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: ReplicateQueue QueueProcessingStatistics: Queue flush completed 0 blocks in 0 msec processing time, 0 msec clock time, 1 cycles

2. MapReduce

2.1. Introduction to MapReduce

The core idea of MapReduce is "divide and conquer"; it is suitable for processing scenarios with large numbers of complex tasks (large-scale data processing).

Map is responsible for "dividing": decomposing a complex task into several "simple tasks" that are processed in parallel. The prerequisite for splitting is that these small tasks can be computed in parallel, with almost no dependencies among them.

Reduce is responsible for "combining": globally aggregating the results of the map phase.

Figure: MapReduce thinking model

2.2. Writing WordCount

  • Define a mapper class
//keyin:  LongWritable    valuein: Text
//keyout: Text            valueout: IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	// Lifecycle of the map method: the framework calls it once per input line
	// key:   byte offset of this line within the file
	// value: the content of this line
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();
		String[] words = line.split(" ");
		for(String word : words){
			context.write(new Text(word), new IntWritable(1));
		}
	}
}
  • Define a reducer class
//Lifecycle: the framework calls reduce once per incoming key group
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int count = 0;
		for(IntWritable value : values){
			count += value.get();
		}
		context.write(key, new IntWritable(count));
	}
}
  • Define a main class to describe the job and submit the job
public class WordCountRunner {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job wcjob = Job.getInstance(conf);
//		wcjob.setJar("/home/hadoop/wordcount.jar");
		wcjob.setMapperClass(WordCountMapper.class);
		wcjob.setReducerClass(WordCountReducer.class);
		wcjob.setOutputKeyClass(Text.class);
		wcjob.setOutputValueClass(IntWritable.class);
		FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");
		FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));
		boolean res = wcjob.waitForCompletion(true);
		System.exit(res ? 0 : 1);
	}
}

2.3. Combiner

Each map may generate a large amount of local output. The Combiner merges the map-side output first, reducing the volume of data transferred between the map and reduce nodes and improving network I/O performance.

For example, in Hadoop's built-in wordcount, the value is just a count to be accumulated, so partial sums can be produced as soon as each map finishes; there is no need to wait for all maps to end before reduce accumulates the values.

  • Specific use

public static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
	protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
		long count = 0L;
		for (LongWritable value : values) {
			count += value.get();
		}
		context.write(key, new LongWritable(count));
	}
}

  • Add in the main class:
    // set the Combiner class for map-side pre-aggregation
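To see why a combiner shrinks shuffle traffic, here is an illustrative Python sketch (not Hadoop code) that applies a word-count combiner to one mapper's output:

```python
from collections import Counter

def map_output(line):
    # one (word, 1) pair per word, like WordCountMapper
    return [(word, 1) for word in line.split()]

def combine(pairs):
    # merge pairs with the same key on the map side, like MyCombiner
    counts = Counter()
    for word, n in pairs:
        counts[word] += n
    return sorted(counts.items())

pairs = map_output("a b a a b c")
print(len(pairs))      # 6 pairs without a combiner
print(combine(pairs))  # [('a', 3), ('b', 2), ('c', 1)] -> only 3 pairs shuffled
```

Six raw pairs collapse to three combined pairs, and the reducer still computes the same final counts.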


2.4. Partitioner

When performing MapReduce computations, the final output sometimes needs to be divided into different files. For example, to split by province, data for the same province must go into one file; to split by gender, data for the same gender must go into one file. The class responsible for partitioning the data is called the Partitioner.

  • HashPartitioner source code is as follows
package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    // AND the key's hash with Integer.MAX_VALUE to avoid negative values from overflow
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

  • The key and value are the output of the Mapper task; numReduceTasks is the configured number of Reducer tasks, which defaults to 1. The remainder of any integer divided by 1 is 0, so getPartition(...) then always returns 0: all Mapper output goes to a single Reducer task and ends up in a single output file.

A concrete implementation:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FivePartitioner extends Partitioner<IntWritable, IntWritable>{

    /*
     * Requirement: partition by divisibility by 5
     * 1. If the remainder of division by 5 is 0, go to partition 0
     * 2. Otherwise, go to partition 1
     */
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int intValue = key.get();
        if(intValue % 5 == 0){
            return 0;
        } else {
            return 1;
        }
    }
}
  • Add the following two lines of code to the main function:
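The two lines themselves are elided in the source. As an illustration of the partitioning logic they would wire in, both partitioners can be mimicked in a few lines of Python (illustrative, not Hadoop code):

```python
INT_MAX = 2**31 - 1  # Integer.MAX_VALUE

def hash_partition(key, num_reduce_tasks):
    # HashPartitioner logic: mask off the sign bit, then take the remainder
    return (hash(key) & INT_MAX) % num_reduce_tasks

def five_partition(key):
    # FivePartitioner logic: divisible by 5 -> partition 0, otherwise partition 1
    return 0 if key % 5 == 0 else 1

# with a single reducer, every key lands in partition 0
print(all(hash_partition(k, 1) == 0 for k in ["a", "b", "c"]))  # True
print([five_partition(k) for k in [10, 3, 25, 7]])              # [0, 1, 0, 1]
```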

2.5. The execution process of MapReduce


Detailed process

Map stage

  • Stage 1: logically split the files in the input directory according to certain criteria to form a split plan. By default, split size = block size, and each split is processed by one MapTask. (getSplits)

  • Stage 2: parse the data in each split into <key, value> pairs according to certain rules. The default rule parses each line of text into one pair: the key is the starting byte offset of the line, and the value is the line's text content. (TextInputFormat)

  • Stage 3: call the map method of the Mapper class. The map method is called once for each <k,v> parsed in stage 2, and each call outputs zero or more key-value pairs.

  • Stage 4: partition the key-value pairs output in stage 3 according to certain rules. The default is a single partition; the number of partitions equals the number of Reducer tasks, and there is one Reducer task by default.

  • Stage 5: sort the key-value pairs in each partition, first by key, and for pairs with equal keys, by value. For example, the three pairs <2,2>, <1,3>, <2,1> (keys and values are integers) sort to <1,3>, <2,1>, <2,2>. If there is a stage 6, proceed to it; otherwise write directly to file.

  • Stage 6: perform local aggregation of the data, i.e. combiner processing. The reduce method is called once for each group of pairs with equal keys, so the amount of data shrinks after this stage. This stage does not run by default.
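Stage 5's key-then-value ordering is exactly how Python sorts tuples, so the example from the text can be checked directly (illustrative):

```python
pairs = [(2, 2), (1, 3), (2, 1)]
# sorting tuples compares keys first, then values for equal keys
print(sorted(pairs))  # [(1, 3), (2, 1), (2, 2)]
```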

Reduce stage

  • Stage 1: the Reducer task actively copies its input key-value pairs from the Mapper tasks' output. There may be many Mapper tasks, so the Reducer copies the output of multiple Mappers.

  • Stage 2: merge all the data copied to the Reducer locally, combining the scattered data into one large dataset, then sort the merged data.

  • Stage 3: call the reduce method on the sorted key-value pairs. The reduce method is called once for each group of pairs with equal keys, and each call produces zero or more key-value pairs. Finally, these output pairs are written to an HDFS file.

2.6. The shuffle phase of MapReduce


Shuffle is called the heart of MapReduce and is its core.

Each data split is processed by one Mapper process; that is, each mapper handles only part of the file.

Each Mapper process has a circular memory buffer for its map output, 100 MB by default. When the data reaches the 0.8 threshold (80 MB), a background thread spills the data to disk. During the spill the data is first partitioned and sorted (by partition number 0, 1, 2, ...). To avoid overflowing memory, the map output is spilled as multiple small files, each partitioned and sorted internally. These small spill files are then merged into one large file per mapper (combining the pieces with the same partition number across the small files).

Then the Reducers fetch their input: with three reducers 0, 1, and 2, reducer 0 fetches the data of partition 0, reducer 1 fetches partition 1, and reducer 2 fetches partition 2.
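A toy Python model of the spill-and-merge path (partition, sort within each partition, then merge sorted runs across spill files), assuming two partitions chosen by key modulo:

```python
from collections import defaultdict
from heapq import merge

def spill(records, num_partitions):
    """Partition one in-memory buffer and sort each partition (one 'spill file')."""
    parts = defaultdict(list)
    for key, value in records:
        parts[key % num_partitions].append((key, value))
    return {p: sorted(recs) for p, recs in parts.items()}

def merge_spills(spills, num_partitions):
    """Merge the sorted runs of each partition across all spill files."""
    return {p: list(merge(*[s.get(p, []) for s in spills]))
            for p in range(num_partitions)}

s1 = spill([(3, "a"), (2, "b"), (5, "c")], 2)
s2 = spill([(4, "d"), (1, "e")], 2)
merged = merge_spills([s1, s2], 2)
print(merged[0])  # [(2, 'b'), (4, 'd')]   -> fetched by reducer 0
print(merged[1])  # [(1, 'e'), (3, 'a'), (5, 'c')] -> fetched by reducer 1
```

Each reducer then pulls exactly one partition of the merged, sorted output.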

2.7. MapReduce optimization

// The following parameters take effect when configured in the user's own MapReduce application

(1) mapreduce.map.memory.mb: The upper limit of memory that can be used by a Map Task (unit: MB), the default is 1024. If the amount of resources actually used by the Map Task exceeds this value, it will be forcibly killed.

(2) mapreduce.reduce.memory.mb: The upper limit of resources that can be used by a Reduce Task (unit: MB), the default is 1024. If the amount of resources actually used by the Reduce Task exceeds this value, it will be forcibly killed.

(3) mapreduce.map.cpu.vcores: The maximum number of CPU cores available for each Maptask, default value: 1

(4) mapreduce.reduce.cpu.vcores: The maximum number of CPU cores available for each Reducetask Default value: 1

(5) mapreduce.map.java.opts: JVM parameters for the Map Task; the Java heap size and similar options can be configured here, for example: "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@ is automatically replaced by the Hadoop framework with the corresponding task id). Default value: "".

(6) mapreduce.reduce.java.opts: JVM parameters for the Reduce Task; likewise the Java heap size can be configured here, for example: "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc". Default value: "".

// The following must be configured in the server configuration files before YARN starts in order to take effect

(1) yarn.scheduler.minimum-allocation-mb: the minimum allocation for every container request in the RM, in MB; default 1024.

(2) yarn.scheduler.maximum-allocation-mb: the maximum allocation for every container request in the RM, in MB; default 8192.

(3) yarn.scheduler.minimum-allocation-vcores: default 1

(4) yarn.scheduler.maximum-allocation-vcores: default 32

(5) yarn.nodemanager.resource.memory-mb: the total physical memory available to YARN on the node, default 8192 (MB). Note that if your node has less than 8 GB of memory, you need to lower this value; YARN does not intelligently detect the node's total physical memory.

// Key parameters for shuffle performance optimization; configure before YARN starts

(1) mapreduce.task.io.sort.mb: size of the shuffle ring buffer, default 100 MB

(2) mapreduce.map.sort.spill.percent: spill threshold of the ring buffer, default 0.8 (80%)

(1) mapreduce.map.maxattempts: the maximum number of retries for each Map Task. Once retries exceed this value, the Map Task is considered failed. Default: 4.

(2) mapreduce.reduce.maxattempts: the maximum number of retries for each Reduce Task. Once retries exceed this value, the Reduce Task is considered failed. Default: 4.

(3) mapreduce.map.failures.maxpercent: when the proportion of failed Map Tasks exceeds this value, the entire job fails. Default: 0. If your application can tolerate discarding part of the input data, set this to a value greater than 0; for example, 5 means that as long as fewer than 5% of Map Tasks fail (a Map Task is considered failed once its retries exceed mapreduce.map.maxattempts, and its input then produces no results), the whole job is still considered successful.

(4) mapreduce.reduce.failures.maxpercent: when the proportion of failed Reduce Tasks exceeds this value, the entire job fails. Default: 0.

(5) mapreduce.task.timeout: if a task makes no progress within a certain period, i.e. neither reads new data nor outputs any data, it is considered blocked; it may be stuck temporarily or forever. To prevent user programs from blocking forever, a timeout is enforced (in milliseconds). Default 600000; a value of 0 disables the timeout.

2.7.3. Efficiency and stability parameters

(1) mapreduce.map.speculative: whether to enable speculative execution for Map Tasks, default true. If true, multiple instances of some Map Tasks may run in parallel.

(2) mapreduce.reduce.speculative: whether to enable speculative execution for Reduce Tasks, default true.

(3) mapreduce.input.fileinputformat.split.minsize: the minimum split size when FileInputFormat slices input, default 1.

(4) mapreduce.input.fileinputformat.split.maxsize: the maximum split size when FileInputFormat slices input.
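These two properties bound the split size via FileInputFormat's standard formula max(minSize, min(maxSize, blockSize)); an illustrative sketch:

```python
def compute_split_size(block_size, min_size=1, max_size=2**63 - 1):
    # FileInputFormat's rule: max(minSize, min(maxSize, blockSize))
    return max(min_size, min(max_size, block_size))

MB = 1024 * 1024
print(compute_split_size(128 * MB))                    # 134217728: split == block by default
print(compute_split_size(128 * MB, max_size=64 * MB))  # 67108864: lower max -> smaller splits
print(compute_split_size(128 * MB, min_size=256 * MB)) # 268435456: raise min -> bigger splits
```

Lowering split.maxsize produces more (smaller) splits and hence more map tasks; raising split.minsize does the opposite.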

2.8. The execution process of a MapReduce program on YARN


Detailed process:

  • 1) The client submits a task to the cluster; the task first reaches the ApplicationManager inside the ResourceManager.
  • 2) After the ApplicationManager receives the task, it finds a NodeManager in the cluster and starts an AppMaster process on that NodeManager's node; this process handles task division and task monitoring.
  • 3) After the AppMaster starts, it registers its information with the ApplicationManager in the ResourceManager (so the two can communicate).
  • 4) The AppMaster applies to the ResourceScheduler under the ResourceManager for the resources the computing task needs.
  • 5) Having obtained resources, the AppMaster communicates with the relevant NodeManagers and asks them to start the tasks (Map and Reduce) the computation requires.
  • 6) Each NodeManager starts the corresponding containers to execute the Map and Reduce tasks.
  • 7) Each task reports its progress and status to the AppMaster, so the AppMaster always knows the running state of every task and can restart a task when it fails.
  • 8) After all tasks finish, the AppMaster reports to the ApplicationManager so that it can deregister and shut itself down, allowing its resources to be reclaimed.

2.9. Common problems when running MapReduce

  1. The client does not have permission to operate HDFS in the cluster.

  2. The MapReduce output path already exists; that path must be deleted first.

  3. The job fails when submitted to run on the cluster.

  4. No log output appears, and a warning message is reported:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).  
log4j:WARN Please initialize the log4j system properly.  
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.  

Need to create a new file named log4j.properties under the src of the project

3. Yarn

3.1. Introduction to yarn

​ YARN is a general-purpose resource management system and scheduling platform that provides unified resource management and scheduling for upper-layer applications. YARN can be understood as a distributed operating system platform: computing programs such as MapReduce are the applications that run on top of it, and YARN supplies them with the resources (memory, CPU) they need for computation.

3.2, the basic architecture of yarn


YARN is a framework for resource management and task scheduling, which mainly includes three modules: ResourceManager (RM), NodeManager (NM), and ApplicationMaster (AM).

The ResourceManager is responsible for monitoring, allocating, and managing all resources;

the ApplicationMaster is responsible for scheduling and coordinating one specific application;

the NodeManager is responsible for maintaining its own node.

​ For all applications, RM has absolute control and the right to allocate resources. Each AM will negotiate resources with the RM and communicate with the NodeManager to execute and monitor tasks.

3.3, three major components of yarn

3.3.1. ResourceManager

  • ResourceManager is responsible for the resource management and allocation of the entire cluster and is a global resource management system.
  • The NodeManager reports resource usage (currently mainly CPU and memory usage) to the ResourceManager via heartbeats. The RM only receives these resource reports from the NMs; the concrete resource handling is left to the NMs themselves.
  • YARN Scheduler allocates resources to the application according to the request, and is not responsible for the monitoring, tracking, running status feedback, startup, etc. of the application job.

3.3.2. NodeManager

  • NodeManager is the resource and task manager on each node. It is the agent that manages the machine, responsible for the operation of the node program, and the management and monitoring of the node's resources. Each node of the YARN cluster runs a NodeManager.
  • The NodeManager regularly reports to the ResourceManager the usage of the resources (CPU, memory) of the node and the running status of the Container. When the ResourceManager goes down, the NodeManager automatically connects to the standby RM node.
  • NodeManager receives and processes various requests from ApplicationMaster to start and stop the Container.

3.3.3. ApplicationMaster

  • Each application submitted by the user contains an ApplicationMaster, which can run on machines other than ResourceManager.
  • Responsible for negotiating with the RM scheduler to obtain resources (represented by Container).
  • The resources obtained are further allocated to its internal tasks (a secondary allocation of resources).
  • Communicate with NM to start/stop tasks.
  • Monitor the running status of all tasks, and reapply for resources to restart the task when the task fails.
  • Currently YARN comes with two ApplicationMaster implementations, one is the example program DistributedShell used to demonstrate the AM writing method, which can apply for a certain number of Containers to run a Shell command or Shell script in parallel; the other is the AM that runs MapReduce applications— MRAppMaster.

Note: RM is only responsible for monitoring AM, and starts it when AM fails. RM is not responsible for the fault tolerance of AM internal tasks, and the fault tolerance of tasks is completed by AM.

3.4. Yarn Scheduler

​ In Yarn, the scheduler is responsible for allocating resources to applications. Three schedulers are available: the FIFO Scheduler, the Capacity Scheduler, and the Fair Scheduler.

3.4.1. FIFO Scheduler

The FIFO Scheduler places applications in a queue in the order they are submitted: a first-in, first-out queue. When allocating resources, it first serves the application at the head of the queue; once that application's requirements are satisfied, it serves the next one, and so on.


​ The FIFO Scheduler is the simplest and easiest scheduler to understand and needs no configuration, but it is not suitable for shared clusters: a large application may occupy all cluster resources and block every other application. On a shared cluster the Capacity Scheduler or Fair Scheduler is a better fit, since both allow large and small jobs to obtain some system resources at submission time.

3.4.2. Capacity Scheduler

The Capacity Scheduler allows multiple organizations to share the cluster, each obtaining part of its computing capacity. By giving each organization a dedicated queue and assigning some cluster resources to each queue, the cluster can serve multiple organizations through multiple queues. A queue can further be divided internally, so that several members of one organization share its resources. Within a queue, resource scheduling uses a first-in, first-out (FIFO) strategy.


​ Capacity Scheduler was originally developed and designed by Yahoo to enable Hadoop applications to be used by multiple users and maximize the throughput of the entire cluster resources. It is now used by IBM BigInsights and Hortonworks HDP.

​ The Capacity Scheduler is designed to let applications share cluster resources in a predictable and simple way: the "job queue". It allocates the available resources to running applications according to tenants' needs and requirements, and also lets applications use resources that would otherwise sit idle, so a queue can borrow capacity that other queues are not using. Administrators can control the capacity of each queue; the Capacity Scheduler is responsible for the jobs submitted to the queues.

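As a purely hypothetical illustration (the queue names and percentages below are made up, not from these notes), a capacity-scheduler.xml that splits the cluster between two organizations might look like:

```xml
<configuration>
  <!-- two top-level queues under root -->
  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>prod,dev</value>
  </property>
  <!-- prod is guaranteed 70% of the cluster, dev 30% -->
  <property>
    <name>yarn.scheduler.capacity.root.prod.capacity</name>
    <value>70</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.capacity</name>
    <value>30</value>
  </property>
  <!-- dev may borrow idle capacity, but never beyond 50% of the cluster -->
  <property>
    <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
    <value>50</value>
  </property>
</configuration>
```

The maximum-capacity setting is what implements the "borrow idle resources, but within a cap" behavior described above.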
3.4.3. Fair Scheduler

​ With the Fair Scheduler we do not need to reserve system resources in advance: the Fair Scheduler dynamically balances resources across all running jobs. As shown in the figure below, when the first (large) job is submitted it is the only one running and receives all cluster resources; when a second (small) job is submitted, the Fair Scheduler allocates half the resources to it, so the two jobs share the cluster fairly.

Note that in the Fair Scheduler there is some delay between the second job being submitted and it obtaining resources, because it must wait for the first job to release the Containers it occupies. Once the small job completes, the resources it occupied are released and the large job again obtains all system resources. The net effect is that the Fair Scheduler achieves high resource utilization while still ensuring small jobs complete promptly.


The Fair Scheduler was originally developed and designed by Facebook to enable Hadoop applications to share the entire cluster resources fairly by multiple users. It is now adopted by Cloudera CDH.

Fair Scheduler does not need to reserve the resources of the cluster because it dynamically balances resources among all running jobs.
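A minimal fair-scheduler allocation file can make the queue setup concrete (the queue names and weights here are illustrative assumptions, not from the notes):

```xml
<allocations>
  <!-- two queues sharing the cluster with weight 2:1 -->
  <queue name="big">
    <weight>2.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>
  <queue name="small">
    <weight>1.0</weight>
    <schedulingPolicy>fair</schedulingPolicy>
  </queue>
  <!-- fall back to a per-user queue when no queue is specified -->
  <queuePlacementPolicy>
    <rule name="specified"/>
    <rule name="user"/>
  </queuePlacementPolicy>
</allocations>
```

With weights instead of fixed capacities, a lone job in either queue can still take the whole cluster, which is exactly the dynamic-balancing behavior described above.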

4. hive

4.1 Introduction to hive


Hive is a Hadoop-based data warehouse tool that maps structured data files to database tables and provides SQL-like query capabilities.

In essence, it converts SQL into MapReduce programs.

​ Main purpose: It is used for offline data analysis, which is more efficient than directly using MapReduce to develop.

4.2, hive architecture


User interface : including CLI, JDBC/ODBC, WebGUI. Among them, CLI (command line interface) is a shell command line; JDBC/ODBC is the JAVA implementation of Hive, similar to the traditional database JDBC; WebGUI is to access Hive through a browser.

Metadata storage : usually stored in relational databases such as mysql/derby. Hive stores metadata in the database. The metadata in Hive includes the name of the table, the columns and partitions of the table and their attributes, the attributes of the table (whether it is an external table, etc.), and the directory where the data of the table is located.

Interpreter, compiler, optimizer, executor : carry an HQL query through lexical analysis, syntax analysis, compilation, optimization, and query-plan generation. The generated query plan is stored in HDFS and subsequently executed by MapReduce.

4.3, Hive data model

All Hive data is stored on HDFS; there is no special data storage format.

As long as the separator in the data is specified when the table is created, Hive can map and parse the data correctly.

Hive contains the following data models :

**db:** appears in HDFS as a folder under the hive.metastore.warehouse.dir directory

**table:** appears in HDFS as a folder under its db directory

**external table:** the data can be stored at any specified HDFS path

**partition:** appears in HDFS as a subdirectory under the table directory

**bucket:** appears in HDFS as multiple files under the table directory, split by hashing a column's value
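How rows map to bucket files can be sketched in a few lines of Python (an illustration of the modulo rule, not Hive's actual code; for an int bucketing column Hive's hash of the value is the value itself):

```python
def bucket_for(value: int, num_buckets: int) -> int:
    # Hive assigns a row to bucket hash(column) mod num_buckets;
    # for an int column the hash is simply the value
    return value % num_buckets

# rows with ids 0..7 spread over 4 bucket files
buckets = {}
for row_id in range(8):
    buckets.setdefault(bucket_for(row_id, 4), []).append(row_id)

print(buckets)  # {0: [0, 4], 1: [1, 5], 2: [2, 6], 3: [3, 7]}
```

Because equal column values always land in the same bucket number, two tables bucketed the same way can be joined bucket-by-bucket, which is the basis of the bucketed map-side join mentioned later.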

4.4, common operations

Hive ships with a database named default.

* -- create a database

create database [if not exists] <database name>;

* -- show all databases

show databases;  

*  -- drop a database

drop database if exists <database name> [restrict|cascade]; 

​	By default, Hive does not allow dropping a database that still contains tables; the tables must be removed first, otherwise an error is raised. Adding cascade drops them automatically:

	hive> drop database if exists users cascade;
* -- switch databases

use <database name>; 

4.4.2. Internal and external tables

create table 
student(Sno int,Sname string,Sex string,Sage int,Sdept string) 
row format delimited fields terminated by ',';
create external table 
student_ext(Sno int,Sname string,Sex string,Sage int,Sdept string) 
row format delimited fields terminated by ',' location '/stu';
load data local inpath '/root/hivedata/students.txt' overwrite into table student;

load data inpath '/stu' into table student_ext;

4.4.3, create a partition table

  • There are two kinds of partitioned tables. A single-partition table has only one level of partition directories under the table directory; a multi-partition table has nested partition directories under it.
  • Single partition table creation statement
create table day_table (id int, content string) partitioned by (dt string);
  • Double partition table creation statement
create table day_hour_table (id int, content string) partitioned by (dt string, hour string);
 load data local inpath '/root/hivedata/dat_table.txt' into table day_table partition(dt='2017-07-07');
 load data local inpath '/root/hivedata/dat_table.txt' into table day_hour_table partition(dt='2017-07-07', hour='08');

 SELECT day_table.* FROM day_table WHERE day_table.dt = '2017-07-07';

 show partitions day_hour_table;  


Specifying delimiters

-- create a partition table with a specified field delimiter

create table day_table (id int, content string) partitioned by (dt string) row format delimited fields terminated by ',';

-- specify delimiters for a table with complex-typed columns

The data looks like this:

zhangsan	beijing,shanghai,tianjin,hangzhou
wangwu	shanghai,chengdu,wuhan,haerbin

Table building statement

create table
complex_array(name string,work_locations array<string>) 
row format delimited fields terminated by '\t' 
collection items terminated by ',';
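To see what the two delimiters do, here is a small Python sketch (not Hive code) that parses one line of the sample data exactly the way the table definition describes: fields split on '\t', collection items on ',':

```python
line = "zhangsan\tbeijing,shanghai,tianjin,hangzhou"

# row format delimited fields terminated by '\t'
name, locations_raw = line.split("\t")
# collection items terminated by ','
work_locations = locations_raw.split(",")

print(name)            # zhangsan
print(work_locations)  # ['beijing', 'shanghai', 'tianjin', 'hangzhou']
```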

4.4.4, add and delete partitions

  • Increase partition
alter table t_partition add partition (dt='2008-08-08') location 'hdfs://node-21:9000/t_parti/';

When a partition is added this way, the data under the /t_parti folder is not moved, and no partition directory dt=2008-08-08 is created.
  • Delete partition
alter table t_partition drop partition (dt='2008-08-08');


Note the difference from adding a partition via load data: load data moves the data and creates the partition directory.

4.4.5, join in hive



create table a(id int,name string)
row format delimited fields terminated by ',';

create table b(id int,name string)
row format delimited fields terminated by ',';

load data local inpath '/root/hivedata/a.txt' into table a;
load data local inpath '/root/hivedata/b.txt' into table b;

**inner join**
select * from a inner join b on a.id=b.id;

| a.id | a.name | b.id | b.name |
| ---- | ------ | ---- | ------ |
| 2    | b      | 2    | bb     |
| 3    | c      | 3    | cc     |
| 7    | y      | 7    | yy     |

**left join**
select * from a left join b on a.id=b.id;

| a.id | a.name | b.id | b.name |
| ---- | ------ | ---- | ------ |
| 1    | a      | NULL | NULL   |
| 2    | b      | 2    | bb     |
| 3    | c      | 3    | cc     |
| 4    | d      | NULL | NULL   |
| 7    | y      | 7    | yy     |
| 8    | u      | NULL | NULL   |

**right join**
select * from a right join b on a.id=b.id;

select * from b right join a on b.id=a.id;

| a.id | a.name | b.id | b.name |
| ---- | ------ | ---- | ------ |
| 2    | b      | 2    | bb     |
| 3    | c      | 3    | cc     |
| 7    | y      | 7    | yy     |
| NULL | NULL   | 9    | pp     |

**full outer join**
select * from a full outer join b on a.id=b.id;

| a.id | a.name | b.id | b.name |
| ---- | ------ | ---- | ------ |
| 1    | a      | NULL | NULL   |
| 2    | b      | 2    | bb     |
| 3    | c      | 3    | cc     |
| 4    | d      | NULL | NULL   |
| 7    | y      | 7    | yy     |
| 8    | u      | NULL | NULL   |
| NULL | NULL   | 9    | pp     |

**left semi join**
select * from a left semi join b on a.id = b.id;

-- equivalent to the inner-join projection below:
select a.* from a inner join b on a.id=b.id;

| a.id | a.name |
| ---- | ------ |
| 2    | b      |
| 3    | c      |
| 7    | y      |

select a.id,a.name from a where a.id in (select b.id from b);  -- extremely inefficient in Hive

select a.id,a.name from a join b on (a.id = b.id);

select * from a inner join b on a.id=b.id;

**cross join** (use with caution!)
select a.*,b.* from a cross join b;

4.4.6. JSON parsing

1. First load the rating.json file into a raw Hive table rat_json:

create table rat_json(line string) row format delimited;
load data local inpath '/root/hivedata/rating.json' into table rat_json;

2. Parse the JSON data into four fields and insert it into a new table t_rating:
drop table if exists t_rating;
create table t_rating(movieid string,rate int,timestring string,uid string)
row format delimited fields terminated by '\t';

insert overwrite table t_rating
select
get_json_object(line,'$.movie') as movieid,
get_json_object(line,'$.rate') as rate,
get_json_object(line,'$.timeStamp') as timestring, get_json_object(line,'$.uid') as uid 
from rat_json limit 10;

4.5, commonly used functions

4.5.1 Numerical functions

Specify precision rounding function: round

Syntax: round(double a, int d)

Return value: DOUBLE

Description: Returns the double type with the specified precision d

For example:

hive> select round(3.1415926,4) from dual;


Round down function: floor

Syntax: floor(double a)

Return value: BIGINT

Description: Returns the largest integer equal to or less than the double variable

For example:

hive> select floor(3.1415926) from dual;

hive> select floor(25) from dual;

Round up function: ceil

Syntax: ceil(double a)

Return value: BIGINT

Description: Returns the smallest integer equal to or greater than the double variable

For example:

hive> select ceil(3.1415926) from dual;

hive> select ceil(46) from dual;

Random number function: rand

Syntax: rand(),rand(int seed)

Return value: double

Explanation: Returns a random number in the range [0,1). If the seed is specified, the function returns a deterministic, repeatable sequence of random numbers.

For example:

hive> select rand() from dual;


Absolute value function: abs

Syntax: abs(double a) abs(int a)

Return value: double int

Description: Returns the absolute value of a

For example:

hive> select abs(-3.9) from dual;

hive> select abs(10.9) from dual;

4.5.2, date function

  • to_date(string timestamp) : Returns the date part in the time string,
  • Such as to_date('1970-01-01 00:00:00')='1970-01-01'
  • current_date : return the current date
  • year(date) : Returns the year of the date date, the type is int
  • Such as year('2019-01-01')=2019
  • month(date) : Returns the month of date date, the type is int,
  • Such as month('2019-01-01')=1
  • day(date) : returns the day of the date date, the type is int,
  • Such as day('2019-01-01')=1
  • weekofyear(date1) : returns the week of the year that date1 falls in.
  • Such as weekofyear('2019-03-06')=10
  • datediff(date1,date2) : returns the number of days between date1 and date2
  • Such as datediff('2019-03-06','2019-03-05')=1
  • date_add(date1,int1) : returns date1 plus int1 days
  • Such as date_add('2019-03-06',1)='2019-03-07'
  • date_sub(date1,int1) : returns date1 minus int1 days
  • Such as date_sub('2019-03-06',1)='2019-03-05'
  • months_between(date1,date2) : returns the month difference between date1 and date2
  • Such as months_between('2019-03-06','2019-01-01')=2
  • add_months(date1,int1) : Returns the date of date1 plus int1 months, int1 can be a negative number
  • Such as add_months('2019-02-11',-1)='2019-01-11'
  • last_day(date1) : Returns the last day of the month in which date1 is located
  • Such as last_day('2019-02-01')='2019-02-28'
  • next_day(date1,day1) : returns the date of the next day1 after date1; day1 is the English weekday name or its abbreviation (e.g. 'MO')
  • For example, next_day('2019-03-06','MO') returns '2019-03-11'
  • **trunc(date1,string1):** returns date1 truncated to the start of its year or month; string1 can be a year pattern (YYYY/YY/YEAR) or a month pattern (MONTH/MON/MM).
  • Such as trunc('2019-03-06','MM')='2019-03-01', trunc('2019-03-06','YYYY')='2019-01-01'
  • unix_timestamp() : returns the unix timestamp of the current time; a date string and format can also be given (note the Java pattern letters: MM is month, mm is minutes)
  • Such as unix_timestamp('2019-03-06','yyyy-MM-dd') returns the epoch seconds for 2019-03-06
  • from_unixtime() : converts a unix timestamp back to a date string in the specified format.
  • Such as from_unixtime(unix_timestamp('2019-03-06','yyyy-MM-dd'),'yyyyMMdd')='20190306'

4.5.3. Conditional functions

  • if(boolean,t1,t2) : if the boolean is true, returns t1; otherwise returns t2.
  • Such as if(1>2,100,200) returns 200
  • case when boolean then t1 else t2 end : if the boolean is true then t1, otherwise t2; multiple when branches can be added
  • coalesce(v0,v1,v2) : returns the first non-null argument; if all arguments are null, returns null.
  • Such as coalesce(null,1,2) returns 1
  • isnull(a) : returns true if a is null, otherwise returns false
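The semantics of if and coalesce can be mirrored in a few lines of Python (a sketch of the behavior, not Hive internals):

```python
def hive_if(cond, t1, t2):
    # if(boolean, t1, t2): t1 when the condition holds, else t2
    return t1 if cond else t2

def coalesce(*values):
    # first non-NULL (non-None) argument, or NULL (None) if all are NULL
    return next((v for v in values if v is not None), None)

assert hive_if(1 > 2, 100, 200) == 200
assert coalesce(None, 1, 2) == 1
assert coalesce(None, None) is None
```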

4.5.4, string functions

  • length(string1) : Returns the length of the string
  • concat(string1,string2) : Returns the string after concatenating string1 and string2
  • concat_ws(sep,string1,string2) : Returns the string concatenated by the specified separator
  • lower(string1) : returns the lowercase string, same as lcase(string1); upper()/ucase() return the uppercase string
  • trim(string1) : removes spaces on both sides of the string; ltrim(string1) removes the left spaces; rtrim(string1) removes the right spaces
  • repeat(string1,int1) : returns string1 repeated int1 times
  • reverse(string1) : returns string1 reversed.
  • Such as reverse('abc') returns 'cba'
  • rpad(string1,len1,pad1) : pads string1 on the right with pad1 characters up to length len1.
  • For example, rpad('abc',5,'1') returns 'abc11'; lpad() pads on the left
  • split(string1,pat1) : splits string1 by the regular expression pat1 and returns an array.
  • Such as split('a,b,c',',') returns ["a","b","c"]
  • substr(string1,index1,int1) : returns int1 characters starting at position index1.
  • Such as substr('abcde',1,2) returns 'ab'

4.5.5. Type conversion

​ Hive's atomic data types can be implicitly converted, much like type conversion in Java. For example, if an expression expects INT, a TINYINT is automatically promoted to INT; but Hive does not convert in the other direction. If an expression expects TINYINT, an INT is not automatically narrowed to TINYINT, and an error is returned unless the CAST operation is used.

  • cast(value AS TYPE)
  • select cast('1' as DOUBLE); returns 1.0


4.6.1. Fetch (Hive can avoid running MapReduce)

​ Some Hive queries do not need MapReduce at all. For example, SELECT * FROM employees;: in this case Hive can simply read the files in the storage directory of employees and print the results to the console.

​ In hive-default.xml.template, hive.fetch.task.conversion defaults to more (older Hive versions defaulted to minimal). With the property set to more, global lookups, column lookups, and limit queries all skip MapReduce.

​ 1) Set hive.fetch.task.conversion to none and run the queries below: every one of them launches a MapReduce job.

hive (default)> set hive.fetch.task.conversion=none;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;

​ 2) Set hive.fetch.task.conversion back to more and run the same queries: none of them launches MapReduce.

hive (default)> set hive.fetch.task.conversion=more;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;


​ Most Hadoop jobs need the full scalability Hadoop provides to process large datasets. Sometimes, however, Hive's input is very small; in such cases the overhead of launching a job for the query can be far greater than its actual execution time. For most of these cases, Hive can handle all the tasks on a single machine in local mode, which noticeably shortens execution time for small datasets.

​ Setting hive.exec.mode.local.auto to true lets Hive enable this optimization automatically when appropriate.

set hive.exec.mode.local.auto=true;  -- enable local mr
-- maximum input size for local mr: local mode is used when the input is smaller than this (default 134217728, i.e. 128M)
set hive.exec.mode.local.auto.inputbytes.max=51234560;
-- maximum number of input files for local mr: local mode is used when there are fewer files than this (default 4)
set hive.exec.mode.local.auto.input.files.max=10;


hive (default)> set hive.exec.mode.local.auto=true; 
hive (default)> select * from score cluster by s_id;
18 rows selected (1.568 seconds)
hive (default)> set hive.exec.mode.local.auto=false; 
hive (default)> select * from score cluster by s_id;
18 rows selected (11.865 seconds)


  • Partition tables are an optimization for SQL filter queries.
  • Bucketed tables greatly improve join performance. Buckets add extra structure to a table that Hive can exploit for certain queries: joining two tables bucketed on the same column (the join column) can be implemented efficiently as a map-side join. For a JOIN where both tables are bucketed on the join column, only the buckets holding the same column values need to be joined against each other, which greatly reduces the amount of data involved in the JOIN.

  • (In newer versions there is no longer a difference; in older versions the small table had to be used first.)








SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;
No rows affected (152.135 seconds)

-- filtering the NULL ids out before the join is faster:
SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
No rows affected (141.585 seconds)

mapjoin

​ If MapJoin is not specified, or the MapJoin conditions are not met, the Hive parser turns the join into a Common Join, i.e. the join is completed in the Reduce phase, which is prone to data skew. With MapJoin, the small table is loaded entirely into memory and the join is performed on the map side, avoiding the reducers.



set hive.auto.convert.join = true;  -- defaults to true

-- threshold (in bytes) below which a table is treated as the small table for mapjoin:
set hive.mapjoin.smalltable.filesize=25123456;

4.6.5. group by

​ Not all aggregations have to be completed on the Reduce side: many aggregations can first be partially performed on the Map side, with the final result produced on the Reduce side.

​ (1) Whether to aggregate on the Map side; defaults to true:

set hive.map.aggr = true;

​ (2) Number of entries at which map-side aggregation operates:

set hive.groupby.mapaggr.checkinterval = 100000;

​ (3) Load balancing when the data is skewed:

set hive.groupby.skewindata = true;

When this option is set to true, the generated query plan contains two MR jobs. In the first job, the map output is distributed randomly among the reducers, and each reducer performs a partial aggregation and emits its result; rows with the same Group By key may therefore land in different reducers, which balances the load. The second MR job then distributes the pre-aggregated results to reducers by Group By key (this guarantees that identical Group By keys reach the same reducer) and completes the final aggregation.
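The two-job plan behind hive.groupby.skewindata can be simulated in plain Python: job 1 partially aggregates under a random partition, job 2 re-aggregates by key (an illustrative simulation, not Hive's implementation):

```python
import random
from collections import defaultdict

rows = [("a", 1)] * 6 + [("b", 1)] * 2  # skewed input: key "a" dominates

# Job 1: each map output row goes to a random reducer; each does a partial sum
partials = [defaultdict(int) for _ in range(3)]
for key, val in rows:
    partials[random.randrange(3)][key] += val

# Job 2: partial results are redistributed by key and summed into the final result
final = defaultdict(int)
for part in partials:
    for key, val in part.items():
        final[key] += val

print(sorted(final.items()))  # [('a', 6), ('b', 2)]
```

However the random partition scatters the hot key in job 1, job 2 still produces the correct totals, which is why the option is safe for plain sums and counts.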



The main determining factors are: the total number of input files, the sizes of the input files, and the cluster's configured block size (currently 128M; it can be viewed in Hive with the command set dfs.block.size; and cannot be customized);

a) Suppose the input directory contains one file a of 780M. Hadoop splits file a into 7 blocks (six 128M blocks and one 12M block), producing 7 map tasks.

b) Suppose the input directory contains three files a, b, c of 10M, 20M, and 150M. Hadoop produces 4 splits (10M, 20M, 128M, 22M), hence 4 map tasks. In other words, a file larger than the block size (128M) is split; a file smaller than the block size is treated as one split.
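The block-splitting arithmetic above can be checked with a short sketch (simplified: the real FileInputFormat also applies a 1.1 slack factor on the last split, ignored here):

```python
BLOCK = 128  # MB

def num_splits(file_sizes_mb):
    # each file is split independently; files never share a split
    splits = 0
    for size in file_sizes_mb:
        full, rest = divmod(size, BLOCK)
        splits += full + (1 if rest else 0)
    return splits

assert num_splits([780]) == 7          # 6 x 128M + 1 x 12M
assert num_splits([10, 20, 150]) == 4  # 10M, 20M, 128M, 22M
```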








set mapreduce.job.reduces =10;
create table a_1 as
select * from a
distribute by rand(123);



  1. Method one for adjusting the number of reducers

​ The amount of data processed by each reducer (default 256MB):

​ hive.exec.reducers.bytes.per.reducer=256123456

​ The maximum number of reducers per job (default 1009):

​ hive.exec.reducers.max=1009

  2. Method two: set the number of reducers directly

set mapreduce.job.reduces = 15;
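Method one derives the reducer count from the two parameters above; a sketch of the formula Hive applies, min(ceil(total bytes / bytes per reducer), max reducers):

```python
import math

def estimate_reducers(total_input_bytes,
                      bytes_per_reducer=256123456,
                      max_reducers=1009):
    # N = min(ceil(total input / bytes per reducer), max reducers), at least 1
    n = math.ceil(total_input_bytes / bytes_per_reducer)
    return max(1, min(n, max_reducers))

assert estimate_reducers(1024 ** 3) == 5    # ~1GB / ~256MB -> 5 reducers
assert estimate_reducers(10 ** 15) == 1009  # capped by hive.exec.reducers.max
```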






​ JVM reuse is a Hadoop tuning parameter with a very large impact on Hive performance, especially for workloads where small files are hard to avoid or where there are very many tasks, most of which run only briefly.

​ JVM reuse allows a JVM instance to be reused up to N times within the same job. N is configured in Hadoop's mapred-site.xml, usually between 10 and 20; the right value has to be determined by testing against the concrete business workload.

<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm. If set to -1, there is
  no limit.</description>
</property>

It can also be set from Hive:

set  mapred.job.reuse.jvm.num.tasks=10;

​ The drawback: a task slot used by JVM reuse stays occupied for reuse until the job completes. If an "unbalanced" job has a few reduce tasks that run much longer than the other reduce tasks, the reserved slots sit idle but cannot be used by other jobs; they are only released when every task has finished.



Compression saves disk space (text-based compression ratios can reach 40%+) and can increase throughput and performance by shrinking the amount of data loaded into memory, at the cost of extra CPU during compression and decompression. So for IO-bound (rather than compute-bound) jobs, compression can improve performance. Several compression algorithms:




TextFile: the default format for Hive data tables; storage layout: row-oriented. The Gzip compression algorithm can be used, but the compressed files do not support splitting. During deserialization, every character must be checked against the field and row delimiters, so deserialization costs tens of times more than SequenceFile.

Sequence Files

One drawback of some of Hadoop's native compressed files is that they do not support splitting. Splittable files allow multiple mapper programs to process a large data file in parallel; most files are not splittable because they can only be read from the beginning. Sequence File is a splittable file format that supports Hadoop's block-level compression. It is a binary format provided by the Hadoop API that serializes data into the file as key-value pairs. Storage layout: row-oriented. SequenceFile supports three compression choices: NONE, RECORD, BLOCK. RECORD (the default) has a low compression ratio; BLOCK usually compresses better than RECORD. An advantage is that the format is compatible with MapFile in the Hadoop API.

RCFile

First, RCFile guarantees that the data of one row resides on one node, so the cost of tuple reconstruction is low. Second, like a column store, RCFile can apply column-wise compression and skip unnecessary column reads. Data appends: RCFile does not support arbitrary writes; it only provides an append interface, because the underlying HDFS currently only supports appending at the end of a file. Row-group size: larger row groups improve compression efficiency but may hurt read performance by increasing the cost of lazy decompression; larger row groups also use more memory, which affects other MR jobs running concurrently.






  1. Custom formats

Conclusion: in general choose orcfile/parquet + snappy.

create table tablename (
 xxx bigint)
stored as orc tblproperties("orc.compress" = "SNAPPY");


  • When one SQL statement contains several jobs with no dependencies among them, their sequential execution can be turned into parallel execution (typically when union all is used):
-- enable parallel job execution
 set hive.exec.parallel=true;
 -- maximum number of threads one sql may run in parallel
set hive.exec.parallel.thread.number=8;


​ Small files arise in three places: map input, map output, and reduce output. Too many small files also hurt Hive's analysis efficiency:

Merge small files at map input:

set mapred.max.split.size=256000000;  
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

Merge small files in map and reduce output:

set hive.merge.mapfiles = true        -- merge the output of map-only jobs
set hive.merge.mapredfiles = true     -- merge the output of map-reduce jobs
set hive.merge.size.per.task = 256*1000*1000   -- target size of merged files
set hive.merge.smallfiles.avgsize=16000000     -- average output size below which an extra merge job runs



















  • Import from MySQL to HDFS
bin/sqoop import \
--connect jdbc:mysql://node03:3306/A  \
--username root \
--password 123456 \
--target-dir /A2 \
--table B --m 1
  • Import from MySQL to Hive
bin/sqoop import \
--connect "jdbc:mysql://node03:3306/A?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table B \
--hive-import \
--m 1 \
--hive-database default;
  • Export from HDFS to MySQL
bin/sqoop export \
--connect "jdbc:mysql://node03:3306/A?useUnicode=true&characterEncoding=utf-8" \
--username root \
--password 123456 \
--table B \
--export-dir /user/hive/warehouse/b


Reference: https://www.cnblogs.com/qingyunzong/p/8724155.html



When moving data with DataX, an overly large target dataset can cause a DataX OOM. You can enlarge DataX's JVM to prevent this by passing --jvm="-Xms5G -Xmx5G" after the python command:

python datax.py  --jvm="-Xms5G -Xmx5G" ../job/test.json


If the exception java.io.IOException: Maximum column length of 100,000 exceeded in column... is reported, a column in the data source exceeds 100,000 characters. Configure:
 "safetySwitch": false,
  "skipEmptyRecords": false,
  "useTextQualifier": false
   safetySwitch = false;  // do not limit single columns to 100,000 characters










HDFS:

  • provides a file system for distributed storage
  • optimized for storing large files; no random reads/writes of files on HDFS are needed
  • uses files directly
  • inflexible data model
  • uses a file system and processing framework
  • optimized for write-once, read-many access

HBase:

  • provides table-like, column-oriented data storage
  • optimized for random reads and writes of tabular data
  • operates on data with key-value semantics
  • provides a flexible data model
  • uses table-style storage, supports MapReduce, depends on HDFS
  • optimized for repeated reads as well as repeated writes



Redis: a distributed, memory-based cache that emphasizes caching; supports data persistence and transactional operations. A NoSQL key/value database that also supports richer types such as List and Set.








Relational databases and Hive both support SQL engines; Redis and HBase are NoSQL key/value databases that support simple row/column operations but no SQL engine.


Hive and HBase are two different Hadoop-based technologies: Hive is a SQL-like engine that runs MapReduce jobs, while HBase is a NoSQL key/value database on top of Hadoop that supports only simple row/column operations. The two tools can of course be used together: Hive for statistical queries, HBase for fast real-time queries, and the two can be integrated.



















6.5. Common HBase shell commands

Enter the HBase shell:

$ bin/hbase shell

View help:

hbase(main):001:0> help

List all tables:

hbase(main):002:0> list

Create the user table with two column families, info and data:

hbase(main):010:0> create 'user', 'info', 'data'
hbase(main):010:0> create 'user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}


  • Insert into table user: row key rk0001, column info:name, value zhangsan
  hbase(main):011:0> put 'user', 'rk0001', 'info:name', 'zhangsan'

  • Insert into table user: row key rk0001, column info:gender, value female
  hbase(main):012:0> put 'user', 'rk0001', 'info:gender', 'female'
  • Insert into table user: row key rk0001, column info:age, value 20
  hbase(main):013:0> put 'user', 'rk0001', 'info:age', 20
  • Insert into table user: row key rk0001, column data:pic, value picture
  hbase(main):014:0> put 'user', 'rk0001', 'data:pic', 'picture'



Get all data for row key rk0001 in table user:

hbase(main):015:0> get 'user', 'rk0001'


Get the info column family for row key rk0001:

hbase(main):016:0> get 'user', 'rk0001', 'info'


Get the info:name and info:age columns for row key rk0001:

hbase(main):017:0> get 'user', 'rk0001', 'info:name', 'info:age'



Scan the whole user table:

scan 'user'

Scan only the info column family:

scan 'user', {COLUMNS => 'info'}

Count the rows in the table:

hbase(main):053:0> count 'user'





Row Key

Like other NoSQL databases, the row key is the primary key used to retrieve records. There are only three ways to access a row in an HBase table:

1. by a single row key

2. by a row key range

3. full table scan

A row key can be an arbitrary string (maximum length 64KB; in practice usually 10-100 bytes); inside HBase, row keys are stored as byte arrays.


Column Family

​ Every column in an HBase table belongs to some column family. Column families are part of the table schema (columns are not) and must be defined before the table is used. Column names use their family as a prefix: for example, courses:history and courses:math both belong to the courses family.

​ The more column families a table has, the more files take part in IO and seeking when a row is read, so do not create more column families than necessary.

Column

​ A concrete column under a column family, belonging to one ColumnFamily, similar to a concrete column created in MySQL.

Cell and timestamp

​ In HBase, the storage unit determined by row and column is called a cell. Each cell holds multiple versions of the same data, indexed by timestamp. The timestamp is a 64-bit integer. It can be assigned by HBase automatically at write time, in which case it is the current system time accurate to the millisecond.

A cell is the unit uniquely determined by {row key, column (= <family> + <qualifier>), version}.




  • The HRegionServers hold the meta table and the table data. To access table data, the Client first visits ZooKeeper to obtain the location of the meta table, i.e. which HRegionServer stores it.
  • The Client then contacts that HRegionServer by the address just obtained, reads the meta table, and gets the metadata stored in it.
  • Using the information in the metadata, the Client accesses the corresponding HRegionServer and scans its MemStore and StoreFiles to look up the data.
  • Finally, the HRegionServer returns the query result to the Client.











Benefits of pre-splitting regions:

  • higher data read/write efficiency
  • load balancing, preventing data skew
  • easier disaster-recovery scheduling of regions across the cluster
  • an optimized number of maps


​ Each region maintains a startRow and an endRowKey; if incoming data falls into the rowKey range a region maintains, that region handles the data.


  1. Manually set the pre-split points
 create 'staff','info','partition1',SPLITS => ['1000','2000','3000','4000']


create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}




​ A rowkey is a binary stream. Many developers suggest keeping its length between 10 and 100 bytes, but shorter is better: ideally no more than 16 bytes. It is stored as a byte[] array and is usually designed with a fixed length.

Data is persisted in HFiles as KeyValues. If the rowkey is 100 bytes long, then for 10 million rows the rowkeys alone occupy 100 × 10 million = 1 billion bytes, nearly 1GB, which severely hurts HFile storage efficiency;


​ If the rowkey grows with a timestamp, do not put the time at the front of the binary stream. Instead, make the high-order part of the rowkey a hash field generated by the program, and put the time field in the low-order part; this improves the chance that data is spread evenly across the RegionServers and thus load-balanced. Without a hash field, a leading time field makes all new data pile up on one RegionServer (a hotspot), so retrieval load concentrates on individual RegionServers and query efficiency drops.

​ Row keys are stored in lexicographic order. When designing them, make full use of this ordering: store data that is often read together close together, and keep data likely to be accessed soon in one block.

​ An example: if the rows most recently written to an HBase table are the most likely to be accessed, consider making the timestamp part of the rowkey. Because of the lexicographic ordering, use Long.MAX_VALUE - timestamp as (part of) the rowkey, so newly written data can be hit quickly on read.
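The Long.MAX_VALUE - timestamp trick can be demonstrated in a few lines (a Python sketch; the user prefix and key layout here are illustrative assumptions, and the reversed value is zero-padded to a fixed width so lexicographic order matches numeric order):

```python
LONG_MAX = 2**63 - 1  # Java's Long.MAX_VALUE

def recent_first_key(user: str, timestamp_ms: int) -> str:
    # newer timestamps produce smaller reversed values, so the
    # newest rows sort first in HBase's lexicographic key order
    return f"{user}_{LONG_MAX - timestamp_ms:019d}"

old = recent_first_key("u1", 1_000)
new = recent_first_key("u1", 2_000)
assert new < old  # the newer row sorts before the older one
```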


​ Uniqueness must be guaranteed by design. Since rowkeys are stored in lexicographic order, design them to keep frequently read data stored together and data likely to be accessed soon in one block.


Hotspotting occurs when a large number of clients direct their traffic at one node or a very few nodes of the cluster (the access may be reads, writes, or other operations). The heavy traffic can push the single machine hosting the hot region beyond its capacity, degrading performance or even making the region unavailable; it also affects the other regions on the same RegionServer, since the host cannot serve their requests. Design the data access pattern so the cluster is fully and evenly utilized. To avoid write hotspots, design rowkeys so that, for larger volumes of data, rows are written to many regions of the cluster rather than concentrated in one.



Salting

​ The salting discussed here is not the salting of cryptography; it means prepending random data to the rowkey: assigning it a random prefix so that it starts differently from the preceding rowkeys. The number of distinct prefixes should equal the number of regions you want the data spread over. Salted rowkeys then scatter across the regions according to their random prefixes, avoiding hotspots.

The rowKey 1001 becomes, after SHA1: dd01903921ea24941c26a48f2cec24e0bb0e8cc7 

The rowKey 3001 becomes, after SHA1: 49042c54de64a1e9bf0b33e00245660ef92dc7bd 

The rowKey 5001 becomes, after SHA1: 7b61dec07e02c188790670af43e717f0f46e8913

Before doing this, we usually sample the dataset to decide what kind of rowKey to hash.
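The hashed keys above come from running SHA1 over the original rowkey; a sketch with Python's hashlib (the digests for your own keys will of course differ):

```python
import hashlib

def hashed_rowkey(raw_key: str) -> str:
    # deterministic: the same raw key always yields the same digest,
    # so a client can recompute it and still use get() for exact reads
    return hashlib.sha1(raw_key.encode("utf-8")).hexdigest()

k1, k2 = hashed_rowkey("1001"), hashed_rowkey("3001")
assert len(k1) == 40 and len(k2) == 40  # 160-bit digest as 40 hex chars
assert k1 != k2                         # different keys scatter apart
assert hashed_rowkey("1001") == k1      # deterministic
```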


Hashing

​ Hashing ensures that a given row always gets the same prefix ("salt"). Hashing can also spread the load across the whole cluster, yet keep reads predictable: with a deterministic hash, the client can reconstruct the complete rowkey and use a get operation to fetch a specific row exactly.


Reversing

An example of reversing the rowkey: with phone numbers as rowkeys, the reversed phone-number string can be used as the rowkey instead, avoiding the hotspot caused by the relatively fixed prefixes of phone numbers.


A common data-processing problem is quickly retrieving the most recent version of data; using a reversed timestamp as part of the rowkey is very useful for this: append Long.Max_Value - timestamp to the end of the key.



Redis is one of the more popular NoSQL systems today. It is an open-source key-value store written in ANSI C. Redis keeps all data cached in memory; it can periodically write updated data to disk or append modification operations to a log file, achieving data persistence.



​The string type in Redis is binary-safe, which means the data retrieved is identical to the data stored. A string value in Redis can hold at most 512MB of data.

  • set: set key value
  • get: get key / getset key value
  • delete: del key
  • increment and decrement: incr key / decr key
  • add a specified amount to a key: incrby key increment
  • subtract a specified amount from a key: decrby key decrement
  • append to a string: append key value




Redis hash 是一个string类型的field和value的映射表,hash特别适合用于存储对象。

  • Set: hset key field value — set a field/value pair on the given key
  • Set: hmset key field value [field2 value2 …] — set multiple field/value pairs on the key
  • Get: hget key field
  • Get: hmget key fields — get the values of multiple fields of the key
  • Get: hgetall key — get all field-value pairs of the key
  • Delete: hdel key field [field …] — delete one or more fields; returns the number of fields deleted
  • Delete: del key — delete the entire hash
  • Increment: hincrby key field increment — increase the value of the field by increment
  • Existence: hexists key field — check whether the given field exists on the key
  • Field count: hlen key — get the number of fields the key contains
  • All fields: hkeys key
  • All values: hvals key





  • Push at head: lpush key values [value1 value2 …] — insert all the values at the head of the list at key; if the key does not exist, an empty list associated with it is created before inserting. On success, returns the number of elements
  • Push at tail: rpush key values [value1 value2 …]
  • Get the elements from start to end: lrange key start end — -1 is the last element, -2 the second to last
  • Pop from head: lpop key — return and remove the first element of the list at key; returns nil if the key does not exist
  • Pop from tail: rpop key
  • Number of elements in the list: llen key
  • lpushx key value: insert value at the head of the list only if the key exists; if it does not, nothing is inserted
  • rpushx key value: the same, but appends at the tail
  • lrem key count value: remove count occurrences of value; if count > 0, traverse and delete from head to tail; if count < 0, from tail to head; if count = 0, remove all elements equal to value
  • lset key index value: set the element at the given index; 0 is the head element, -1 the tail. Throws an error if the index does not exist
  • linsert key before|after pivot value: insert value before or after the pivot element
  • rpoplpush source destination: pop the tail element of source and push it onto the head of destination (can be used for circular rotation)


In Redis, a Set can be viewed as an unordered collection of strings; duplicate elements are not allowed in a Set.

  • sadd key values [value1 value2 …]: add members to the set; members already present are not added again
  • srem key members [member1 member2 …]: remove the given members from the set
  • smembers key: get all members of the set
  • sismember key member: check whether the given member is in the set; 1 means present, 0 means absent (or the key itself does not exist)
  • sdiff key1 key2 …: return the members of key1 that are not in key2; the order of keys matters, i.e. the difference set (A-B)
  • sinter key1 key2 key3 …: return the intersection (A∩B)
  • sunion key1 key2 key3 …: return the union (A∪B)
  • scard key: get the number of members in the set
  • srandmember key: return a random member of the set
  • sdiffstore destination key1 key2 …: store the difference of key1 and key2 in destination
  • sinterstore destination key [key …]: store the resulting intersection in destination
  • sunionstore destination key [key …]: store the resulting union in destination





save [seconds] [changes]

For example, take an RDB snapshot whenever at least 100 keys have changed within 60 seconds:

save 60 100



​	1. Minimal performance impact. As mentioned above, Redis forks a child process to save the RDB snapshot, so serving client requests is almost unaffected.

​	2. Each snapshot is a complete data file, so snapshots from multiple points in time can be kept by other means (for example, copying the midnight snapshot to other storage media every day), making RDB a very reliable disaster-recovery mechanism.

​	3. Recovering data from an RDB file is much faster than from an AOF.


​	1. Snapshots are taken periodically, so some data is inevitably lost if Redis crashes between them.

​	2. If the dataset is very large and the CPU is weak (e.g. a single core), forking the child process may take relatively long and impair Redis's ability to serve requests.


​	With AOF persistence, Redis logs every write request to a file. On restart, Redis replays all the writes recorded in the AOF file in order, restoring the data to its latest state. AOF is disabled by default; to enable it, configure:

appendonly yes


appendfsync no: never fsync; leave the flush timing to the OS. Fastest.

appendfsync always: fsync after every log entry. Safest, but slowest.

appendfsync everysec: the compromise; a background thread fsyncs once per second.


1. Safest: with appendfsync always enabled, no acknowledged write is ever lost; with appendfsync everysec, at most one second of data is lost.

2. The AOF file does not get corrupted by events such as power loss; even if a log entry was only half-written, the redis-check-aof tool repairs it easily.

3. The AOF file is human-readable and editable. After an erroneous data-wiping operation, as long as the AOF has not been rewritten, you can back up the file, delete the bad commands, and then restore the data.


1. The AOF file is usually larger than the RDB file.

2. Higher performance overhead than RDB.

3. Slower data recovery than RDB.







  • Redis goes down, and all requests hit the database.
  • The cached data was given identical expiry times, so the cache expires en masse at some point and all requests hit the database.



  • For the case "Redis goes down and all requests hit the database":
  • Before the incident: make Redis highly available (master/slave + Sentinel, or Redis Cluster) so that Redis going down is unlikely in the first place
  • During the incident: if Redis really does go down, use a local cache (ehcache) plus rate limiting (hystrix) to keep the database from being overwhelmed (at least the service keeps working)
  • After the incident: with Redis persistence enabled, data is reloaded from disk on restart, so the cache recovers quickly
  • For the case "identical expiry times cause the cache to expire en masse and all requests hit the database":
  • Add a random offset to each expiry time when caching, which greatly reduces the number of entries expiring at the same moment.
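The random-offset idea can be sketched like this (BASE_TTL and JITTER are assumed example values; with a real client the result would be passed to something like `setex`):

```python
import random

BASE_TTL = 3600  # intended expiry in seconds (assumed example value)
JITTER = 300     # up to 5 extra minutes, spreading out expirations

def ttl_with_jitter(base: int = BASE_TTL, jitter: int = JITTER) -> int:
    """Base TTL plus a random offset, so keys cached at the same moment
    do not all expire at the same moment."""
    return base + random.randint(0, jitter)

# With a real client this would be used as: r.setex(key, ttl_with_jitter(), value)
```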



​	Cache penetration means querying for data that definitely does not exist. Because the cache misses and, for fault-tolerance reasons, nothing is written to the cache when the database returns no rows, every request for this nonexistent data goes to the database, defeating the purpose of the cache.

​	For example, suppose we have a database table whose IDs all start from 1 (positive numbers). An attacker trying to bring the database down could send requests whose IDs are all negative. The cache is then useless: every request goes to the database, which has no such rows either, so each one returns empty.
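One common mitigation is to cache the "not found" result as well (with a short TTL in practice); here is a minimal in-memory sketch, with plain dicts standing in for Redis and the database:

```python
NOT_FOUND = object()  # sentinel cached for keys the database does not have

cache: dict = {}
db = {1: "alice", 2: "bob"}   # stand-in for the real database
db_hits = 0

def get_user(user_id: int):
    global db_hits
    if user_id in cache:                      # cache hit, including cached misses
        val = cache[user_id]
        return None if val is NOT_FOUND else val
    db_hits += 1
    val = db.get(user_id)                     # one unavoidable DB lookup
    cache[user_id] = NOT_FOUND if val is None else val
    return val

get_user(-42)   # hostile id: the first call reaches the DB...
get_user(-42)   # ...but repeats are absorbed by the cached miss
assert db_hits == 1
```

In real Redis the sentinel would be given a short TTL so that data created later can still be found; a Bloom filter in front of the cache is another common approach.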







  • Do you use Redis?
  • Which data structures do you use?
  • What data do you store in it?
  • Do you have a data-persistence plan?



​	Kafka was originally developed at LinkedIn and is written in Scala. Kafka is a distributed, partitioned, replicated, multi-subscriber message queue system.


​	Common message queues: RabbitMQ, Redis, ZeroMQ, ActiveMQ.


  • Reliability: distributed, partitioned, replicated, and fault-tolerant.
  • Scalability: the Kafka messaging system scales easily, with no downtime.
  • Durability: Kafka uses a distributed commit log, meaning messages are persisted to disk as fast as possible, so they are durable.
  • Performance: Kafka has high throughput for both publishing and subscribing, and maintains stable performance even with many terabytes of stored messages.
  • Kafka is very fast, guaranteeing zero downtime and zero data loss.









Consumer Group: each Consumer belongs to a specific Consumer Group (a groupName can be specified for each Consumer).




  • A topic needs only a single partition (data within one partition is ordered).


  • Producers write data to the broker's multiple partitions. Data within each partition is relatively ordered, but the data as a whole is unordered. Because consumers consume one partition at a time, global ordering cannot be guaranteed.


  • Consumer group: made up of one or more consumers; within the same group, each message is consumed only once.
  • For a given topic, the number of consumers in a group should be less than or equal to the number of partitions of that topic.


  • No partition number and no key: records are sent to the partitions in round-robin fashion
  • No partition number but a key is given: partition = key.hashCode() % numPartitions
  • A partition number is given: the record is written directly to that partition
  • A custom partitioning strategy
public ProducerRecord(String topic, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
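The first three rules can be simulated in Python (a toy model: a byte-sum stands in for Java's `key.hashCode()`, and the partition count is an assumed example value):

```python
import itertools

NUM_PARTITIONS = 3  # assumed example value
_round_robin = itertools.cycle(range(NUM_PARTITIONS))

def choose_partition(partition=None, key=None):
    """Mirror the default producer rules: an explicit partition wins,
    then hash(key) % numPartitions, then round-robin."""
    if partition is not None:
        return partition                           # partition number given
    if key is not None:
        return sum(key.encode()) % NUM_PARTITIONS  # stand-in for key.hashCode()
    return next(_round_robin)                      # neither given: round-robin

assert choose_partition(partition=1) == 1
assert choose_partition(key="user-42") == choose_partition(key="user-42")
```

Note that the same key always maps to the same partition, which is what makes per-key ordering possible.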

Key example: prefix + date.getTime(), e.g. fixlog_1564388581914



Sync mode with acks=1: only the leader must have received the write (-1 requires all replicas to succeed, 0 does not wait at all). If the leader partition dies before replication, the data is lost.

Fix: set acks to -1 so a produce only counts as successful once written to all replicas: producer.type=sync, request.required.acks=-1




​	Brokers use the partition-replica mechanism to keep data highly available.


After fetching data, the consumer stores it in HBase or MySQL. If HBase or MySQL cannot be reached at that moment, an exception is thrown. If the offset was already committed while processing the data, the offset stored in Kafka has been advanced, but HBase/MySQL holds no data, so the data is lost. The main cause is that the offset commit was asynchronous (done before processing finished).


  • Have the Consumer commit the offset only after it has finished processing the data. By default offsets are auto-committed; change this to manual offset commits.
  • For stream processing with Kafka as the high-level source there are two approaches: receiver-based (enable the WAL so failures are recoverable) and direct (guaranteed via checkpoints).


  • Write to a table using a primary key or unique index, so redelivered records do not create duplicate rows.




Step 2: look up the index file of the matching segment. Index entries are key/value pairs: the key indicates which record (in order) the entry is within the log file, and the value records the record's global offset. If the target offset can be found directly in the index, the corresponding data is fetched right away.

​ 如果index文件里面没有存储offset,就会查找offset最近的那一个offset,例如查找offset为7的数据找不到,那么就会去查找offset为6对应的数据,找到之后,再取下一条数据就是offset为7的数据

8.11. Kafka auto.offset.reset values explained


  • earliest: if a partition has a committed offset, consume from that offset; if there is no committed offset, consume from the beginning.


  • latest: if a partition has a committed offset, consume from that offset; if there is no committed offset, consume only data newly produced to the partition.



The latest setting can lose messages: if Kafka has a problem while data is still being written to the topic, restarting Kafka with this setting resumes consumption from the newest offset, skipping whatever arrived during the outage.
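A toy model of how the two settings pick a starting offset for one partition (illustrative only; the real decision happens inside the consumer):

```python
def starting_offset(committed, beginning, end, policy="latest"):
    """Where consumption starts for one partition under auto.offset.reset:
    a committed offset always wins; otherwise the policy decides."""
    if committed is not None:
        return committed                  # both policies resume from the commit
    return beginning if policy == "earliest" else end

# No committed offset yet: earliest replays history, latest skips it.
assert starting_offset(None, 0, 100, "earliest") == 0
assert starting_offset(None, 0, 100, "latest") == 100
assert starting_offset(42, 0, 100, "latest") == 42
```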



​	A typical company has little data at first, so most prefer a traditional database such as MySQL. To search for the keyword "传智播客", the query is roughly: select * from table where field like '%传智播客%'. But as the business grows the data keeps swelling, and problems appear: even after optimization, single-table MySQL queries top out at around 4 million rows, and query timeouts become frequent.

​	Many companies then scale the database horizontally and vertically and start "splitting" tables, both horizontally and vertically. Even so, problems remain, for example:

​	1. The database becomes a single point of failure, so master/slave replication is added, which increases operational cost.

​	2. Splitting the tables makes later maintenance harder, again increasing operational cost.

​	3. Even with all that maintenance, searching over big data remains slow, far below expectations.

​	Then came Lucene, a full-text search toolkit. But the interfaces Lucene exposes are very complex and inefficient for developers to work with. So Lucene was wrapped further into Solr, a high-performance distributed search server framework. Solr, however, has a fatal flaw: while building indexes, its search capability degrades drastically, which to some extent makes Solr's real-time indexing inefficient.

​	Finally came Elasticsearch, a framework also built on Lucene that absorbed the lessons of the previous two generations: a distributed, multi-tenant full-text search engine published through a RESTful web interface, which makes it much more convenient for developers to operate. ES also adds nodes easily, can store and retrieve massive data with near-real-time search, discovers nodes automatically, and uses a replica mechanism to guarantee availability.


​	Elasticsearch, abbreviated ES, is an open-source, highly scalable, distributed full-text search engine that can store and retrieve data in near real time. It scales well, out to hundreds of servers, handling petabytes of data.


Relational DB -> Databases -> Tables -> Rows      -> Columns
HBase         -> Namespace -> Table  -> Rowkey    -> Columns within a column family
Elasticsearch -> Index     -> Types  -> Documents -> Fields

9.3.1. Index

​	An index is a collection of documents that share somewhat similar characteristics. For example, you can have an index of customer data, another index for a product catalog, and another for order data. An index is identified by a name (which must be all lowercase), and that name is used whenever indexing, searching, updating, or deleting the documents in it. A cluster can define as many indexes as you like.

9.3.2. Type

​	Within an index you can define one or more types. A type is a logical category/partition of your index whose semantics are entirely up to you. Typically, a type is defined for documents that share a common set of fields. Say you run a blogging platform and store all your data in a single index: you could define one type for user data, another type for blog posts, and, of course, another type for comments.


9.3.3. Field

​	Fields correspond to the columns of a database table: they classify document data according to its different attributes.

9.3.4. Mapping

​	A mapping places constraints on how data is handled: a field's data type, default value, analyzer, whether it is indexed, and so on. Other rules for processing data inside ES are also configured through the mapping. Handling data under optimal rules greatly improves performance, which is why mappings are worth creating, and worth thinking through for the best performance.

9.3.5. Document

​	A document is a basic unit of information that can be indexed. For example, you can have a document for a given customer, a document for a product, and, of course, a document for an order.

9.3.6. Cluster

​	A cluster is one or more nodes organized together; they jointly hold all the data and provide indexing and search across it. A cluster is identified by a unique name, "elasticsearch" by default. The name matters, because a node can only join a cluster by specifying that cluster's name.

9.3.7. Node

​	A node is a single server in the cluster; as part of the cluster, it stores data and participates in the cluster's indexing and search functions.

9.3.8. Shards & replicas

​	By default, each Elasticsearch index is split into 5 primary shards with 1 replica. This means that as long as the cluster has at least two nodes, the index has 5 primary shards and another 5 replica shards (one full copy), for 10 shards per index in total. (From Elasticsearch 7 onward, the default is a single primary shard.)



curl -XPUT http://node01:9200/blog01/?pretty


curl -XPUT http://node01:9200/blog01/article/1?pretty -d  '{"id": "1", "title": "What is lucene"}'


curl -XGET http://node01:9200/blog01/article/1?pretty


curl -XPUT http://node01:9200/blog01/article/1?pretty -d  '{"id": "1", "title": " What is elasticsearch"}'


curl -XGET "http://node01:9200/blog01/article/_search?q=title:elasticsearch"


curl -XDELETE "http://node01:9200/blog01/article/1?pretty"


curl -XDELETE http://node01:9200/blog01?pretty



  • hits
  • took
  • _shards
  • timed_out



match (single condition): e.g. the query 中华人民共和国 is analyzed into terms such as 中华 / 人民 / 共和国

  • the query string is tokenized (analyzed) before matching


  • must
  • must_not
  • should


  • the query string is not tokenized before matching






DELETE document
PUT document
{
  "mappings": {
    "article": {
      "properties": {
        "title": {"type": "text"},
        "author": {"type": "text"},
        "titleScore": {"type": "double"}
      }
    }
  }
}

store: false by default; if set to true, a separate copy of the field is kept in Lucene's document store.

GET document/article/_mapping


PUT /document/_settings
{
  "number_of_replicas": 2
}






Next: after node 1 receives the request, it uses the document id carried in the request to determine which shard the document should be stored in: shard = hash(routing) % number_of_primary_shards
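A sketch of the routing formula (Elasticsearch actually uses murmur3; crc32 stands in here, and routing defaults to the document `_id`):

```python
import zlib

NUMBER_OF_PRIMARY_SHARDS = 5  # fixed at index creation

def route(routing: str) -> int:
    """shard = hash(routing) % number_of_primary_shards.
    Routing defaults to the document _id; crc32 stands in for murmur3."""
    return zlib.crc32(routing.encode()) % NUMBER_OF_PRIMARY_SHARDS

# Deterministic: every node computes the same shard for the same _id,
# so any node can forward the request to the right primary shard.
assert route("1") == route("1")
```

This is also why the number of primary shards cannot change after index creation: changing the modulus would route existing documents to the wrong shard.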


Finally: after node 3 receives the request, it forwards it in parallel to all of shard 0's replica shards, which live on node 1 and node 2. If every replica shard executes the request successfully, each replies to node 3 with a success acknowledgement; once node 3 has received all of the confirmations, it returns a Success message to the user.


  • Phase 1: the client sends a document-delete request to node 1.
  • Phase 2: as before, node 1 uses the document's _id in the request to determine that the document is stored in shard 0, and node 1 knows shard 0's primary shard is on node 3, so node 1 forwards the request to node 3.
  • Phase 3: node 3 receives the request and executes the delete on the primary shard.
  • Phase 4: if node 3 deletes the document successfully, it forwards the request in parallel to the nodes holding all the other replica shards. Those nodes perform the same delete and confirm success back to node 3; once node 3 has received every confirmation, it sends a delete-success message to the client.


select * from student order by score limit 10;



(2) node 3 forwards the search request to every shard of the index (primary or replica); each shard executes the search locally and adds its results to a local priority queue;

(3) each shard returns the _id and score values recorded in its local priority queue to node 3. Node 3 merges these into its own local priority queue and performs the global sort (node 3 combines them into one aggregated result), then returns it to the client.
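The coordinating node's merge step is essentially a top-k merge of the shards' local top-k lists, like `order by score limit k`; a sketch with made-up shard results:

```python
import heapq

# Each shard returns its local top-3 (score, _id) pairs; stand-in data.
shard_results = [
    [(98, "s0-a"), (75, "s0-b"), (60, "s0-c")],
    [(91, "s1-a"), (88, "s1-b"), (12, "s1-c")],
    [(99, "s2-a"), (40, "s2-b"), (30, "s2-c")],
]

def global_top_k(per_shard, k=3):
    """The coordinating node's merge: combine every shard's local top-k
    and keep the k best overall."""
    return heapq.nlargest(k, (hit for shard in per_shard for hit in shard))

assert [s for s, _id in global_top_k(shard_results)] == [99, 98, 91]
```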




X-Pack includes security (x-pack-security), alerting (x-pack-watcher), monitoring (x-pack-monitoring), graph (x-pack-graph), Elasticsearch SQL (x-pack-sql), cross-cluster replication (x-pack-ccr), x-pack-upgrade, x-pack-rollup, and machine learning (x-pack-ml). As of the 7.x releases, the core security features of ES are free to use.


The es7.x version installs x-pack by default. Modify the configuration and activate it.

xpack.security.enabled: true
xpack.security.audit.enabled: true
xpack.license.self_generated.type: basic
xpack.security.transport.ssl.enabled: true

2. IK tokenizer

a. Introduction

The most popular Chinese word-segmentation plugin for Elasticsearch.

b. Installation

# run from the Elasticsearch installation directory
cd bin
./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.3.0/elasticsearch-analysis-ik-7.3.0.zip


3. Pinyin plugin

a. Introduction

The pinyin plugin can convert documents and query conditions between Chinese characters and Pinyin

b. Installation

./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-pinyin/releases/download/v7.3.0/elasticsearch-analysis-pinyin-7.3.0.zip

4. ICU analyzer

a. Introduction

​	The ICU analyzer plugin for Elasticsearch uses the International Components for Unicode (ICU) library (see icu-project.org for details) to provide rich Unicode processing tools. These include the icu_tokenizer, which is particularly useful for processing Asian languages, as well as a number of token filters necessary for correct matching and sorting in languages other than English.

b. Installation

./elasticsearch-plugin install analysis-icu

5. Ingest attachment plugin

a. Introduction

​	This is a text-parsing plugin for Elasticsearch built on Apache Tika, the text-extraction library. It can extract text from files in mainstream formats (PDF, DOC, EXCEL, etc.) and import it automatically.
​	Since Elasticsearch is a JSON-based document database, attachment documents must be Base64-encoded before being inserted into Elasticsearch.