(一)什么是Flume
Flume是由Cloudera软件公司提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;同时flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为apache top项目之一。
apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务。flume具有高可用,分布式,配置工具,其设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到HDFS,HBase等集中存储器中。
(二)安装
将安装包上传到Linux,解压(我解压地址为: /home/lanting/flume),
向环境变量中添加:
1 | 打开环境变量配置文件 |
(三)具体案例
案例一:
NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。
1 | #flume官网中NetCat Source描述: |
配置文件为:
1 | # 指定Agent的组件名称(a),一个进程 |
启动flume agent a服务端
1 | 其中netcat.conf 为我们上一步创建的配置文件 |
案例二:
NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink:hdfs Channel:file (相比于案例1的两个变化)
配置文件为:
1 | #agent组件的名字 |
启动flume agent a 服务端:
1 | # flume-hdfs-test01.properties为配置文件 |
案例三:
Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。其中 Sink:logger Channel:memory
flume官网中Spooling Directory Source描述:
属性名 | 默认值 | 描述 |
---|---|---|
channels | – | |
type | – | 组件的类型名–> spooldir |
spoolDir | – | Spooling Directory Source监听的目录 |
fileSuffix | .COMPLETED | 文件内容写入到channel之后,标记该文件 |
deletePolicy | never | 文件内容写入到channel之后的删除策略: never or immediate |
fileHeader | false | Whether to add a header storing the absolute path filename. |
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip) |
interceptors | – | 指定传输中event的head(头信息),常用timestamp |
Spooling Directory Source的两个注意事项:
1)拷贝到spool目录下的文件不可以再打开编辑
2)不能将具有相同文件名字的文件拷贝到这个目录下
配置文件:
1 | a1.sources = r1 |
启动flume agent a1 服务端
1 | #flume-nect-test02.properties为配置文件 |
案例四
Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。 其中 Sink:hdfs Channel:file (相比于案例三的两个变化)
配置文件:
1 | # Name the components on this agent |
启动
1 | # flume-spooldir-test01.properties 为配置文件 |
案例五:
接收json格式数据
配置文件:
1 | c.sources=r1 r2 |
启动agent
1 | flume-ng agent -c conf -f flume-http-test01.properties -name c -Dflume.root.logger=INFO,console |