1
2
3
4
|
#!/bin/sh filename=$ 1
hostname=`hostname -s` tail -F $ 1 | awk -v filename=$filename -v hostname=$hostname '{print filename":"hostname":"$0}'
|
1
2
3
4
5
6
7
8
|
xxxx.sources.kafka1.interceptors = i1 xxxx.sources.kafka1.interceptors.i1.type = regex_extractor xxxx.sources.kafka1.interceptors.i1.regex = /apps/logs/(.*?)/ xxxx.sources.kafka1.interceptors.i1.serializers = s1 xxxx.sources.kafka1.interceptors.i1.serializers.s1.name = logtypename xxxx.sources.kafka1.selector.type = multiplexing xxxx.sources.kafka1.selector.header = logtypename xxxx.sources.kafka1.selector.mapping.nginx = nginx-channel |
1
2
3
4
5
6
|
serializers 定义匹配组(正则匹配之后的值作为header的值,比如如果 Event body为 1 : 2 : 3 .4foobar5,regex为(\\d):(\\d):(\\d),serializers
设置为a b c,serializers.a.name 为one,serializers.b.name为two,serializers.c.name 为three,那么one-> 1 ,two-> 2 ,three-> 3 .4foobar5,注意可以不必匹配所有的组)
serializers.x.name 作为event的header |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
private List<NameAndSerializer> serializerList;
private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
.... private void configureSerializers(Context context) {
String serializerListStr = context.getString( SERIALIZERS ); //解析serializers的配置
Preconditions. checkArgument(!StringUtils. isEmpty(serializerListStr),
"Must supply at least one name and serializer" );
String[] serializerNames = serializerListStr.split( "\\s+" ); //按空格分隔
Context serializerContexts =
new Context(context.getSubProperties( SERIALIZERS + "." ));
serializerList = Lists. newArrayListWithCapacity(serializerNames.length);
for (String serializerName : serializerNames) { //对每一个serializers里面的设置进行操作
Context serializerContext = new Context(
serializerContexts.getSubProperties(serializerName + "." ));
String type = serializerContext.getString( "type" , "DEFAULT" ); //获取serializers.x.type的设置,默认值是DEFAULT,即org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
String name = serializerContext.getString( "name" ); ////获取serializers.x.name的设置
Preconditions. checkArgument(!StringUtils. isEmpty(name),
"Supplied name cannot be empty." );
if ( "DEFAULT" .equals(type)) {
serializerList .add( new NameAndSerializer(name, defaultSerializer)); //生成NameAndSerializer对象,并加入到List<NameAndSerializer>中,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象
} else {
serializerList .add( new NameAndSerializer(name, getCustomSerializer(
type, serializerContext))); //getCustomSerializer用于根据type的设置返回RegexExtractorInterceptorSerializer对象
}
}
}
|
1
2
3
4
|
org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer //直接返回,不做另外的操作(默认的类) org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer //使用指定的formatting pattern把传入的值转换为milliseconds |
1
|
return new RegexExtractorInterceptor( regex , serializerList );
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
static final String REGEX = "regex" ;
static final String SERIALIZERS = "serializers" ;
... public Event intercept(Event event) {
Matcher matcher = regex.matcher(
new String(event.getBody(), Charsets.UTF_8)); //对Event的body进行matcher操作
Map<String, String> headers = event.getHeaders(); // 获取Event的header键值对
if (matcher.find()) { //检测字符串中的子字符串是否可以匹配到正则
for ( int group = 0 , count = matcher.groupCount(); group < count; group++) {
int groupIndex = group + 1 ; // 匹配的index从1开始
if (groupIndex > serializers .size()) { //判断index是否大于serializers列表(configure产生的List<NameAndSerializer>)的长度
.... break ;
}
NameAndSerializer serializer = serializers.get(group); //从serializers中获取对应的NameAndSerializer 对象
.... headers.put(serializer. headerName,
serializer. serializer.serialize(matcher.group(groupIndex))); // 向Event中插入headerName和对应的value,这里headerName即为serializers.x.name的设置,value会通过RegexExtractorInterceptorSerializer进行处理
}
}
return event;
}
|
相关推荐
大数据采集技术与应用
大数据采集技术与应用
该项目为可缓存的 Flume 拦截器提供了一个框架。 它是使用提供缓存服务的 Spring Framework 4.1.5 版实现的。 要实现自己的拦截器,请克隆此项目并实现 CacheableInterceptor 和 FlumeCacheService 的缺失部分。 ...
flume自定义拦截器学习
电商数仓项目(八) Flume(2) 拦截器开发源代码
flume拦截器 保留binlog es、data、database、table、type字段 分区字段名称: eventDate 放入 /opt/cloudera/parcels/CDH/lib/flume-ng/lib目录重启flume即可
利用java实现了flume的自定义拦截器,实现过滤数据
Flume进阶-自定义拦截器jar包
29-Flume自定义拦截器-多路复用选择器介绍).avi 30-Flume自定义拦截器-编码.avi 31-Flume自定义拦截器-打包&配置信息.avi 32-Flume自定义拦截器-案例测试.avi 35-Flume自定义Source-打包测试.avi 38-Flume-事务源码...
大数据 Flume 框架高频面试题 1、Flume组成,Put事务,Take...2、Flume拦截器 3、Flume采集数据会丢失吗?(防止数据丢失的机制) 4、Flume 内存 5、FileChannel优化 6、Flume Channel容量 7、HDFS Sink小文件处理 等等
Flume拦截器根据事件标头中配置的passedTime检查时间戳字段有选择地过滤事件。 这支持基于包含或排除的过滤。 入门 克隆存储库 构建源 $ mvn clean package 创建拦截器目录并部署 $ mkdir -p /usr/lib/flume-ng/...
Flume拦截器、channel选择器、sink 处理器回顾 Flume实现日志采集到HDFS并自动分区 定时调度Shell脚本实现日志数据分区上传HDFS ETL实现思路分析 ETL中Driver类的实现 ETL中自定义Key的实现 ETL日志解析类的代码实现...
文章《利用Flume拦截器(interceptors)实现Kafka Sink的自定义规则多分区写入》所需的代码和jar包。这里吐槽一下CSDN,上传个文件还必须填这填那的,不填还不行,关键词长度还超出了限制,烦不烦人啊。。。。。。。...
视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。...章节八:拦截器 章节九:Channel选择器 章节十:Sink处理器 章节十一:导入数据到HDFS 章节十二:Flume SDK 章节十三:Flume监控
flume自定义拦截器,来自每个source的数据,每个event都是一个事件,把几个事件合并为一个事件,然后把数据传送给sink进行处理,当然sink也可以自定义处理。
之后,讲解拦截器、Channel选择器、Sink 组和Sink 处理器等内容,它们为Flume 提供灵活的扩展支持。最后,介绍了Flume 的高级使用,如何使用Flume 软件开发工具集(SDK)和Embedded Agent API,如何设计、部署和监控...
文章Hadoop_16_flume中自定义拦截器的jar包,可以用来测试使用。 在数据采集之后,通过flume的拦截器,实现不需要的数据过滤掉,并将指定的第一个字段进行加密,加密之后再往hdfs上面保存。
Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。 拦截器需要实现org.apache.flume.interceptor.Interceptor接口。 拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。 ...
及Source端增加的一些选择器、拦截器:多路复用/路由器Selector、timestamp拦截器、Serach And Replace Interceptor、Regex Filter Interceptor、Regex Extractor Interceptor、Host Interceptor、Static ...
flume-0621-0.0.1-SNAPSHOT-jar-with-dependencies.jar