如何将Flume与kafka进行整合

自从Flume1.6开始,新增了对Kafka的支持,极大地提升了Flume的采集能力。避免后端因热点问题导致kafka的channel爆满而无法采集数据。
本篇介绍使用Flume当前最新版本1.8与Kafka的结合使用。

基本环境

  • Kafka (192.168.156.101:9092)
  • Zookeeper(192.168.156.101:2181)
  • JDK1.8

安装Flume

1
2
wget http://apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz

进入apache-flume-1.8.0-bin目录,在conf路径中新增配置文件flume.properties(名称随意)。

1
2
cd apache-flume-1.8.0-bin
touch conf/flume.properties

新增如下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
## 此处定义 agent 的source(数据源)、sink(数据流向)、channel(管道)
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1

## 此处定义Agent 数据源的类型 
agent1.sources.source1.type=http
agent1.sources.source1.bind=0.0.0.0
agent1.sources.source1.port=9000
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=10000
agent1.channels.channel1.transactionCapacity=100
agent1.channels.channel1.keep-alive=30

## 此处定义kafka的sink topic broker 
agent1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.topic=kafkaTest
agent1.sinks.sink1.kafka.bootstrap.servers = 192.168.156.101:9092
agent1.sinks.sink1.requiredAcks=1
agent1.sinks.sink1.kafka.producer.acks = 1
agent1.sinks.sink1.kafka.flumeBatchSize = 20
agent1.sinks.sink1.kafka.producer.linger.ms = 1
agent1.sinks.sink1.kafka.producer.compression.type = snappy
## 此处定义source的channel 和 sink的channel
agent1.sources.source1.channels=channel1
agent1.sinks.sink1.channel=channel1

启动flume

apache-flume-1.8.0-bin中执行如下命令启动flume。

1
nohup bin/flume-ng agent -f conf/flume.properties -n agent1 -c /home/cdhuser/apache-flume-1.8.0-bin/conf >/dev/null &

注意此处的-f-n-c参数:

  • -f 表示配置文件的路径
  • -n agent的名称,与配置文件中一直
  • -c 配置文件所在的路径

此时,便已经成功启动了flume,source为HTTP,端口为9000,sink为Kafka,channel默认在内存,当然也可以将channel配置为Kafka Channel。

使用Rest Client给9000端口发送数据,然后在kafka消费者端进行查看。

启动kafka消费端

1
2
3
cd /opt/kafka

bin/kafka-console-consumer.sh --zookeeper 192.168.156.101:2181 --topic kafkaTest --from-beginning

然后发送如下测试数据

1
2
3
4
5
6
7
8
9
[
{
"headers" : {
"datatype" : "test",
"timestamp" : 1456989430522
},
"body" : "123123$45645$20160223111222$10.10.170.75$01$1$2$PC"
}
]

此时在kafka消费者那一侧就可以发现如下信息:

1
123123$45645$20160223111222$10.10.170.75$01$1$2$PC

写的比较乱,有空在整理。

Cco.Xyz wechat
坚持原创技术分享,您的支持将鼓励我继续创作!
0%