Flume
2020-06-08 14:39:12 0 举报
AI智能生成
flume
作者其他创作
大纲/内容
source
type=exec
type = spooldir #监控目录
channel
type = file
type = memory
sink
type = hdfs
fileType = DataStream#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
type = avro
hostname = 192.168.200.210
port = 4141
故障转移sinkgroups
processor.type = failover
sinks = k1 k2
负载均衡
processor.type =load_balance
# 默认是round_robin,还可以选择random
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector = round_robin
#如果backoff被开启,则sink processor会屏蔽故障的sink
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.backoff = true
静态拦截器使用
interceptors = i1
interceptors.i1.type = static
## static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sinks.k1.hdfs.path=hdfs://node1:9000/source/logs/%{type}/%Y%m%d
自定义拦截器
implements Interceptor
自定义Source
继承AbstractSource类并实现Configurable和PollableSource接口
configure(Context context)
- 初始化context
- 初始化context
process()
- 从mysql表中获取数据,然后把数据封装成event对象写入到channel,该方法被一直调用
- 从mysql表中获取数据,然后把数据封装成event对象写入到channel,该方法被一直调用
stop()
- 关 闭相关资源
- 关 闭相关资源
自定义Sink
继承AbstractSink类并实现Configurable
configure(Context context)
- 初始化context
- 初始化context
start()
- 启动准备操作
- 启动准备操作
process()
- 从channel获取数据,然后解析之后,保存在mysql表中
- 从channel获取数据,然后解析之后,保存在mysql表中
stop()
- 关闭相关资源
- 关闭相关资源
启动agent
flume-ng agent -n a1 -c /opt/bigdata/apache-flume-1.8.0-bin/myconf -f /opt/bigdata/apache-flume-1.8.0-bin/myconf/file2Hdfs.conf -Dflume.root.logger=info,console
0 条评论
下一页