###Install Java (omitted)
###**Install Flume**
Extract and rename the archive:
```
tar -zxvf apache-flume-1.6.0-bin.tar.gz
mv apache-flume-1.6.0-bin flume
```
Edit the flume-env.sh configuration file; the main change is setting the JAVA_HOME variable:
```
cd flume/conf/
cp flume-env.sh.template flume-env.sh
vi flume-env.sh
```
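The key setting in flume-env.sh is JAVA_HOME; a minimal sketch (the JDK path below is an assumption, substitute your own):
```
# Point Flume at your JDK installation (example path; adjust to your system)
export JAVA_HOME=/usr/java/jdk1.7.0_79
```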
Verify the installation:
```
../bin/flume-ng version
```
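If the installation succeeded, the command prints a version banner roughly like the following (revision and build details vary by build):
```
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
...
```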
###**Case 1: Avro**
The avro-client tool can send a given file to Flume; the Avro source receives it via the Avro RPC mechanism.
1) Create the agent configuration file `/liguodong/install/flume/conf/avro.conf`
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
2) Start Flume agent a1
```
# -c specifies the conf directory
# -f specifies the configuration file to use
# -n specifies the agent name
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
```
3) Create the file to send
```
echo "hello world" > /liguodong/install/flume/tmp/test.log
```
4) Send the file with avro-client
```
# -H is the host name
# -p is the port number
# -F is the log file to send
/liguodong/install/flume/bin/flume-ng avro-client -c /liguodong/install/flume/conf -H 0.0.0.0 -p 4141 -F /liguodong/install/flume/tmp/test.log
```
If the console prints log output like the following, the agent started successfully and received the event.
```
2016-02-18 02:34:37,198 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:228)] Starting Avro source r1: { bindAddress: 0.0.0.0, port: 4141 }...
2016-02-18 02:34:37,462 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2016-02-18 02:34:37,462 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
2016-02-18 02:34:37,463 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source r1 started.
2016-02-18 02:57:04,314 (New I/O server boss #1 ([id: 0x8ee6cf4c, /0:0:0:0:0:0:0:0:4141])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 => /127.0.0.1:4141] OPEN
2016-02-18 02:57:04,316 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 => /127.0.0.1:4141] BOUND: /127.0.0.1:4141
2016-02-18 02:57:04,316 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 => /127.0.0.1:4141] CONNECTED: /127.0.0.1:39530
2016-02-18 02:57:04,658 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 :> /127.0.0.1:4141] DISCONNECTED
2016-02-18 02:57:04,658 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 :> /127.0.0.1:4141] UNBOUND
2016-02-18 02:57:04,659 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 :> /127.0.0.1:4141] CLOSED
2016-02-18 02:57:04,659 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /127.0.0.1:39530 disconnected.
2016-02-18 02:57:07,256 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
```
###**Case 2: Spool**
The spooling directory source watches a configured directory for new files and reads the data out of them. Two caveats:
1) A file copied into the spool directory must not be opened and edited afterwards (see the sketch after this list).
2) The spool directory must not contain subdirectories.
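One common way to satisfy the first caveat is to write the file under a temporary name outside the watched directory and then move it in; a minimal sketch, assuming the spoolDir from the config below:
```
# Write under a temporary name outside the watched directory ...
echo "spool test" > /liguodong/install/flume/spool_text.log.tmp
# ... then move it in; a rename within the same filesystem is atomic
mv /liguodong/install/flume/spool_text.log.tmp /liguodong/install/flume/logs/spool_text.log
```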
a) Create the agent configuration file spool.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /liguodong/install/flume/logs
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start Flume agent a1
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf/ -f /liguodong/install/flume/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Drop a file into the /liguodong/install/flume/logs directory (the spoolDir configured above)
```
echo "spool test" > /liguodong/install/flume/logs/spool_text.log
```
d) The console shows the following:
```
2016-02-18 03:53:43,629 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2016-02-18 03:53:43,630 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /liguodong/install/flume/logs/spool_text.log to /liguodong/install/flume/logs/spool_text.log.COMPLETED
2016-02-18 03:53:47,615 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/liguodong/install/flume/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 spool test }
```
###**Case 3: Exec**
The exec source runs a given command and turns its output into events. To see output with the tail command, make sure enough data is written to the file.
a) Create the agent configuration file exec_tail.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /liguodong/install/flume/log_exec_tail
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
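Note that the exec source gives no delivery guarantee: if the agent dies, data already read by tail can be lost. A minimal sketch of the restart-related options (these are standard exec source properties; the values are examples):
```
# Restart the command if it exits (default: false)
a1.sources.r1.restart = true
# Wait this long (ms) before restarting it (default: 10000)
a1.sources.r1.restartThrottle = 10000
# Also forward the command's stderr as events (default: false)
a1.sources.r1.logStdErr = true
```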
b) Start Flume agent a1
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Generate enough content in the file
```
for i in {1..100};do echo "exec tail $i" >> /liguodong/install/flume/log_exec_tail;echo $i;sleep 0.1;done
```
d) The console shows the following:
```
2016-02-18 04:16:27,689 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 31 exec tail 1 }
2016-02-18 04:16:27,821 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 32 exec tail 2 }
2016-02-18 04:16:27,821 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 33 exec tail 3 }
2016-02-18 04:16:27,822 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 34 exec tail 4 }
2016-02-18 04:16:27,822 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 35 exec tail 5 }
...
...
...
2016-02-18 04:16:38,758 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 39 38 exec tail 98 }
2016-02-18 04:16:38,758 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 39 39 exec tail 99 }
2016-02-18 04:16:38,758 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 31 30 30 exec tail 100 }
```
###**Case 4: Syslogtcp**
The syslogtcp source listens on a TCP port and turns incoming syslog messages into events.
a) Create the agent configuration file syslog_tcp.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start Flume agent a1
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Generate a test syslog message
```
echo "hello idoall.org syslog" | nc localhost 5140
```
d) The console shows the following:
```
2016-02-18 04:32:01,782 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SyslogTcpSource.start(SyslogTcpSource.java:119)] Syslog TCP Source starting...
2016-02-18 04:32:13,467 (New I/O worker #1) [WARN - org.apache.flume.source.SyslogUtils.buildEvent(SyslogUtils.java:316)] Event created from Invalid Syslog data.
2016-02-18 04:32:13,473 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
```
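The "Event created from Invalid Syslog data" warning appears because the plain text sent by nc carries no syslog priority header, so Flume keeps the raw body and flags the event invalid. A minimal sketch of a message with an RFC 3164-style <priority> prefix, which should let the Facility and Severity headers parse properly (the value 13 is just an example):
```
echo "<13>hello idoall.org syslog" | nc localhost 5140
```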
###**Case 5: JSONHandler**
The HTTP source accepts events over HTTP POST; its default JSONHandler expects a JSON array of events, each with a `headers` map and a `body` string.
a) Create the agent configuration file post_json.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start Flume agent a1
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Send a JSON-formatted POST request
```
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888
```
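Because the handler takes a JSON array, several events can be batched into a single POST; a minimal sketch:
```
curl -X POST -d '[{"headers":{"a":"a1"},"body":"event one"},{"headers":{"a":"a2"},"body":"event two"}]' http://localhost:8888
```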
d) The console shows the following:
```
2016-02-18 04:39:05,324 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2016-02-18 04:39:05,361 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] jetty-6.1.26
2016-02-18 04:39:05,427 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Started SelectChannelConnector@0.0.0.0:8888
2016-02-18 04:39:05,428 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2016-02-18 04:39:05,428 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
2016-02-18 04:39:43,317 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{b=b1, a=a1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79 idoall.org_body }
```
###**Case 6: File Roll Sink**
The file_roll sink writes the received events to files on the local filesystem.
a) Create the agent configuration file file_roll.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5555
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /liguodong/install/flume/logs
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start Flume agent a1
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Generate test log messages
```
echo "hello idoall.org syslog" | nc localhost 5555
echo "hello idoall.org syslog 2" | nc localhost 5555
```
d) Check /liguodong/install/flume/logs for generated files; by default a new file is rolled every 30 seconds
```
[root@localhost logs]# ll
total 16
-rw-r--r--. 1 root root 24 Feb 18 05:24 1455791059065-1
-rw-r--r--. 1 root root 26 Feb 18 05:25 1455791059065-2
-rw-r--r--. 1 root root 0 Feb 18 05:25 1455791059065-3
-rw-r--r--. 1 root root 0 Feb 18 05:25 1455791059065-4
-rw-r--r--. 1 root root 2128 Feb 18 02:50 flume.log.COMPLETED
-rw-r--r--. 1 root root 11 Feb 18 03:53 spool_text.log.COMPLETED
[root@localhost logs]# cat 1455791059065-1
hello idoall.org syslog
[root@localhost logs]# cat 1455791059065-2
hello idoall.org syslog 2
[root@localhost logs]# cat 1455791059065-3
[root@localhost logs]# cat 1455791059065-4
```
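The 30-second roll is what produces the empty files above whenever no events arrive during an interval. A minimal sketch of tuning it via the sink's rollInterval property (0 disables time-based rolling):
```
# Roll a new file every 60 seconds instead of the default 30;
# setting this to 0 disables time-based rolling entirely
a1.sinks.k1.sink.rollInterval = 60
```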
###**Case 7: Hadoop Sink**
a) Create the agent configuration file hdfs_sink.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/syslogtcp
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
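In this config the round/roundValue/roundUnit settings have no visible effect, because the path contains no time escape sequences to round. A minimal sketch of a time-bucketed path where the 10-minute rounding applies (hdfs.useLocalTimeStamp makes the sink use its own clock rather than a timestamp event header):
```
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/syslogtcp/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```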
b) Start Flume agent a1
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Generate a test syslog message
```
echo "hello idoall flume -> hadoop testing one" | nc localhost 5140
```
d) The console shows the following:
```
2016-02-19 16:01:54,635 (New I/O worker #1) [WARN - org.apache.flume.source.SyslogUtils.buildEvent(SyslogUtils.java:316)] Event created from Invalid Syslog data.
2016-02-19 16:01:54,653 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSSequenceFile.configure(HDFSSequenceFile.java:63)] writeFormat = Writable, UseRawLocalFileSystem = false
2016-02-19 16:01:55,050 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://localhost:9000/flume/syslogtcp/Syslog.1455868914645.tmp
2016-02-19 16:01:55,424 (hdfs-k1-call-runner-0) [WARN - org.apache.hadoop.util.NativeCodeLoader.(NativeCodeLoader.java:62)] Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-02-19 16:02:28,181 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:363)] Closing hdfs://localhost:9000/flume/syslogtcp/Syslog.1455868914645.tmp
2016-02-19 16:02:28,250 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:629)] Renaming hdfs://localhost:9000/flume/syslogtcp/Syslog.1455868914645.tmp to hdfs://localhost:9000/flume/syslogtcp/Syslog.1455868914645
2016-02-19 16:02:28,269 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:394)] Writer callback called.
```
e) Open another window on localhost and check on HDFS whether the file was created.
```
bin/hdfs dfs -ls /flume/s*
Found 1 items
-rw-r--r-- 1 liguodong supergroup 155 2016-02-19 16:02 /flume/syslogtcp/Syslog.1455868914645
bin/hdfs dfs -cat /flume/s*/Sys*
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritableJ眑�!"�x;(�
R����(hello idoall flume -> hadoop testing one
```
###**Case 8: Replicating Channel Selector**
Flume supports fanning out a flow from one source to multiple channels. There are two fan-out modes: replicating and multiplexing.
In replicating mode, an event is sent to all of the configured channels.
In multiplexing mode, an event is sent to only a subset of the channels.
Fan-out requires specifying the source's channels and the fan-out rules.
This case uses two machines, m1 and m2:
(
m1 --> liguodong@192.168.101.10 ubuntu
m2 --> root@192.168.101.56 hadoop
)
a) On m1, create the configuration file replicating_Channel_Selector.conf
```
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = ubuntu
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = hadoop
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
```
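As a side note, the replicating selector can mark some channels as optional; a minimal sketch (assuming you can tolerate losing events when c2 is down):
```
# Writes to optional channels are best-effort: a failure to put the
# event into c2 will not fail the source's transaction
a1.sources.r1.selector.optional = c2
```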
b) On m1, create the configuration file replicating_Channel_Selector_avro.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
c) Copy both configuration files from m1 to m2
```
scp -r /liguodong/install/flume/conf/replicating_Channel_Selector.conf root@192.168.101.56:/liguodong/install/flume/conf/replicating_Channel_Selector.conf
scp -r /liguodong/install/flume/conf/replicating_Channel_Selector_avro.conf root@192.168.101.56:/liguodong/install/flume/conf/replicating_Channel_Selector_avro.conf
```
d) Open four windows and start the two Flume agents on both m1 and m2
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Then, on either m1 or m2, generate a test syslog message
```
echo "hello idoall.org syslog" | nc localhost 5140
```
f) The sink windows on both m1 and m2 show the following, confirming the event was replicated:
```
2016-02-22 03:15:35,741 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
```
###**Case 9: Multiplexing Channel Selector**
(
m1 --> liguodong@192.168.101.10 ubuntu
m2 --> root@192.168.101.56 hadoop
)
a) On m1, create the configuration file Multiplexing_Channel_Selector.conf
```
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# Channels mapped to different header values may overlap; the default mapping may contain any number of channels
a1.sources.r1.selector.mapping.baidu = c1
a1.sources.r1.selector.mapping.ali = c2
a1.sources.r1.selector.default = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = ubuntu
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = hadoop
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
```
b) On m1, create the configuration file Multiplexing_Channel_Selector_avro.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
c) Copy both configuration files to m2
```
scp -r /liguodong/install/flume/conf/Multiplexing_Channel_Selector.conf root@192.168.101.56:/liguodong/install/flume/conf/Multiplexing_Channel_Selector.conf
scp -r /liguodong/install/flume/conf/Multiplexing_Channel_Selector_avro.conf root@192.168.101.56:/liguodong/install/flume/conf/Multiplexing_Channel_Selector_avro.conf
```
d) Open four windows and start the two Flume agents on both m1 and m2
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf/ -f /liguodong/install/flume/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf/ -f /liguodong/install/flume/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Then, on either m1 or m2, send test events over HTTP
```
curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]' http://localhost:5140
```
f) The sink window on m1 shows the following:
```
2016-02-22 15:35:08,509 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31 idoall_TEST1 }
2016-02-22 15:35:08,510 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33 idoall_TEST3 }
```
g) The sink window on m2 shows the following:
```
2016-02-22 02:35:13,389 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32 idoall_TEST2 }
```
As you can see, events are routed to different channels according to the value of the type header; the unmapped value qq falls through to the default channel c1, which is why idoall_TEST3 appears on m1.
###**Case 10: Flume Sink Processors**
A failover sink processor keeps sending events to one sink; when that sink becomes unavailable, it automatically fails over to the next one by priority.
(
m1 --> liguodong@192.168.101.10 ubuntu
m2 --> root@192.168.101.56 hadoop
)
a) On m1, create the configuration file
/liguodong/install/flume/conf/Flume_Sink_Processors.conf
```
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# The key to configuring failover is defining a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priority: higher numbers take precedence; every sink must have a distinct priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Maximum backoff of 10 seconds; tune this faster or slower to fit your environment
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = ubuntu
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = hadoop
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
```
b) On m1, create the configuration file
/liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
c) Copy both configuration files to m2
```
scp -r /liguodong/install/flume/conf/Flume_Sink_Processors.conf root@192.168.101.56:/liguodong/install/flume/conf/Flume_Sink_Processors.conf
scp -r /liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf root@192.168.101.56:/liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf
```
d) Open four windows and start the two Flume agents on both m1 and m2
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Then, on either m1 or m2, generate a test message
```
echo "idoall.org test1 failover" | nc localhost 5140
```
f) Because m2 has the higher priority, its sink window shows the following while m1's shows nothing:
```
2016-02-22 03:46:31,369 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
```
g) Now stop the sink on m2 (Ctrl+C) and send another test message:
```
echo "idoall.org test2 failover" | nc localhost 5140
```
h) The sink window on m1 now shows both test messages:
```
2016-02-22 16:54:41,897 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
2016-02-22 16:54:41,900 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
```
i) Restart the sink on m2:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
```
j) Send two more batches of test data:
```
echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140
```
k) The sink window on m2 shows the following; because of the priorities, messages land on m2 again:
```
2016-02-22 03:59:49,781 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
2016-02-22 04:00:04,783 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
2016-02-22 04:00:04,783 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
```
###**Case 11: Load balancing Sink Processor**
Unlike the failover processor, the load balancing processor offers two selection policies: round_robin and random (see the sketch below).
With either policy, if the selected sink is unavailable, the processor automatically tries the next available sink.
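A minimal sketch of switching to the random policy; everything else in the config below stays the same:
```
# Pick a sink at random instead of cycling through them in order
a1.sinkgroups.g1.processor.selector = random
```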
(
m1 --> liguodong@192.168.101.10 ubuntu
m2 --> root@192.168.101.56 hadoop
)
a) On m1, create the configuration file
/liguodong/install/flume/conf/Load_balancing_Sink_Processors.conf
```
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# The key to configuring load balancing is defining a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = ubuntu
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
b) On m1, create the configuration file
/liguodong/install/flume/conf/Load_balancing_Sink_Processors_avro.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
c) Copy both configuration files to m2
```
scp -r /liguodong/install/flume/conf/Load_balancing_Sink_Processors.conf root@192.168.101.56:/liguodong/install/flume/conf/Load_balancing_Sink_Processors.conf
scp -r /liguodong/install/flume/conf/Load_balancing_Sink_Processors_avro.conf root@192.168.101.56:/liguodong/install/flume/conf/Load_balancing_Sink_Processors_avro.conf
```
d) Open four windows and start the two Flume agents on both m1 and m2
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Then, on either m1 or m2, generate test log lines, entering them one at a time; lines sent too quickly tend to land on a single machine
```
echo "idoall.org test1" | nc localhost 5140
echo "idoall.org test2" | nc localhost 5140
echo "idoall.org test3" | nc localhost 5140
echo "idoall.org test4" | nc localhost 5140
echo "idoall.org test5" | nc localhost 5140
```
f) The sink window on m1 shows the following:
```
2016-02-22 17:21:06,801 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
2016-02-22 17:21:15,805 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
2016-02-22 17:21:37,811 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
2016-02-22 17:21:37,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
```
g) The sink window on m2 shows the following:
```
2016-02-22 04:21:43,319 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 35 idoall.org test5 }
```
This shows the round-robin policy at work.
###**Case 12: HBase Sink**
a) Before testing, start HBase.
b) Make sure the table test_idoall_org already exists in HBase.
Enter the HBase shell with `bin/hbase shell`:
```
# List tables
list
# Create the table
create 'test_idoall_org','name'
# Insert a row
put 'test_idoall_org','10086','name:idoall','idoallvalue'
# Scan the table
scan 'test_idoall_org'
ROW                          COLUMN+CELL
 10086                       column=name:idoall, timestamp=1456213159108, value=idoallvalue
1 row(s) in 0.0620 seconds
```
c) On m1, create the configuration file
/home/liguodong/flume/hbase_simple.conf
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
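By default, RegexHbaseEventSerializer matches the entire event body and writes it into a single column named payload, which is exactly what the scan in step f) shows. A minimal sketch of customizing that mapping (serializer.regex and serializer.colNames are standard options; the pattern here is only an illustration, and backslashes must be doubled in the properties file):
```
# Split bodies of the form "key value" into two columns
a1.sinks.k1.serializer.regex = ^(\\w+)\\s+(.*)$
a1.sinks.k1.serializer.colNames = key,value
```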
d) Start the Flume agent
```
flume-ng agent -c /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/etc/flume-ng/conf.empty -f /home/liguodong/flume/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Generate a test syslog message
```
echo "hello idoall.org from flume" | nc localhost 5140
```
f) Log in to HBase again; the new row has been inserted:
```
scan 'test_idoall_org'
ROW COLUMN+CELL
10086 column=name:idoall, timestamp=1456213159108, value=idoallvalue
1456213398294-tsqNo7GB3e-0 column=name:payload, timestamp=1456213401785, value=hello idoall.org from flume
2 row(s) in 0.0540 seconds
# Exit the shell
quit
```
Reference:
`http://www.cnblogs.com/lion.net/p/3903197.html`