Summary of Flume Sources and Sinks, and Common Usage Scenarios
Sources
RPC-based heterogeneous stream data exchange
Avro Source
Thrift Source
File or directory change monitoring
Exec Source
Spooling Directory Source
Taildir Source (see the sketch after this source list)
Continuous listening to MQ or queue subscription data
JMS Source
SSL and JMS Source
Kafka Source
Network-based data exchange
NetCat TCP Source
NetCat UDP Source
HTTP Source
Syslog Sources
Syslog TCP Source
Multiport Syslog TCP Source
Syslog UDP Source
Custom sources
Custom Source
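Of the file and directory sources listed above, the cases later in this post only cover Exec and Spooling Directory, so here is a minimal Taildir Source sketch for reference; the monitored log path and the position-file location are placeholders:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
# Records the last read position of each tailed file so a restart resumes where it left off
a1.sources.r1.positionFile = /tmp/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/app.log
a1.sources.r1.channels = c1
a1.channels.c1.type = memory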
Sinks
HDFS Sink
Hive Sink
Logger Sink
Avro Sink
Thrift Sink
IRC Sink
File Roll Sink
HBaseSinks
HBaseSink
HBase2Sink
AsyncHBaseSink
MorphlineSolrSink
ElasticSearchSink
Kite Dataset Sink
Kafka Sink
HTTP Sink
Custom Sink
Case 1: Monitoring file changes (exec-memory-logger.properties)
# Name the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1
# Configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.1.103
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
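Note that the sink here is Avro, so events are forwarded to another agent on 192.168.1.103:8888 rather than printed locally. A minimal receiving agent that logs them (a hypothetical avro-memory-logger.properties run on that host) could look like this:
a2.sources = s1
a2.sinks = k1
a2.channels = c1
# Avro source listening on the port the first agent's sink sends to
a2.sources.s1.type = avro
a2.sources.s1.bind = 0.0.0.0
a2.sources.s1.port = 8888
a2.sources.s1.channels = c1
# Print received events to the console
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100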
Start
flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/exec-memory-logger.properties --name a1 -Dflume.root.logger=INFO,console
Test
echo "asfsafsf" >> /tmp/log.txt
Case 2: NetCat TCP listening (netcat.properties)
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start
flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/netcat.properties --name a1 -Dflume.root.logger=INFO,console
Test
telnet localhost 44444
Case 3: Kafka read and write (read: from Kafka to logger; write: from a file to Kafka), using read-kafka.properties and write-kafka.properties
read-kafka.properties:
# Name the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.batchSize = 5000
a1.sources.s1.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.servers = 192.168.1.103:9092
a1.sources.s1.kafka.topics = test1
a1.sources.s1.kafka.consumer.group.id = custom.g.id
# Bind the source to the channel
a1.sources.s1.channels = c1
# Configure the sink
a1.sinks.k1.type = logger
# Bind the sink to the channel
a1.sinks.k1.channel = c1
# Configure the channel
a1.channels.c1.type = memory
write-kafka.properties:
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/kafka.log
a1.sources.s1.channels = c1
# Configure the Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address
a1.sinks.k1.brokerList = 192.168.1.103:9092
# Topic to publish to
a1.sinks.k1.topic = test1
# Serializer
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
Start
flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/read-kafka.properties --name a1 -Dflume.root.logger=INFO,console
flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/write-kafka.properties --name a1 -Dflume.root.logger=INFO,console
Test
# Create the topic used for testing
bin/kafka-topics.sh --create --bootstrap-server 192.168.1.103:9092 --replication-factor 1 --partitions 1 --topic test1
# Start a producer to send test data:
bin/kafka-console-producer.sh --broker-list 192.168.1.103:9092 --topic test1
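To verify the write direction (file to Kafka), append a line to the tailed file and watch the topic with a console consumer; a sketch assuming the same broker and topic as above:
echo "hello flume" >> /tmp/kafka.log
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.103:9092 --topic test1 --from-beginning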
Case 4: Custom source
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Custom source class must be available on the agent's classpath
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
# Configure the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Configure the channel
a1.channels.c1.type = memory
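The custom source class has to be on the agent's classpath before startup. One common approach, assuming the class is packaged into a jar (the jar name here is hypothetical), is Flume's plugins.d layout:
mkdir -p /usr/app/apache-flume-1.8.0-bin/plugins.d/my-source/lib
cp my-source.jar /usr/app/apache-flume-1.8.0-bin/plugins.d/my-source/lib/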
Case 5: HDFS Sink (spooling-memory-hdfs.properties): watch a directory for changes and upload newly created files to HDFS
# Name the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /tmp/log2
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
# Bind the source to the channel
a1.sources.s1.channels = c1
# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
# Output file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the sink to the channel
a1.sinks.k1.channel = c1
# Configure the channel
a1.channels.c1.type = memory
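The agent is started the same way as the earlier cases; dropping a file into the spool directory then triggers the upload (the copied file name is only illustrative):
flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/spooling-memory-hdfs.properties --name a1 -Dflume.root.logger=INFO,console
cp /tmp/log.txt /tmp/log2/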
Test
hdfs dfs -ls /flume/events/19-11-21/15
Case 6: Hive Sink
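A minimal Hive Sink configuration sketch, following the standard Flume Hive Sink properties; the tailed file, metastore URI, database, table, and field names below are placeholders, and Hive streaming requires the target table to be a transactional, bucketed ORC table:
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Exec source tailing a local file
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/hive.log
a1.sources.s1.channels = c1
# Hive sink writing delimited text via Hive streaming
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://192.168.1.103:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ","
a1.sinks.k1.serializer.fieldnames = id,msg
a1.sinks.k1.channel = c1
# Configure the channel
a1.channels.c1.type = memory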