需求:
有一组用户关注数据实例如下:
B: A,C 表示B用户关注了A、C。1
2
3
4A:B,C
B:A,C
C:A,D
D:B,C
我们通过这样的数据通过MapReduce去统计出互粉的用户,例如统计结果如下(A和B互粉,A和C互粉,C和D互粉):1
2
3A-B
A-C
C-D
解决思路
我们在map阶段时,可以将用户关注的每个用户都组成一个key,而value则为1,例如:A:B,C 在map阶段就组成<A-B,1>、<A-C,1>输出,其中,key必须进行排序,例如,遇到 B:A,C 时,我们在map阶段就不要输出<B-A,1>了,进行排序后,应该输出<A-B,1>,这样,在reduce时,key为 A-B 的值将会有两个,此时我们就知道A和B互粉,否则为单方面粉丝。
代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64public class FriendEachOther {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 创建配置文件并设置配置
Configuration configuration = new Configuration();
// 获取运行job对象
Job job = Job.getInstance(configuration);
// 设置jar的位置
job.setJarByClass(FriendEachOther.class);
// 设置map和reduce的执行类
job.setMapperClass(FriendEachOtherMapper.class);
job.setReducerClass(FriendEachOtherReducer.class);
// 设置map输出的k,v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出的k,v类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置job的输入源
FileInputFormat.setInputPaths(job, "F:\\mapreduce\\commonFriends\\input");
// 设置job的输出地址
FileOutputFormat.setOutputPath(job, new Path("F:\\mapreduce\\commonFriends\\output2"));
// 等待执行完毕
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
class FriendEachOtherMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text out = new Text();
private IntWritable out2 = new IntWritable(1);
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] str = value.toString().split(":");
String owner = str[0];// 用户id
for (String s : str[1].split(",")) {
out.set(getKey(owner, s));// 排序并拼接成key
context.write(out, out2);// 输出
}
}
public String getKey(String name1, String name2) {
if (name1.compareTo(name2) < 0) {
return name1 + "-" + name2;
} else {
return name2 + "-" + name1;
}
}
}
class FriendEachOtherReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 获取key对应有多少个数据
for (IntWritable i : values) {
sum += i.get();
}
// 如果有两个,表示用户互粉
if (sum == 2) {
context.write(key, NullWritable.get());
}
}
}
输入和输出示例如下:

输入示例为:1
2
3
4
5
6
7
8
9
10
11
12
13
14A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J