1、环境准备工作
首先,我们需要为window配置一个hadoop的环境,配置方式和JDK的配置方式一样,区别在于:hadoop的还需要经过本地编译的步骤(我们可以上网去下载和自己windows一样的hadoop编译后的压缩包即可),我的是windows10 x64位的版本,我的编译后的压缩包见:win10_x64_hadoop-2.6.4。
下载好压缩包后,解压到本地即可,假设我解压的目录是:E:\hadoop-2.6.4,目录结构如下:

再在环境变量中添加 HADOOP_HOME ,值为hadoop解压目录:E:\hadoop-2.6.4;再在path中添加上 %HADOOP_HOME%\bin 将bin配置到环境变量中即可,最后在命令窗口执行 hadoop version
如果没有提示 不识别的命令 表示配置成功。
2、创建工程并导入开发库
创建一个java工程,导入hadoop所需要的库,我的 hadoop2.6.4 的库可以在这里下载:hadoop2.6.4-lib
3、编写MapReduce代码
为了演示方便,我用最经典的WordCount做示例。
1)Mapper代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package com.lanting.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Stream;
/**
* Wordcount的mapper
*
* <LongWritable, Text, Text, IntWritable> 分别表示输入的key类型,输入的value类型;输出的key类型,输出的value类型
*
* 由于文本是一行行读入的,所以输入的key类型是每一行的第一个字符的偏移量,因此用LongWritable,而输入的内容就是文本,因此输入value是Text类型
*
* 由于输出的类型是 <单词,1> 因此,输出key类型是Text类型,输出value是IntWritable类型
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将每一行进行单词分割
String[] words = value.toString().split("\\PL+");
// 将每个单词写出去(不管单词相不相同,都写 <单词,1> )
Stream.of(words).parallel().forEach(s -> {
try {
context.write(new Text(s), new IntWritable(1));
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
2)Reducer的代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28package com.lanting.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Wordcount的reducer
*
* <Text, IntWritable, Text, IntWritable> 分别表示Reducer的输入K,V类型(也就是mapper的输出类型)和输出的K,V类型。
*
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//记录某个单词出现的总次数
int count = 0;
for (IntWritable i : values) {
// 统计单词出现的每一次,累加
count += i.get();
}
// 输出单词以及对应出现的总次数
context.write(key, new IntWritable(count));
}
}
3)WordCount主类的代码如下(注释了的部分可以忽略):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66package com.lanting.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 执行wordCount
*/
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 创建配置文件并设置配置
Configuration configuration = new Configuration();
// configuration.set("mapreduce.framework.name", "yarn");// yarn表示用集群,local表示本地(默认)
// // 配置yarn资源管理器节点
// configuration.set("yarn.resourcemanager.hostname", "lantingshuxu");
// //设置文件系统hdfs://lantingshuxu:9000表示用hdfs,file:///表示用本地
// configuration.set("fs.defaultFS","hdfs://lantingshuxu:9000");
// 获取运行job对象
Job job = Job.getInstance(configuration);
// 设置jar的位置(这里设置的是类名)
job.setJarByClass(WordCount.class);
// 设置map和reduce的执行类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// // 这是设置自定义的partition
// job.setPartitionerClass(MyPartitioner.class);
// // 自定义reducer的个数
// job.setNumReduceTasks(5);
// // 可以设置一个Combiner(这个实例也是继承自Reducer)
// job.setCombinerClass(WordCountReducer.class);
// 设置自定义的文件格式工具(将零碎的小文件拼起来处理)
job.setInputFormatClass(CombineTextInputFormat.class);
// 设置碎片拼接的最小文件和最大文件
CombineTextInputFormat.setMaxInputSplitSize(job, 40 * 1024);
CombineTextInputFormat.setMinInputSplitSize(job, 20 * 1024);
// 设置map输出的k,v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出的k,v类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置job的输入源【注意,这里是通过启动参数获取输入源】
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 设置job的输出地址【注意,这里是通过启动参数获取输出源】
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待执行完毕
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
4)执行WordCount类
需要注意的是,我们的代码的60行和62行是通过输入参数获取的输入源和输出源,输入源也就是我们要统计的文件夹,输出源文件夹不能创建出来(保证文件夹不存在)
假设我们在输入源中添加了几个txt文档用来统计单词,输入源文件夹为:E:\wordcount\input;输出源为:E:\wordcount\output。我的测试文档是test.txt,内容如下(也就是WordCount主类的代码):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66package com.lanting.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 执行wordCount
*/
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 创建配置文件并设置配置
Configuration configuration = new Configuration();
// configuration.set("mapreduce.framework.name", "yarn");// yarn表示用集群,local表示本地(默认)
// // 配置yarn资源管理器节点
// configuration.set("yarn.resourcemanager.hostname", "lantingshuxu");
// //设置文件系统hdfs://lantingshuxu:9000表示用hdfs,file:///表示用本地
// configuration.set("fs.defaultFS","hdfs://lantingshuxu:9000");
// 获取运行job对象
Job job = Job.getInstance(configuration);
// 设置jar的位置(这里设置的是类名)
job.setJarByClass(WordCount.class);
// 设置map和reduce的执行类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// // 这是设置自定义的partition
// job.setPartitionerClass(MyPartitioner.class);
// // 自定义reducer的个数
// job.setNumReduceTasks(5);
// // 可以设置一个Combiner(这个实例也是继承自Reducer)
// job.setCombinerClass(WordCountReducer.class);
// 设置自定义的文件格式工具(将零碎的小文件拼起来处理)
job.setInputFormatClass(CombineTextInputFormat.class);
// 设置碎片拼接的最小文件和最大文件
CombineTextInputFormat.setMaxInputSplitSize(job, 40 * 1024);
CombineTextInputFormat.setMinInputSplitSize(job, 20 * 1024);
// 设置map输出的k,v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出的k,v类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置job的输入源【注意,这里是通过启动参数获取输入源】
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 设置job的输出地址【注意,这里是通过启动参数获取输出源】
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待执行完毕
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
我们添加上启动参数E:\\wordcount\input E:\\wordcount\output
后就可以执行了【注意两个启动参数之间有空格】
执行完后我们会看到wordcount文件夹下生成了output文件夹,打开之,文件列表如下,其中,.crc 结尾的是校验码,我们忽略即可,_SUCCESS表示状态,part-r-00000才是我们的结果:

用文本文档打开 part-r-00000 内容如下:
1 | 51 |
尾巴
其实windows上运行MapReduce最核心的就是解决环境问题;
注意,reduce后的结果是经过了升序排序,并且去掉了特殊字符和标点符号等。
另外,我们也可以将此项目打个jar包(不需要连带依赖包,例如:wordcount.jar),将此jar上传到hadoop集群中进行执行,执行代码:1
2
3
4#wordcount.jar为jar包名 com.lanting.mapreduce.WordCount 为执行主类名
#/wordcount/input /wordcount/out分别表示输入路径和结果输出路径
hadoop jar wordcount.jar com.lanting.mapreduce.WordCount /wordcount/input /wordcount/out