MapReduce database table input

1. Requirements


  • Description: Read data directly from a MySQL table and use it as the input source of a MapReduce job.
  • Requirement: Use MapReduce to read the data in the emp table directly and calculate the sum of salaries for each department.

2. Test data


SQL statements to create the employee information table (emp):

/*
Navicat MySQL Data Transfer

Source Server         : local-mysql
Source Server Version : 50722
Source Host           : localhost:3306
Source Database       : hadoop

Target Server Type    : MYSQL
Target Server Version : 50722
File Encoding         : 65001

Date: 2020-05-19 16:05:22
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for emp
-- ----------------------------
DROP TABLE IF EXISTS `emp`;
CREATE TABLE `emp` (
  `EMPNO` int(30) NOT NULL AUTO_INCREMENT,
  `ENAME` varchar(50) DEFAULT NULL,
  `JOB` varchar(50) DEFAULT NULL,
  `MGR` int(30) DEFAULT '0',
  `HIREDATE` varchar(255) DEFAULT NULL,
  `SAL` int(30) DEFAULT '0',
  `COMM` int(30) DEFAULT '0',
  `DEPTNO` int(30) DEFAULT '0',
  PRIMARY KEY (`EMPNO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES ('7369', 'SMITH', 'CLERK', '7902', '1980/12/17', '8000', '0', '20');
INSERT INTO `emp` VALUES ('7499', 'ALLEN', 'SALESMAN', '7698', '1981/2/20', '1600', '300', '30');
INSERT INTO `emp` VALUES ('7521', 'WARD', 'SALESMAN', '7698', '1981/2/22', '1250', '500', '30');
INSERT INTO `emp` VALUES ('7566', 'JONES', 'MANAGER', '7839', '1981/4/2', '2975', '0', '20');
INSERT INTO `emp` VALUES ('7654', 'MARTIN', 'SALESMAN', '7698', '1981/9/28', '1250', '1400', '30');
INSERT INTO `emp` VALUES ('7698', 'BLAKE', 'MANAGER', '7839', '1981/5/1', '2850', '0', '30');
INSERT INTO `emp` VALUES ('7782', 'CLARK', 'MANAGER', '7839', '1981/6/9', '2450', '0', '10');
INSERT INTO `emp` VALUES ('7788', 'SCOTT', 'ANALYST', '7566', '1987/4/19', '3000', '0', '20');
INSERT INTO `emp` VALUES ('7839', 'KING', 'PRESIDENT', '0', '1981/11/17', '5000', '0', '10');
INSERT INTO `emp` VALUES ('7844', 'TURNER', 'SALESMAN', '7698', '1981/9/8', '1500', '0', '30');
INSERT INTO `emp` VALUES ('7876', 'ADAMS', 'CLERK', '7788', '1987/5/23', '1100', '0', '20');
INSERT INTO `emp` VALUES ('7900', 'JAMES', 'CLERK', '7698', '1981/12/3', '9500', '0', '30');
INSERT INTO `emp` VALUES ('7902', 'FORD', 'ANALYST', '7566', '1981/12/3', '3000', '0', '20');
INSERT INTO `emp` VALUES ('7934', 'MILLER', 'CLERK', '7782', '1982/1/23', '1300', '0', '10');

Table field description: (figure omitted)

3. Programming ideas


  • Ideas:
    1. The input uses DBInputFormat to specify the database table.
    2. At its core, the table data is read through JDBC (a quick connectivity check is sketched after this list).
    3. Add the mysql-connector-java dependency (the MySQL JDBC driver) to pom.xml.
  • Note: If you are importing dependencies manually, download a mysql-connector-java driver whose version matches your database version. This experiment uses MySQL 5.7, so any 5.x driver version works.
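
Before wiring the MapReduce job together, it can help to verify that the driver, URL, user name and password actually work. A minimal connectivity-check sketch (not part of the tutorial itself; it reuses the connection settings that appear later in the driver class, so adjust them to your environment):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ConnectionCheck {
	public static void main(String[] args) throws Exception {
		// Same connection settings as the MapReduce driver below; adjust to your environment
		Class.forName("com.mysql.jdbc.Driver");
		try (Connection conn = DriverManager.getConnection(
				"jdbc:mysql://localhost:3306/hadoop", "root", "123456");
		     Statement st = conn.createStatement();
		     ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM emp")) {
			rs.next();
			System.out.println("emp rows: " + rs.getInt(1));
		}
	}
}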

4. Implementation steps


Create a Maven project in IDEA or Eclipse.

Add the Hadoop dependencies to pom.xml:

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>2.7.3</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-hdfs</artifactId>
	<version>2.7.3</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-mapreduce-client-common</artifactId>
	<version>2.7.3</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-mapreduce-client-core</artifactId>
	<version>2.7.3</version>
</dependency>
<!-- Add the MySQL driver. This example uses MySQL 5.7, so driver version 5.1.17 is chosen -->
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.17</version>
</dependency>

Add a log4j.properties file under the resources directory with the following contents:

### Root logger ###
log4j.rootLogger = debug,console,fileAppender
### Console appender ###
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %d{ABSOLUTE} %5p %c:%L - %m%n
### File appender ###
log4j.appender.fileAppender = org.apache.log4j.FileAppender
log4j.appender.fileAppender.File = logs/logs.log
log4j.appender.fileAppender.Append = false
log4j.appender.fileAppender.Threshold = DEBUG
log4j.appender.fileAppender.layout = org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n

Write the entity class EmployeeDBWritable corresponding to the database table emp

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class EmployeeDBWritable implements Writable, DBWritable {
	private int empno;
	private String ename;
	private String job;
	private int mgr;
	private String hiredate;
	private int sal;
	private int comm;
	private int deptno;
	
	public int getEmpno() {
		return empno;
	}
	
	public void setEmpno(int empno) {
		this.empno = empno;
	}
	
	public String getEname() {
		return ename;
	}
	
	public void setEname(String ename) {
		this.ename = ename;
	}
	
	public String getJob() {
		return job;
	}
	
	public void setJob(String job) {
		this.job = job;
	}
	
	public int getMgr() {
		return mgr;
	}
	
	public void setMgr(int mgr) {
		this.mgr = mgr;
	}
	
	public String getHiredate() {
		return hiredate;
	}
	
	public void setHiredate(String hiredate) {
		this.hiredate = hiredate;
	}
	
	public int getSal() {
		return sal;
	}
	
	public void setSal(int sal) {
		this.sal = sal;
	}
	
	public int getComm() {
		return comm;
	}
	
	public void setComm(int comm) {
		this.comm = comm;
	}
	
	public int getDeptno() {
		return deptno;
	}
	
	public void setDeptno(int deptno) {
		this.deptno = deptno;
	}
	
	@Override
	public void readFields(DataInput input) throws IOException {
		// Deserialize
		// Data format: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
		this.empno = input.readInt();
		this.ename = input.readUTF();
		this.job = input.readUTF();
		this.mgr = input.readInt();
		this.hiredate = input.readUTF();
		this.sal = input.readInt();
		this.comm = input.readInt();
		this.deptno = input.readInt();
	}
	
	@Override
	public void write(DataOutput output) throws IOException {
		// Serialize
		// Data format: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
		output.writeInt(empno);
		output.writeUTF(ename);
		output.writeUTF(job);
		output.writeInt(mgr);
		output.writeUTF(hiredate);
		output.writeInt(sal);
		output.writeInt(comm);
		output.writeInt(deptno);
	}
	
	@Override
	public String toString() {
		return "Employee [empno=" + empno + ", ename=" + ename + ", job=" + job + ", mgr=" + mgr + ", hiredate="
		+ hiredate + ", sal=" + sal + ", comm=" + comm + ", deptno=" + deptno + "]";
	}
	
	/**
	* Write this record to the database table.
	* @param statement the PreparedStatement to populate
	* @throws SQLException
	*/
	public void write(PreparedStatement statement) throws SQLException {
		statement.setInt(1,empno);
		statement.setString(2,ename);
		statement.setString(3,job);
		statement.setInt(4,mgr);
		statement.setString(5,hiredate);
		statement.setInt(6,sal);
		statement.setInt(7,comm);
		statement.setInt(8,deptno);
	}
	
	/**
	* Read this record from the database table.
	* @param resultSet the ResultSet positioned at the current row
	* @throws SQLException
	*/
	public void readFields(ResultSet resultSet) throws SQLException {
		this.empno = resultSet.getInt(1);
		this.ename = resultSet.getString(2);
		this.job = resultSet.getString(3);
		this.mgr = resultSet.getInt(4);
		this.hiredate = resultSet.getString(5);
		this.sal = resultSet.getInt(6);
		this.comm = resultSet.getInt(7);
		this.deptno = resultSet.getInt(8);
	}
}
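
The field order in write(DataOutput)/readFields(DataInput) must match, and the column order in readFields(ResultSet)/write(PreparedStatement) must match the column order of the emp table. As an optional sanity check (not part of the original steps), the Writable serialization can be round-tripped through in-memory streams:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class EmployeeDBWritableCheck {
	public static void main(String[] args) throws Exception {
		EmployeeDBWritable in = new EmployeeDBWritable();
		in.setEmpno(7499);
		in.setEname("ALLEN");
		in.setJob("SALESMAN");
		in.setMgr(7698);
		in.setHiredate("1981/2/20");
		in.setSal(1600);
		in.setComm(300);
		in.setDeptno(30);

		// Serialize, then deserialize into a fresh object
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		in.write(new DataOutputStream(bytes));
		EmployeeDBWritable out = new EmployeeDBWritable();
		out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		// Should print the same values that were set above
		System.out.println(out);
	}
}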

Write the Mapper class EmpTableMapper. With DBInputFormat, the input key is a LongWritable record index and the input value is the entity object populated from the current row:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class EmpTableMapper extends Mapper<LongWritable,EmployeeDBWritable, IntWritable,IntWritable> {

	IntWritable key2 = new IntWritable();
	IntWritable value2 = new IntWritable();

	@Override
	protected void map(LongWritable key, EmployeeDBWritable value, Context context) throws IOException, InterruptedException {
		System.out.println("offset:" + key + "=>" + value.toString());
		key2.set(value.getDeptno());
		value2.set(value.getSal());
		context.write(key2,value2);
	}
}

Write the Reducer class EmpTableReducer:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class EmpTableReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {

	@Override
	protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable v: values) {
			sum += v.get();
		}
		context.write(key,new IntWritable(sum));
	}
}

Write the Driver class EmpTableJob:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmpTableJob {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJobName("EmpDBJob");
		job.setJarByClass(EmpTableJob.class);
		
		job.setMapperClass(EmpTableMapper.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setReducerClass(EmpTableReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		
		// Configure the input: reading from a database requires the JDBC driver class, URL, user name and password
		/**
		* Sets the DB access related fields in the {@link Configuration}.
		* @param conf the configuration
		* @param driverClass JDBC Driver class name
		* @param dbUrl JDBC DB access URL.
		* @param userName DB access username
		* @param passwd DB access passwd
		*/
		
		String driverClass = "com.mysql.jdbc.Driver";
		String dbUrl = "jdbc:mysql://localhost:3306/hadoop";
		String userName = "root";
		String passwd = "123456";
		
		DBConfiguration.configureDB(job.getConfiguration(),driverClass,dbUrl,userName,passwd);
		
		DBInputFormat.setInput(job,EmployeeDBWritable.class,"select * from emp",
		"select count(*) from emp");
		
		// Set the job output path
		FileOutputFormat.setOutputPath(job,new Path("F:\\NIIT\\hadoopOnWindow\\output\\emp"));
		
		// Submit the job and wait for completion
		System.out.println(job.waitForCompletion(true));
	}
}
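
For reference, DBInputFormat also provides a table-based setInput overload that takes the table name, an optional WHERE condition, an ORDER BY column and the field names instead of hand-written queries. The query-based call above could equivalently be written as:

// Alternative, table-based configuration (equivalent for this case)
DBInputFormat.setInput(job, EmployeeDBWritable.class,
		"emp",       // table name
		null,        // WHERE conditions (none)
		"EMPNO",     // ORDER BY column
		"EMPNO", "ENAME", "JOB", "MGR", "HIREDATE", "SAL", "COMM", "DEPTNO");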

Run the code locally and check that the result is correct.
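
For the sample data above, the per-department salary sums, worked out by hand from the INSERT statements as a manual check, are 8750 for department 10, 18075 for department 20 and 17950 for department 30, so the part-r-00000 output file should look roughly like this:

10	8750
20	18075
30	17950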

5. Package and upload to the cluster to run (for reference only; adjust as needed)


Once the local test results are correct, modify the driver class so the output path is read from the command-line arguments:

FileOutputFormat.setOutputPath(job, new Path(args[0]));

Also update the database connection information in the job so it points at a MySQL server that the cluster nodes can reach (see the sketch below).
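
A hypothetical example of that change, with <mysql-host> standing in for a host name every cluster node can resolve (keep whatever user name and password your server requires):

// Hypothetical: replace localhost with a MySQL host reachable from all cluster nodes
String dbUrl = "jdbc:mysql://<mysql-host>:3306/hadoop";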

To package the program into a jar, configure the packaging plug-in in pom.xml:

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <!-- Use Maven's pre-configured descriptor -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- Bind to the package lifecycle phase -->
                        <phase>package</phase>
                        <goals>
                            <!-- Run only once -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
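
With this plug-in in place, building is just a matter of running the Maven package phase; the assembly plug-in should then produce an additional artifact under target/ whose name ends in jar-with-dependencies.jar:

mvn clean package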


Submit the job to the cluster by executing the following command:

hadoop jar packagedemo-1.0-SNAPSHOT.jar com.niit.mr.EmpTableJob /output/emp/

At this point, all the steps are complete. Give it a try, and good luck~