List.class); } public static Map?
1)Source中使用拦截器
kafka_key.sources.sources1.interceptors = i1
kafka_key.sources.sources1.interceptors.i1.type = com.bigdata.flume.MyInterceptor$Builder
kafka_key.sources.sources1.interceptors = i1
kafka_key.sources.sources1.interceptors.i1.type = regex_extractor
kafka_key.sources.sources1.interceptors.i1.regex = .*?\\|(.*?)\\|.*
kafka_key.sources.sources1.interceptors.i1.serializers = s1
kafka_key.sources.sources1.interceptors.i1.serializers.s1.name = key
2)sink中使用自界说的partitioner类:
kafka_key.sinks.sink1.kafka.partitioner.class = com.lxw1234.flume17.SimplePartitioner
自界说的分区类:
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import com.google.common.collect.Lists;
import com.nebo.kafka_study.an2.model.UserBehaviorRequestModel;
import com.nebo.kafka_study.an2.utils.JSONUtil;
public class BehaviorInterceptor implements Interceptor
{
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//如果event为空过滤失
if(event == null || event.getBody() == null || event.getBody().length == 0){
return null;
}
long userId = 0;
//解析日志
try{
UserBehaviorRequestModel model = JSONUtil.json2Object(new String(event.getBody()),UserBehaviorRequestModel.class);
userId = model.getUserId();
}catch (Exception e){
e.printStackTrace();
}
if(userId == 0){
return null;
}
//将userId赋值给key
event.getHeaders().put("key",userId+"");
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
List<Event> out = Lists.newArrayList();
for (Event event : events) {
Event outEvent = intercept(event);
//event 为空过滤失
if (outEvent != null) { out.add(outEvent); }
}
return out;
}
@Override
public void close() {
}
//措施入口
public static class BehaviorBuilder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new BehaviorInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
Json工具:
package com.nebo.kafka_study.an2.utils;
import com.google.gson.Gson;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public final class JSONUtil
{
private static final Logger LOG = LoggerFactory.getLogger(JSONUtil.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final ObjectMapper NEWMAPPER = new ObjectMapper();
private static JSONUtil jsonUtil;
private static Gson GSON = new Gson();
static {
//NEWMAPPER.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
NEWMAPPER.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
private JSONUtil()
{
}
public static JSONUtil getInstance()
{
synchronized (JSONUtil.class) {
if (jsonUtil == null) {
jsonUtil = new JSONUtil();
}
}
return jsonUtil;
}
public static String fromObject(Object obj) throws IOException, JsonGenerationException, JsonMappingException
{
StringWriter stringWriter = new StringWriter();
MAPPER.writeValue(stringWriter, obj);
return stringWriter.toString();
}
温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/web/32814.html