1、导入maven引用
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
<!-- elasticsearch http api client -->
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<version>5.3.3</version>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
2、配置文件如下:
#停用服务端口
spring.main.web-environment=false
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=localhost:9092
#=============== consumer =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=consumer1
spring.kafka.consumer.auto-offset-reset=latest
#spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
#spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消息JSON格式化模板
es.data.format.user={"id":"0","name":"1"}
es.data.format.role={"id":"0","name":"1"}
3、初始化Jest客户端
public class EsJestClient {
private static JestClient client;
/**
* 获取客户端
*
* @return jestclient
*/
public static synchronized JestClient getClient() {
if (client == null) {
build();
}
return client;
}
/**
* 关闭客户端
*/
public static void close(JestClient client) {
if (!Objects.isNull(client)) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 建立连接
*/
private static void build() {
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(
new HttpClientConfig
.Builder(Config.ES_HOST)
.multiThreaded(true)
//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
.defaultMaxTotalConnectionPerRoute(2)
//所有route连接总数
.maxTotalConnection(2)
.connTimeout(10000)
.readTimeout(10000)
.gson(new GsonBuilder()
.setDateFormat("yyyy-MM-dd HH:mm:ss")
.create())
.build()
);
client = factory.getObject();
}
}
4、实现Kafka批量消费
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.group-id}")
String groupId;
@Value("${spring.kafka.bootstrap-servers}")
String bootstrapServers;
@Value("${spring.kafka.consumer.auto-offset-reset}")
String autoOffsetReset;
@Bean
KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setBatchListener(true);
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);//每一批数量
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
5、实现ES 通用业务逻辑层
@Service
public class ESService{
private JestClient client;
public ESService(JestClient client) {
this.client = client;
}
public boolean update(String id, String esType, Object object) {
Index index = new Index.Builder(object).index(Config.ES_INDICES).type(esType).id(id).refresh(true).build();
try {
JestResult result = client.execute(index);
return result != null && result.isSucceeded();
} catch (Exception ignore) {
}
return false;
}
public Index getUpdateIndex(String id, String esType, Object object) {
return new Index.Builder(object).index(Config.ES_INDICES).type(esType).id(id).refresh(true).build();
}
public Delete getDeleteIndex(String id, String esType) {
return new Delete.Builder(id).index(Config.ES_INDICES).type(esType).build();
}
public boolean executeESClientRequest(List indexList, String esType) {
Bulk bulk = new Bulk.Builder()
.defaultIndex(Config.ES_INDICES)
.defaultType(esType)
.addAction(indexList)
.build();
indexList.clear();
try {
JestResult result = client.execute(bulk);
return result != null && result.isSucceeded();
} catch (Exception ignore) {
}
return false;
}
public boolean delete(String id, String esType) {
try {
DocumentResult result = client.execute(new Delete.Builder(id)
.index(Config.ES_INDICES)
.type(esType)
.build());
return result.isSucceeded();
} catch (Exception e) {
throw new RuntimeException("delete exception", e);
}
}
}
6、实现消费逻辑
@Component
public class JsonConsumer {
@Value("${es.data.format.user}")
String userFormat;
@Value("${es.data.format.role}")
String roleFormat;
JestClient client = EsJestClient.getClient();
ESService documentDao = new ESService(client);
@KafkaListener(topics = Config.KAFKA_JSON_TOPICS, id = Config.KAFKA_JSON_ID, containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<?, ?>> list) {
List<String> messages = new ArrayList<>();
for (ConsumerRecord<?, ?> record : list) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
// 获取消息
kafkaMessage.ifPresent(o -> messages.add(o.toString()));
}
if (messages.size() > 0) {
// 更新索引
updateES(messages);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取ES的TYPE
*
* @param tableName
* @return
*/
private String getESType(String tableName) {
String esType = "";
switch (tableName) {
case "role": {
esType = Config.ES_ROLE_TYPE;
break;
}
case "user": {
esType = Config.ES_USER_TYPE;
break;
}
}
return esType;
}
/**
* 获取消息JSON解析格式
*
* @param tableName
* @return
*/
private String getJsonFormat(String tableName) {
String format = "";
switch (tableName) {
case "role": {
format = roleFormat;
break;
}
case "user": {
format = userFormat;
break;
}
}
return format;
}
/**
* 获取解析后的ES对象
*
* @param message
* @param tableName
* @return
*/
private JSONObject getESObject(JSONArray message, String tableName) {
JSONObject resultObject = new JSONObject();
String format = getJsonFormat(tableName);
if (!format.isEmpty()) {
JSONObject jsonFormatObject = JSON.parseObject(format);
for (String key : jsonFormatObject.keySet()) {
String[] formatValues = jsonFormatObject.getString(key).split(",");
if (formatValues.length < 2) {
resultObject.put(key, message.get(jsonFormatObject.getInteger(key)));
} else {
Object object = message.get(Integer.parseInt(formatValues[0]));
if (object == null) {
String[] array = {};
resultObject.put(key, array);
} else {
String objectStr = message.get(Integer.parseInt(formatValues[0])).toString();
String[] result = objectStr.split(formatValues[1]);
resultObject.put(key, result);
}
}
}
}
return resultObject;
}
/**
* 更新ES索引
*
* @param messages
*/
private void updateES(List<String> messages) {
List<Index> updateUserList = new ArrayList<>();
List<Index> updateRoleList = new ArrayList<>();
List<Delete> deleteUserList = new ArrayList<>();
List<Delete> deleteRoleList = new ArrayList<>();
for (String message : messages) {
JSONObject result = null;
try {
result = JSON.parseObject(message);
} catch (Exception e) {
continue;
}
// 获取事件类型 event:"wtv3.videos.insert"
String event = (String) result.get("event");
String[] eventArray = event.split("\\.");
String tableName = eventArray[1];
String eventType = eventArray[2];
// 获取具体数据
JSONArray valueStr = (JSONArray) result.get("value");
// 转化为对应格式的json字符串
JSONObject object = getESObject(valueStr, tableName);
// 获取ES的type
String esType = getESType(tableName);
switch (eventType) {
case "insert": {
appendUpdateList(updateUserList, updateRoleList, object, esType);
break;
}
case "update": {
// 更新videos
appendUpdateList(updateUserList, updateRoleList, object, esType);
break;
}
case "delete": {
// 删除videos
appendDeleteList(deleteUserList, deleteRoleList, object, esType);
break;
}
}
}
if (updateUserList.size() > 0) {
documentDao.executeESClientRequest(updateUserList, Config.ES_USER_TYPE);
}
if (updateRoleList.size() > 0) {
documentDao.executeESClientRequest(updateRoleList, Config.ES_ROLE_TYPE);
}
if (deleteUserList.size() > 0) {
documentDao.executeESClientRequest(deleteUserList, Config.ES_USER_TYPE);
}
if (deleteRoleList.size() > 0) {
documentDao.executeESClientRequest(deleteRoleList, Config.ES_ROLE_TYPE);
}
}
private void appendDeleteList(List<Delete> userList, List<Delete> roleList, JSONObject object, String esType) {
switch (esType) {
case Config.ES_USER_TYPE: {
userList.add(documentDao.getDeleteIndex(object.get("id").toString(), esType));
break;
}
case Config.ES_ROLE_TYPE: {
roleList.add(documentDao.getDeleteIndex(object.get("id").toString(), esType));
break;
}
}
}
private void appendUpdateList(List<Index> userList, List<Index> roleList, JSONObject object, String esType) {
switch (esType) {
case Config.ES_USER_TYPE: {
userList.add(documentDao.getUpdateIndex(object.get("id").toString(), esType, object));
break;
}
case Config.ES_ROLE_TYPE: {
roleList.add(documentDao.getUpdateIndex(object.get("id").toString(), esType, object));
break;
}
}
}
}
7、运行结果(源码在前言)