2016-03-22

Sending logs to Kafka with Flume

This post covers the Flume agent configuration on the producer side: the source reads log data (the example below tails a file with an exec source, with a netcat source left commented out as an alternative) and the sink publishes to Kafka.

First, each component needs to be installed.
Installing Kafka

wget http://archive.apache.org/dist/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz
tar xvf kafka_2.11-0.8.2.2.tgz
mv kafka_2.11-0.8.2.2  /usr/local/kafka
 
vim /usr/local/kafka/config/server.properties
 
broker.id: must be unique per broker; use a number
host.name: unique; use this server's IP address
zookeeper.connect=192.168.40.134:2181,192.168.40.132:2181,192.168.40.133:2181
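For example, on the first broker these three settings might look as follows (the IPs simply reuse the addresses above; adjust them to your own hosts and give every broker a different broker.id):

broker.id=1
host.name=192.168.40.134
zookeeper.connect=192.168.40.134:2181,192.168.40.132:2181,192.168.40.133:2181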

Start the ZooKeeper service first: bin/zkServer.sh start (ZooKeeper is installed separately here; the details are not covered in this post).
Then, on every machine, start the broker: bin/kafka-server-start.sh config/server.properties
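If you want the broker to keep running after you close the shell, you can start it in the background and check that the process is up (a minimal sketch; the log path is arbitrary):

nohup bin/kafka-server-start.sh config/server.properties > /tmp/kafka-server.log 2>&1 &
jps | grep -i kafka     # the Kafka broker JVM should be listed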
Create a topic:

 ./bin/kafka-topics.sh --create --zookeeper 192.168.7.244:2181,192.168.7.232:2181,192.168.7.237:2181,192.168.7.246:2181 --replication-factor 1 --partitions 1 --topic kafkaflume

List the topics:

$ ./bin/kafka-topics.sh --list --zookeeper 192.168.7.244:2181,192.168.7.232:2181,192.168.7.237:2181,192.168.7.246:2181 
This shows: kafkaflume

Show the topic details:

$ ./bin/kafka-topics.sh  --describe --zookeeper 192.168.7.244:2181,192.168.7.232:2181,192.168.7.237:2181,192.168.7.246:2181

Send messages:

$ ./bin/kafka-console-producer.sh --broker-list 192.168.7.244:9092 --topic kafkaflume             
[2015-11-06 17:14:29,074] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
agagaga
[2015-11-06 17:15:04,318] INFO Closing socket connection to /192.168.7.244. (kafka.network.Processor)
444444
aaaaa
^C[2015-11-06 17:15:27,948] INFO Closing socket connection to /192.168.7.244. (kafka.network.Processor)

Receive messages:

$ ./bin/kafka-console-consumer.sh --zookeeper 192.168.7.244:2181,192.168.7.232:2181,192.168.7.237:2181,192.168.7.246:2181  --topic kafkaflume --from-beginning                        
[2015-11-06 17:16:17,591] INFO Closing socket connection to /192.168.7.244. (kafka.network.Processor)
agagaga
444444
aaaaa

Kafka is now installed.

Installing Flume

wget  http://mirror.bit.edu.cn/apache/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
wget  http://mirror.bit.edu.cn/apache/flume/1.6.0/apache-flume-1.6.0-src.tar.gz
 
tar zxvf apache-flume-1.6.0-bin.tar.gz
tar zxvf apache-flume-1.6.0-src.tar.gz
 
cp -ri apache-flume-1.6.0-src/* apache-flume-1.6.0-bin
 
mv apache-flume-1.6.0-bin /usr/local/flume
cd /usr/local/flume/conf

Now you can create the agent configuration files. To avoid environment-variable problems, first set up flume-env.sh:

cp flume-env.sh.template flume-env.sh
 
vi flume-env.sh
# set hadoop path
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
 
# set JDK path
export JAVA_HOME=/usr/local/jdk
export PATH=$PATH:$JAVA_HOME/bin
# set SCALA path
export SCALA_HOME=/usr/local/scala
export PATH=${SCALA_HOME}/bin:$PATH
 
#Set ZooKeeper Environment
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf
 
#Set hbase path
export HBASE_HOME=/usr/local/hbase
export PATH=$PATH:$HBASE_HOME/bin
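With the environment file in place, a quick sanity check confirms that Flume itself runs (paths assume the layout above):

cd /usr/local/flume
./bin/flume-ng version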

Create the producer agent's configuration file:

cat conf/producer1.properties
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
 
#source section
 
#producer.sources.s.type = netcat
#producer.sources.s.bind = localhost
#producer.sources.s.port = 44444
#producer.sources.s.channels = c
 
producer.sources.s.type = exec
producer.sources.s.shell = /bin/bash -c
producer.sources.s.command = tail -n +0 -F /tmp/test.log
producer.sources.s.threads = 5
producer.sources.s.channels = c
 
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=192.168.7.244:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=kafkaflume
 
#Specify the channel the sink should use
producer.sinks.r.channel = c
 
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000

This configuration tails a file with an exec source; alternatively, switch to the commented-out netcat source above to receive data on a port instead. Note that org.apache.flume.plugins.KafkaSink is not one of the sinks bundled with Flume 1.6.0; it comes from a separate Kafka sink plugin, so that plugin's jar (and its dependencies) must be on Flume's classpath, for example in /usr/local/flume/lib.
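To give the exec source something to pick up, simply append lines to the tailed file (the message text below is arbitrary):

echo "test message $(date)" >> /tmp/test.log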

Now configure the consumer agent: the source is Kafka and the sink is a logger.

cat conf/consumer1.properties 
consumer.sources = s
consumer.channels = c
consumer.sinks = r
 
consumer.sources.s.channels = c
consumer.sinks.r.type = logger
 
consumer.sinks.r.channel = c
consumer.channels.c.type = memory
consumer.channels.c.capacity = 100
 
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource
consumer.sources.s.zookeeper.connect=192.168.7.244:2181,192.168.7.232:2181,192.168.7.237:2181,192.168.7.246:2181
consumer.sources.s.group.id=testGroup
consumer.sources.s.zookeeper.session.timeout.ms=400
consumer.sources.s.zookeeper.sync.time.ms=200
consumer.sources.s.auto.commit.interval.ms=1000
consumer.sources.s.custom.topic.name=kafkaflume
consumer.sources.s.custom.thread.per.consumer=4
 
Run the two agents:
./bin/flume-ng agent --conf conf  --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console
 
./bin/flume-ng agent --conf conf  --conf-file conf/consumer1.properties   --name consumer -Dflume.root.logger=INFO,console
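To confirm that both agents actually came up, you can look for the two Flume agent JVMs (a minimal sketch):

jps -l | grep org.apache.flume.node.Application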
 
The contents of /tmp/test.log are then sent to the kafkaflume topic in Kafka. Run

./bin/kafka-console-consumer.sh --zookeeper 192.168.7.244:2181,192.168.7.232:2181,192.168.7.237:2181,192.168.7.246:2181  --topic kafkaflume --from-beginning

to see the messages.

If you have a second topic, just create another file such as conf/producer2.properties with the corresponding topic name.
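A second producer agent configuration would differ only in the topic name and, if it tails a different file, in the command; a minimal sketch, using a hypothetical second topic kafkaflume2 and log file /tmp/test2.log:

# conf/producer2.properties -- identical to producer1.properties except for:
producer.sources.s.command = tail -n +0 -F /tmp/test2.log
producer.sinks.r.custom.topic.name=kafkaflume2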

Author: saunix, a Linux operations engineer at a large Internet company, serving full-time as the firefighter.
