需求:
有一组好友数据实例如下:
B: A,C 表示B用户加了A、C为好友1
2
3
4A:B,C
B:A,C
C:A,D
D:B,C
我们通过这样的数据通过MapReduce去统计出A、B、C、D之间的共同好友数量(没有共同好友的不输出),例如统计结果如下:1
2
3
4A-B 1
A-D 2
B-C 1
B-D 1
解决思路
我们在map阶段时,可以将 用户:好友,好友,好友...
”转换成 <好友,用户>
模式,所表达的意思就是:把谁加了某些人转换成某个人被谁加过。我们在reduce阶段就可以收到加了某个人的所有用户,这些用户至少就有这么一个共同用户了,我们将所有reduce共同好友数据记录下来,就可以得到每个用户之间的共同好友数量了。
代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168/**
* 使用map,reduce获取共有好友数量,文件输入内容格式如下:<br/>
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
*/
public class CommonFriends {
public static void main(String[] args) throws Exception {
// 创建配置文件并设置配置
Configuration configuration = new Configuration();
// 获取运行job对象
Job job = Job.getInstance(configuration);
// 设置jar的位置
job.setJarByClass(CommonFriends.class);
// 设置map和reduce的执行类
job.setMapperClass(FriendsMapper.class);
job.setReducerClass(FriendsReducer.class);
// 设置map输出的k,v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置最终输出的k,v类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置job的输入源
FileInputFormat.setInputPaths(job, "F:\\mapreduce\\commonFriends\\input");
// 设置job的输出地址
FileOutputFormat.setOutputPath(job, new Path("F:\\mapreduce\\commonFriends\\output"));
// 等待执行完毕
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
class FriendsMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text friend = new Text();
private Text own = new Text();
protected void map(LongWritable key, Text value, Context context) throws Exception {
//切割出 用户 与 好友 数据
String[] str = value.toString().split(":");
String owner = str[0];// 获取用户
own.set(owner);
String[] friends = str[1].split(",");// 获取好友
//写出<好友,用户>
for (String f : friends) {
context.write(getFriend(f), own);
}
}
private Text getFriend(String f) {
friend.set(f);
return friend;
}
}
class FriendsReducer extends Reducer<Text, Text, Text, NullWritable> {
HashMap<String, FriendBean> map = new HashMap<>();// 记录共同好友信息
ArrayList<String> list = new ArrayList();
Text text = new Text();
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 清空集合
list.clear();
//添加Iterator中的数据到集合中
values.forEach(t -> list.add(t.toString()));
for (int i = 0; i < list.size() - 1; i++) {
for (int j = i + 1; j < list.size(); j++) {
String name1 = list.get(i).toString();
String name2 = list.get(j).toString();
FriendBean b = map.get(getName(name1, name2));//获取是否有好友数量记录
if (b == null) {
b = new FriendBean(name1, name2);
map.put(getName(name1, name2), b);
}
b.addNumber(1);// 共同好友数量+1
}
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
for (FriendBean b : map.values()) {
text.set(b.toString());
context.write(text, NullWritable.get());
}
}
private String getName(String a, String b) {
if (a.compareTo(b) > 0) {
return a + "-" + b;
} else {
return b + "-" + a;
}
}
}
public class FriendBean implements WritableComparable<FriendBean> {
// 判断共同好友的两个人
private String name1;
private String name2;
// 共同好友的数量(默认为0)
private int number = 0;
public FriendBean() {
}
public FriendBean(String n1, String n2) {
if (n1.compareTo(n2) > 0) {
this.name1 = n1;
this.name2 = n2;
} else {
this.name1 = n2;
this.name2 = n1;
}
}
public int getNumber() {
return number;
}
public void setNumber(int number) {
this.number = number;
}
public void addNumber(int number) {
this.number += number;
}
public String getNames() {
return name1 + "-" + name2;
}
public String toString() {
return name1 + "-" + name2 + " " + number;
}
public int compareTo(FriendBean o) {
return 0;
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(name1);
dataOutput.writeUTF(name2);
dataOutput.write(number);
}
public void readFields(DataInput dataInput) throws IOException {
name1 = dataInput.readUTF();
name2 = dataInput.readUTF();
number = dataInput.readInt();
}