(一)什么是Flume
Flume是由Cloudera软件公司提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;同时flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为apache top项目之一。
apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务。flume具有高可用,分布式,配置工具,其设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到HDFS,HBase等集中存储器中。
(二)安装
将安装包上传到Linux,解压(我解压地址为: /home/lanting/flume),
向环境变量中添加:1
2
3
4
5
6
7
8
9
10
11
12 打开环境变量配置文件
vi /etc/profile
添加内容如下
export FLUME_HOME=/home/lanting/flume
export PATH=$PATH:$FLUME_HOME/bin
重新加载配置文件
source /etc/profile
查看版本信息,检查是否安装成功
flume-ng version
(三)具体案例
案例一:
NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。1
2
3
4
5
6#flume官网中NetCat Source描述:
属性名 默认值 描述
channels –
type – The component type name, needs to be netcat
bind – 日志需要发送到的主机名或者Ip地址,该主机运行着netcat类型的source在监听
port – 日志需要发送到的端口号,该端口号要有netcat类型的source在监听
配置文件为:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17# 指定Agent的组件名称(a),一个进程
a.sources=r1
a.channels=c1
a.sinks=k1
a.sources.r1.type=netcat
a.sources.r1.bind=localhost
a.sources.r1.port=8888
a.sources.r1.channels=c1
#指定下沉的类型为内存
a.channels.c1.type=memory
a.channels.c1.capacity=1000 #一个event记为1
a.channels.c1.transactionCapacity=1000 #一个event记为1
a.sinks.k1.channel=c1
a.sinks.k1.type=logger #下沉到logger中去
启动flume agent a服务端1
2
3
4
5
6
7 其中netcat.conf 为我们上一步创建的配置文件
flume-ng agent -n a -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console
-Dflume.root.logger=DEBUG,console 设置控制台打印
客户端可以使用telnet连接
telnet master 8888 2334/hello/1232
案例二:
NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink:hdfs Channel:file (相比于案例1的两个变化)
配置文件为:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30#agent组件的名字
a.sources = r1
a.sinks = k1
a.channels = c1
# 配置source
a.sources.r1.type = netcat
a.sources.r1.bind = localhost
a.sources.r1.port = 8888
# 指定下沉的类型为hdfs
a.sinks.k1.type = hdfs
#指定hdfs地址中的输出目录
a.sinks.k1.hdfs.path = hdfs://master:9000/output
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.rollInterval = 10
a.sinks.k1.hdfs.rollSize = 0
a.sinks.k1.hdfs.rollCount = 0
a.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a.sinks.k1.hdfs.useLocalTimeStamp = true
# channel中的事件缓冲到文件中
a.channels.c1.type = file
a.channels.c1.checkpointDir = /usr/flume/checkpoint
a.channels.c1.dataDirs = /usr/flume/data
# 将source和sinks绑定到channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
启动flume agent a 服务端:1
2
3
4
5
6# flume-hdfs-test01.properties为配置文件
flume-ng agent -c conf -f flume-hdfs-test01.properties -name a -Dflume.root.logger=INFO,console
#输入测试数据,如:123
#在hdfs的output目录中可以看到目录中多出一个以时间戳命名的文件,文件中写入了你的测试数据(123)
案例三:
Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。其中 Sink:logger Channel:memory
flume官网中Spooling Directory Source描述:
属性名 | 默认值 | 描述 |
---|---|---|
channels | – | |
type | – | 组件的类型名–> spooldir |
spoolDir | – | Spooling Directory Source监听的目录 |
fileSuffix | .COMPLETED | 文件内容写入到channel之后,标记该文件 |
deletePolicy | never | 文件内容写入到channel之后的删除策略: never or immediate |
fileHeader | false | Whether to add a header storing the absolute path filename. |
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip) |
interceptors | – | 指定传输中event的head(头信息),常用timestamp |
Spooling Directory Source的两个注意事项:
1)拷贝到spool目录下的文件不可以再打开编辑
2)不能将具有相同文件名字的文件拷贝到这个目录下
配置文件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datainput
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume agent a1 服务端1
2#flume-nect-test02.properties为配置文件
flume-ng agent -c conf -f flume-nect-test02.properties -name a1 -Dflume.root.logger=INFO,console
案例四
Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。 其中 Sink:hdfs Channel:file (相比于案例三的两个变化)
配置文件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32# Name the components on this agent
a.sources = r1
a.sinks = k1
a.channels = c1
# Describe/configure the source
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /usr/local/datainput
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
# Describe the sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = hdfs://master:9000/output
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.rollInterval = 10
a.sinks.k1.hdfs.rollSize = 0
a.sinks.k1.hdfs.rollCount = 0
a.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in file
a.channels.c1.type = file
a.channels.c1.checkpointDir = /usr/flume/checkpoint
a.channels.c1.dataDirs = /usr/flume/data
# Bind the source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
启动1
2# flume-spooldir-test01.properties 为配置文件
flume-ng agent -c conf -f flume-spooldir-test01.properties -name a -Dflume.root.logger=INFO,console
案例五:
接收json格式数据
配置文件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27c.sources=r1 r2
c.channels=c1
c.sinks=s1
c.sources.r1.type=spooldir
c.sources.r1.spoolDir=flume
c.sources.r2.type = http
c.sources.r2.port = 8888
c.source.r2.bind = 192.168.13.100
c.sources.r2.channels = c1
c.channels.c1.type=memory
c.channels.c1.capacity=1000
c.channels.c1.transactionCapacity=100
c.sinks.s1.type=hdfs
c.sinks.s1.hdfs.path=/flume/%y-%m-%d
c.sinks.s1.hdfs.rollInterval=0
c.sinks.s1.hdfs.writeFormat=Text
c.sinks.s1.hdfs.fileType=DataStream
c.sinks.s1.hdfs.rollCount=0
c.sinks.s1.hdfs.rollSize=10485760
c.sinks.s1.hdfs.useLocalTimeStamp=true
c.sources.r1.channels=c1
c.sinks.s1.channel=c1
启动agent1
flume-ng agent -c conf -f flume-http-test01.properties -name c -Dflume.root.logger=INFO,console