### Install Java (omitted)

### **Install Flume**

Unpack and rename:
```
tar -zxvf apache-flume-1.6.0-bin.tar.gz
mv apache-flume-1.6.0-bin flume
```
Edit the flume-env.sh configuration file; the main change is setting the JAVA_HOME variable:
```
cd flume/conf/
cp flume-env.sh.template flume-env.sh
vi flume-env.sh
```
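Inside flume-env.sh the edit usually comes down to a single line; the JDK path below is only an assumption — point it at your actual installation:
```
# flume-env.sh -- example only; substitute your real JDK location
export JAVA_HOME=/usr/java/jdk1.7.0_79
```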
Verify the installation:
```
../bin/flume-ng version
```

### **Case 1: Avro**

Avro can send a given file to Flume; the Avro source uses the Avro RPC mechanism.

1) Create the agent configuration file `/liguodong/install/flume/conf/avro.conf`:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
2) Start flume agent a1:
```
# -c specifies the conf directory
# -f specifies the concrete conf file
# -n specifies the agent name
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
```
3) Create the file to send:
```
echo "hello world" > /liguodong/install/flume/tmp/test.log
```
4) Send the file with avro-client:
```
# -H is the host name
# -p is the port
# -F is the log file
/liguodong/install/flume/bin/flume-ng avro-client -c /liguodong/install/flume/conf -H 0.0.0.0 -p 4141 -F /liguodong/install/flume/tmp/test.log
```
Console output like the following indicates success:
```
2016-02-18 02:34:37,198 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:228)] Starting Avro source r1: { bindAddress: 0.0.0.0, port: 4141 }...
2016-02-18 02:34:37,462 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2016-02-18 02:34:37,462 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
2016-02-18 02:34:37,463 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source r1 started.
2016-02-18 02:57:04,314 (New I/O server boss #1 ([id: 0x8ee6cf4c, /0:0:0:0:0:0:0:0:4141])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 => /127.0.0.1:4141] OPEN
2016-02-18 02:57:04,316 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 => /127.0.0.1:4141] BOUND: /127.0.0.1:4141
2016-02-18 02:57:04,316 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 => /127.0.0.1:4141] CONNECTED: /127.0.0.1:39530
2016-02-18 02:57:04,658 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 :> /127.0.0.1:4141] DISCONNECTED
2016-02-18 02:57:04,658 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 :> /127.0.0.1:4141] UNBOUND
2016-02-18 02:57:04,659 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x479937e9, /127.0.0.1:39530 :> /127.0.0.1:4141] CLOSED
2016-02-18 02:57:04,659 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /127.0.0.1:39530 disconnected.
2016-02-18 02:57:07,256 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }
```
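As a quick variation on step 4, avro-client reads standard input when -F is omitted, so data can be piped straight in — a sketch reusing the same host and port:
```
echo "hello from stdin" | /liguodong/install/flume/bin/flume-ng avro-client -c /liguodong/install/flume/conf -H 0.0.0.0 -p 4141
```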
### **Case 2: Spool**

The spooling-directory source watches a configured directory for new files and reads the data out of them. Two caveats:
1) A file copied into the spool directory must not be opened and edited afterwards.
2) The spool directory must not contain subdirectories.

a) Create the agent configuration file spool.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /liguodong/install/flume/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start flume agent a1:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf/ -f /liguodong/install/flume/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Write a file into the /liguodong/install/flume/logs directory:
```
echo "spool test" > /liguodong/install/flume/logs/spool_text.log
```
d) On the console you can see the following:
```
2016-02-18 03:53:43,629 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2016-02-18 03:53:43,630 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /liguodong/install/flume/logs/spool_text.log to /liguodong/install/flume/logs/spool_text.log.COMPLETED
2016-02-18 03:53:47,615 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{file=/liguodong/install/flume/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74                   spool test }
```
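Because files must not be edited once they land in the spool directory, a common pattern is to finish writing the file elsewhere and then move it into place in one step — plain shell, nothing Flume-specific (the temp path is an example):
```
# write the file somewhere outside the spool directory first
echo "spool test 2" > /liguodong/install/flume/tmp/spool_text2.log
# mv within the same filesystem is atomic, so the source never sees a half-written file
mv /liguodong/install/flume/tmp/spool_text2.log /liguodong/install/flume/logs/spool_text2.log
```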
### **Case 3: Exec**

The exec source runs a given command and uses its output as the data source. If you use a tail command, the file has to keep growing for any output to show up.

a) Create the agent configuration file exec_tail.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /liguodong/install/flume/log_exec_tail

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start flume agent a1:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Generate enough content in the file:
```
for i in {1..100};do echo "exec tail $i" >> /liguodong/install/flume/log_exec_tail;echo $i;sleep 0.1;done
```
d) On the console you can see the following:
```
2016-02-18 04:16:27,689 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 31                exec tail 1 }
2016-02-18 04:16:27,821 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 32                exec tail 2 }
2016-02-18 04:16:27,821 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 33                exec tail 3 }
2016-02-18 04:16:27,822 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 34                exec tail 4 }
2016-02-18 04:16:27,822 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 35                exec tail 5 }
... ... ...
2016-02-18 04:16:38,758 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 39 38             exec tail 98 }
2016-02-18 04:16:38,758 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 39 39             exec tail 99 }
2016-02-18 04:16:38,758 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 20 31 30 30          exec tail 100 }
```
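Note that the exec source makes no delivery guarantees: if the agent dies, the command dies with it. As a small mitigation sketch (restart and restartThrottle are exec-source properties; the values are examples), the source can be told to re-run the command when it exits:
```
# re-run the tail command if it exits, waiting 10s between attempts
a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 10000
```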
### **Case 4: Syslogtcp**

The syslogtcp source listens on a TCP port as its data source.

a) Create the agent configuration file syslog_tcp.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start flume agent a1:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Generate a test syslog message:
```
echo "hello idoall.org syslog" | nc localhost 5140
```
d) On the console you can see the following:
```
2016-02-18 04:32:01,782 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SyslogTcpSource.start(SyslogTcpSource.java:119)] Syslog TCP Source starting...
2016-02-18 04:32:13,467 (New I/O worker #1) [WARN - org.apache.flume.source.SyslogUtils.buildEvent(SyslogUtils.java:316)] Event created from Invalid Syslog data.
2016-02-18 04:32:13,473 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
```

### **Case 5: JSONHandler**

The HTTP source accepts events POSTed to it; its default JSONHandler parses a JSON array of events.

a) Create the agent configuration file post_json.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start flume agent a1:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Generate a JSON-format POST request:
```
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888
```
d) On the console you can see the following:
```
2016-02-18 04:39:05,324 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2016-02-18 04:39:05,361 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] jetty-6.1.26
2016-02-18 04:39:05,427 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Started SelectChannelConnector@0.0.0.0:8888
2016-02-18 04:39:05,428 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2016-02-18 04:39:05,428 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
2016-02-18 04:39:43,317 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{b=b1, a=a1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79    idoall.org_body }
```
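Since the handler takes a JSON array, a single POST can carry several events — a sketch along the same lines (the header and body values here are made up):
```
curl -X POST -d '[{"headers":{"a":"a1"},"body":"first event"},{"headers":{"a":"a2"},"body":"second event"}]' http://localhost:8888
```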
### **Case 6: File Roll Sink**

a) Create the agent configuration file file_roll.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5555
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /liguodong/install/flume/logs

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start flume agent a1:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Generate test log messages:
```
echo "hello idoall.org syslog" | nc localhost 5555
echo "hello idoall.org syslog 2" | nc localhost 5555
```
d) Check whether files were created under /liguodong/install/flume/logs; by default a new file is rolled every 30 seconds:
```
[root@localhost logs]# ll
total 16
-rw-r--r--. 1 root root   24 Feb 18 05:24 1455791059065-1
-rw-r--r--. 1 root root   26 Feb 18 05:25 1455791059065-2
-rw-r--r--. 1 root root    0 Feb 18 05:25 1455791059065-3
-rw-r--r--. 1 root root    0 Feb 18 05:25 1455791059065-4
-rw-r--r--. 1 root root 2128 Feb 18 02:50 flume.log.COMPLETED
-rw-r--r--. 1 root root   11 Feb 18 03:53 spool_text.log.COMPLETED
[root@localhost logs]# cat 1455791059065-1
hello idoall.org syslog
[root@localhost logs]# cat 1455791059065-2
hello idoall.org syslog 2
[root@localhost logs]# cat 1455791059065-3
[root@localhost logs]# cat 1455791059065-4
```
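The 30-second default comes from the sink's rollInterval setting. A tuning sketch (sink.rollInterval is a file_roll sink property; the value is an example):
```
# roll a new file every 5 minutes instead of every 30 seconds;
# a value of 0 disables rolling and appends everything to a single file
a1.sinks.k1.sink.rollInterval = 300
```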
### **Case 7: Hadoop sink**

a) Create the agent configuration file hdfs_sink.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/syslogtcp
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
b) Start flume agent a1:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
```
c) Generate a test syslog message:
```
echo "hello idoall flume -> hadoop testing one" | nc localhost 5140
```
d) On the console you can see the following:
```
2016-02-19 16:01:54,635 (New I/O worker #1) [WARN - org.apache.flume.source.SyslogUtils.buildEvent(SyslogUtils.java:316)] Event created from Invalid Syslog data.
2016-02-19 16:01:54,653 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSSequenceFile.configure(HDFSSequenceFile.java:63)] writeFormat = Writable, UseRawLocalFileSystem = false
2016-02-19 16:01:55,050 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://localhost:9000/flume/syslogtcp/Syslog.1455868914645.tmp
2016-02-19 16:01:55,424 (hdfs-k1-call-runner-0) [WARN - org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)] Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-02-19 16:02:28,181 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:363)] Closing hdfs://localhost:9000/flume/syslogtcp/Syslog.1455868914645.tmp
2016-02-19 16:02:28,250 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:629)] Renaming hdfs://localhost:9000/flume/syslogtcp/Syslog.1455868914645.tmp to hdfs://localhost:9000/flume/syslogtcp/Syslog.1455868914645
2016-02-19 16:02:28,269 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:394)] Writer callback called.
```
e) Open another window on localhost and check on Hadoop whether the file was created:
```
bin/hdfs dfs -ls /flume/s*
Found 1 items
-rw-r--r--   1 liguodong supergroup        155 2016-02-19 16:02 /flume/syslogtcp/Syslog.1455868914645

bin/hdfs dfs -cat /flume/s*/Sys*
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritableJ眑�!"�x;(� R����(hello idoall flume -> hadoop testing one
```

### **Case 8: Replicating Channel Selector**

Flume supports fanning out a flow from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. In the replicating case, an event is sent to every configured channel. In the multiplexing case, an event is sent to only a subset of the eligible channels. A fan-out flow requires specifying the source and the fan-out rules for the channels.

This time we need two machines, m1 and m2:
(m1 --> liguodong@192.168.101.10, hostname `ubuntu`; m2 --> root@192.168.101.56, hostname `hadoop`)

a) On m1, create the configuration file replicating_Channel_Selector.conf:
```
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = ubuntu
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = hadoop
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
```
b) On m1, create the configuration file replicating_Channel_Selector_avro.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
c) Copy both configuration files from m1 to m2:
```
scp -r /liguodong/install/flume/conf/replicating_Channel_Selector.conf root@192.168.101.56:/liguodong/install/flume/conf/replicating_Channel_Selector.conf
scp -r /liguodong/install/flume/conf/replicating_Channel_Selector_avro.conf root@192.168.101.56:/liguodong/install/flume/conf/replicating_Channel_Selector_avro.conf
```
d) Open four windows and start two flume agents on each of m1 and m2:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Then, on either m1 or m2, generate a test syslog message:
```
echo "hello idoall.org syslog" | nc localhost 5140
```
f) In the sink windows of both m1 and m2 you can see the following, which shows the event was replicated to both machines:
```
2016-02-22 03:15:35,741 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
```
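One variation worth knowing: the replicating selector can mark channels as optional, so a failure to write to them does not fail the event (selector.optional is a documented replicating-selector option; applying it to c2 here is just a sketch):
```
a1.sources.r1.selector.type = replicating
# delivery to c2 becomes best-effort; only a c1 failure makes the event fail
a1.sources.r1.selector.optional = c2
```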
### **Case 9: Multiplexing Channel Selector**

(m1 --> liguodong@192.168.101.10, hostname `ubuntu`; m2 --> root@192.168.101.56, hostname `hadoop`)

a) On m1, create the configuration file Multiplexing_Channel_Selector.conf:
```
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# Mappings may overlap: a header value can route to more than one channel,
# and the default may list any number of channels.
a1.sources.r1.selector.mapping.baidu = c1
a1.sources.r1.selector.mapping.ali = c2
a1.sources.r1.selector.default = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = ubuntu
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = hadoop
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
```
b) On m1, create the configuration file Multiplexing_Channel_Selector_avro.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
c) Copy both configuration files to m2:
```
scp -r /liguodong/install/flume/conf/Multiplexing_Channel_Selector.conf root@192.168.101.56:/liguodong/install/flume/conf/Multiplexing_Channel_Selector.conf
scp -r /liguodong/install/flume/conf/Multiplexing_Channel_Selector_avro.conf root@192.168.101.56:/liguodong/install/flume/conf/Multiplexing_Channel_Selector_avro.conf
```
d) Open four windows and start two flume agents on each of m1 and m2:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf/ -f /liguodong/install/flume/conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf/ -f /liguodong/install/flume/conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Then, on either m1 or m2, generate test events:
```
curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]' http://localhost:5140 && curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]' http://localhost:5140
```
f) In m1's sink window you can see the following:
```
2016-02-22 15:35:08,509 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31             idoall_TEST1 }
2016-02-22 15:35:08,510 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33             idoall_TEST3 }
```
g) In m2's sink window you can see the following:
```
2016-02-22 02:35:13,389 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32             idoall_TEST2 }
```
As you can see, events are routed to different channels according to the conditions in the header.
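As a sketch of the overlap mentioned in the comment inside Multiplexing_Channel_Selector.conf above (channel names reuse this example), one header value can be mapped to several channels at once:
```
# events with type=baidu would be written to both c1 and c2
a1.sources.r1.selector.mapping.baidu = c1 c2
```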
### **Case 10: Flume Sink Processors**

With failover, events keep going to a single sink; when that sink becomes unavailable, they are automatically sent to the next sink.

(m1 --> liguodong@192.168.101.10, hostname `ubuntu`; m2 --> root@192.168.101.56, hostname `hadoop`)

a) On m1, create the configuration file /liguodong/install/flume/conf/Flume_Sink_Processors.conf:
```
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# The key to configuring failover: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priorities: the larger the number, the higher the priority; every sink must have a distinct priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Set to 10 seconds here; tune it faster or slower to match your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = ubuntu
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = hadoop
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
```
b) On m1, create the configuration file /liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
c) Copy both configuration files to m2:
```
scp -r /liguodong/install/flume/conf/Flume_Sink_Processors.conf root@192.168.101.56:/liguodong/install/flume/conf/Flume_Sink_Processors.conf
scp -r /liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf root@192.168.101.56:/liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf
```
d) Open four windows and start two flume agents on each of m1 and m2:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Then, on either m1 or m2, generate a test log message:
```
echo "idoall.org test1 failover" | nc localhost 5140
```
f) Because m2 has the higher priority, the following appears in m2's sink window, and not in m1's:
```
2016-02-22 03:46:31,369 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
```
g) Now stop the sink on m2 (Ctrl+C) and send test data again:
```
echo "idoall.org test2 failover" | nc localhost 5140
```
h) In m1's sink window you can see that both test messages have now been read:
```
2016-02-22 16:54:41,897 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
2016-02-22 16:54:41,900 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
```
i) Start the sink on m2 again:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
```
j) Send two more batches of test data:
```
echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140
```
k) In m2's sink window you can see the following; because of the priorities, log messages land on m2 again:
```
2016-02-22 03:59:49,781 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
2016-02-22 04:00:04,783 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
2016-02-22 04:00:04,783 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
```
### **Case 11: Load balancing Sink Processor**

The load_balance type differs from failover in that it offers two selection policies: round-robin and random. In either case, if the selected sink is unavailable, the processor automatically tries to send to the next available sink.

(m1 --> liguodong@192.168.101.10, hostname `ubuntu`; m2 --> root@192.168.101.56, hostname `hadoop`)

a) On m1, create the configuration file /liguodong/install/flume/conf/Load_balancing_Sink_Processors.conf:
```
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# The key to configuring load balancing: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = ubuntu
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
b) On m1, create the configuration file /liguodong/install/flume/conf/Load_balancing_Sink_Processors_avro.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
c) Copy both configuration files to m2:
```
scp -r /liguodong/install/flume/conf/Load_balancing_Sink_Processors.conf root@192.168.101.56:/liguodong/install/flume/conf/Load_balancing_Sink_Processors.conf
scp -r /liguodong/install/flume/conf/Load_balancing_Sink_Processors_avro.conf root@192.168.101.56:/liguodong/install/flume/conf/Load_balancing_Sink_Processors_avro.conf
```
d) Open four windows and start two flume agents on each of m1 and m2:
```
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
/liguodong/install/flume/bin/flume-ng agent -c /liguodong/install/flume/conf -f /liguodong/install/flume/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Then, on either m1 or m2, generate test log messages one line at a time — if you send them too quickly, they tend to all land on one machine:
```
echo "idoall.org test1" | nc localhost 5140
echo "idoall.org test2" | nc localhost 5140
echo "idoall.org test3" | nc localhost 5140
echo "idoall.org test4" | nc localhost 5140
echo "idoall.org test5" | nc localhost 5140
```
f) In m1's sink window you can see the following:
```
2016-02-22 17:21:06,801 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
2016-02-22 17:21:15,805 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
2016-02-22 17:21:37,811 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
2016-02-22 17:21:37,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
```
g) In m2's sink window you can see the following:
```
2016-02-22 04:21:43,319 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 35 idoall.org test5 }
```
This shows that round-robin selection took effect.
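For comparison, a sketch of the other selection policy (round_robin and random are the two documented selector values):
```
# pick a sink at random for each batch instead of rotating through them
a1.sinkgroups.g1.processor.selector = random
```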
### **Case 12: Hbase sink**

a) Before testing, start HBase.
b) Make sure the table test_idoall_org already exists in HBase. Enter the shell with `bin/hbase shell`:
```
# list tables
list
# create a table
create 'test_idoall_org','name'
# put a row
put 'test_idoall_org','10086','name:idoall','idoallvalue'
# scan the table
scan 'test_idoall_org'
ROW                  COLUMN+CELL
 10086               column=name:idoall, timestamp=1456213159108, value=idoallvalue
1 row(s) in 0.0620 seconds
```
c) On m1, create the configuration file /home/liguodong/flume/hbase_simple.conf:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.column = idoall
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
d) Start the flume agent:
```
flume-ng agent -c /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/etc/flume-ng/conf.empty -f /home/liguodong/flume/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console
```
e) Generate a test syslog message:
```
echo "hello idoall.org from flume" | nc localhost 5140
```
f) Log back into HBase and you can see that the new data has been inserted:
```
scan 'test_idoall_org'
ROW                         COLUMN+CELL
 10086                      column=name:idoall, timestamp=1456213159108, value=idoallvalue
 1456213398294-tsqNo7GB3e-0 column=name:payload, timestamp=1456213401785, value=hello idoall.org from flume
2 row(s) in 0.0540 seconds

# leave the shell
quit
```
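By default RegexHbaseEventSerializer writes the whole event body into a single payload column, which is why the scan above shows name:payload. As a sketch of splitting the body into separate columns instead (serializer.regex and serializer.colNames are the serializer's properties; the pattern here is a made-up example):
```
# split a body like "10086 hello" into two columns of the name family
a1.sinks.k1.serializer.regex = ^([^ ]+) (.*)$
a1.sinks.k1.serializer.colNames = id,msg
```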
Reference: `http://www.cnblogs.com/lion.net/p/3903197.html`