
A Summary of Flume Sources and Sinks, with Common Usage Scenarios


Sources

RPC-based exchange of heterogeneous stream data:
Avro Source
Thrift Source

File or directory change monitoring:
Exec Source
Spooling Directory Source
Taildir Source

Continuous consumption from MQ / queue subscriptions:
JMS Source
SSL and JMS Source
Kafka Source

Network-based data exchange:
NetCat TCP Source
NetCat UDP Source
HTTP Source
Syslog Sources (Syslog TCP Source, Multiport Syslog TCP Source, Syslog UDP Source)

Custom sources:
Custom Source

Sinks

HDFS Sink
Hive Sink
Logger Sink
Avro Sink
Thrift Sink
IRC Sink
File Roll Sink
HBaseSinks (HBaseSink, HBase2Sink, AsyncHBaseSink)
MorphlineSolrSink
ElasticSearchSink
Kite Dataset Sink
Kafka Sink
HTTP Sink
Custom Sink

Examples

1. Monitoring file changes

exec-memory-logger.properties

# Name the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

# Configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.1.103
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Start

flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/exec-memory-logger.properties --name a1 -Dflume.root.logger=INFO,console

Test

echo "asfsafsf" >> /tmp/log.txt

2. NetCat TCP listener

netcat.properties

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start

flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/netcat.properties --name a1 -Dflume.root.logger=INFO,console

Test

telnet localhost 44444
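If telnet is not installed, the same test can be done by piping a line into netcat (assuming the nc utility is available on the machine); each line sent should show up as an event in the logger sink's console output:

echo "hello flume" | nc localhost 44444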

3. Kafka read and write (read: from Kafka to the logger; write: from a file to Kafka)

read-kafka.properties and write-kafka.properties

# Name the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.batchSize = 5000
a1.sources.s1.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.servers = 192.168.1.103:9092
a1.sources.s1.kafka.topics = test1
a1.sources.s1.kafka.consumer.group.id = custom.g.id

# Bind the source to the channel
a1.sources.s1.channels = c1

# Configure the sink
a1.sinks.k1.type = logger

# Bind the sink to the channel
a1.sinks.k1.channel = c1

# Configure the channel
a1.channels.c1.type = memory

a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Exec source tailing the log file
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/kafka.log
a1.sources.s1.channels = c1

# Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address
a1.sinks.k1.brokerList = 192.168.1.103:9092
# Topic to publish to
a1.sinks.k1.topic = test1
# Serializer
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.channel = c1

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

Start

flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/read-kafka.properties --name a1 -Dflume.root.logger=INFO,console

flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/write-kafka.properties --name a1 -Dflume.root.logger=INFO,console

Test

# Create a topic for testing
bin/kafka-topics.sh --create --bootstrap-server 192.168.1.103:9092 --replication-factor 1 --partitions 1 --topic test1

# Start a producer to send test data
bin/kafka-console-producer.sh --broker-list 192.168.1.103:9092 --topic test1
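To exercise the write-kafka agent as well, append a line to the tailed file and check the topic with the console consumer (the path and broker address follow the configurations above):

# Generate data for the exec source in write-kafka.properties
echo "hello kafka sink" >> /tmp/kafka.log

# Verify that the message reached the topic
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.103:9092 --topic test1 --from-beginning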

4. Custom source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
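A custom source is a Java class implementing Flume's Source interface, and the packaged class must be on the agent's classpath before the configuration above can load it. One common way is the plugins.d layout; the jar name, plugin directory name, and properties file name below are assumptions:

# Package the custom source and make it visible to the agent
mkdir -p /usr/app/apache-flume-1.8.0-bin/plugins.d/my-source/lib
cp my-source-1.0.jar /usr/app/apache-flume-1.8.0-bin/plugins.d/my-source/lib/

# Start the agent with the custom-source configuration
flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/custom-source.properties --name a1 -Dflume.root.logger=INFO,console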

5. HDFS Sink

spooling-memory-hdfs.properties: watches a directory for changes and uploads newly created files to HDFS

# Name the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /tmp/log2
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName

# Bind the source to the channel
a1.sources.s1.channels = c1

# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
# Output file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the sink to the channel
a1.sinks.k1.channel = c1

# Configure the channel
a1.channels.c1.type = memory
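The original does not show the start step for this case. Following the pattern of the earlier examples, starting the agent and dropping a file into the spooled directory would look like this (the sample file name is an assumption):

flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/spooling-memory-hdfs.properties --name a1 -Dflume.root.logger=INFO,console

# Copy a file into the watched directory to trigger an upload
cp /tmp/access.log /tmp/log2/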

Test

hdfs dfs -ls /flume/events/19-11-21/15

6. Hive Sink
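The original post ends here. For reference, a minimal Hive Sink configuration might look like the sketch below; the metastore URI, database, table, and field names are all assumptions, and the target table must be a transactional (bucketed, ORC) Hive table:

a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://192.168.1.103:9083
a1.sinks.k1.hive.database = default
a1.sinks.k1.hive.table = flume_events
a1.sinks.k1.useLocalTimeStamp = true
# Parse each event body as delimited text and map fields to table columns
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ","
a1.sinks.k1.serializer.fieldnames = id,msg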
