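This feature involves three microservices:
service_business: queries the data from the database.
service_business_api: holds the Feign remote-call interfaces.
service_canal: handles the canal functionality.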
service_business
controller
@RestController
@CrossOrigin
@RequestMapping("/ad")
public class AdController {

    @Autowired
    private AdService adService;

    // Returns the ads for the given position as a plain list (no wrapper object)
    @GetMapping("/findAdByPosition")
    public List<Ad> findAdByPosition(String position) {
        return adService.findAdByPosition(position);
    }
}
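Note the return type: endpoints usually return a Result object, but this one returns a collection directly.
The service interface and its implementation class are omitted here.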
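To make the return-type note concrete, here is a minimal sketch of what each style looks like to the caller. ResultSketch is a hypothetical stand-in written for this example (the project's real Result class is not shown in this post), and the ads are plain strings rather than Ad objects.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical wrapper for illustration only; not the project's Result class.
class ResultSketch<T> {
    private final boolean flag;
    private final String message;
    private final T data;

    ResultSketch(boolean flag, String message, T data) {
        this.flag = flag;
        this.message = message;
        this.data = data;
    }

    boolean isFlag() { return flag; }
    String getMessage() { return message; }
    T getData() { return data; }
}

class ReturnTypeSketch {
    // Style used by findAdByPosition: the caller receives the list directly.
    static List<String> plainList() {
        return Arrays.asList("ad1", "ad2");
    }

    // Wrapper style: the caller has to check the flag and unwrap the payload.
    static ResultSketch<List<String>> wrapped() {
        return new ResultSketch<>(true, "ok", plainList());
    }

    public static void main(String[] args) {
        List<String> direct = plainList();
        ResultSketch<List<String>> result = wrapped();
        List<String> unwrapped = result.isFlag()
                ? result.getData()
                : Collections.<String>emptyList();
        System.out.println(direct.equals(unwrapped)); // prints true
    }
}
```

With the plain list, a remote caller can hand the value straight to a serializer, which is presumably why it is convenient here.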
service_business_api
Feign interface:
/**
 * @author gzy
 * @date Created in 2019/8/19
 * @version 1.0
 */
// "business" is the name of the target service
@FeignClient(name = "business")
@RequestMapping("/ad")
public interface AdFeign {

    @GetMapping("/findAdByPosition")
    List<Ad> findAdByPosition(@RequestParam(name = "position") String position);
}
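Roughly speaking, a call through AdFeign becomes an HTTP GET against the service named business. The sketch below only builds that logical URL to show how the annotations combine. FeignRequestSketch and requestUrl are hypothetical names, the sample position value is made up, and real Feign additionally takes care of service discovery, load balancing, and decoding the response.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration; Feign generates the real request itself.
class FeignRequestSketch {

    // Combines the service name, the "/ad" prefix, the method path,
    // and the "position" request parameter into one URL.
    static String requestUrl(String serviceName, String position) {
        try {
            return "http://" + serviceName
                    + "/ad/findAdByPosition?position="
                    + URLEncoder.encode(position, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available, so this should not happen
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints http://business/ad/findAdByPosition?position=web_index_lb
        System.out.println(requestUrl("business", "web_index_lb"));
    }
}
```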
service_canal
First, enable the Feign clients on the startup class:
@SpringBootApplication
@EnableCanalClient
// Scan the package that contains the Feign interfaces
@EnableFeignClients(basePackages = "com.changgou.business.feign")
public class CanaTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(CanaTestApplication.class, args);
    }
}
Then add the service_business_api dependency:
<!-- Used for Feign -->
<dependency>
    <groupId>com.gzy</groupId>
    <artifactId>service_business_api</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
Redis is used later on, so add the Redis IP setting to the yml configuration file.
Finally, put it all together in the listener:
package com.example.canatest.config;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.changgou.business.feign.AdFeign;
import com.changgou.pojo.Ad;
import com.xpand.starter.canal.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.List;

/**
 * @author gzy
 * @date 2019/8/19
 */
@CanalEventListener
public class MyEventListener {

    @Autowired
    private AdFeign adFeign;

    @Autowired
    private StringRedisTemplate redisTemplate;

    // @InsertListenPoint
    // public void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
    //     rowData.getAfterColumnsList().forEach((c) -> System.err.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
    // }
    //
    // @UpdateListenPoint
    // public void onEvent1(CanalEntry.RowData rowData) {
    //     System.err.println("UpdateListenPoint");
    //     rowData.getAfterColumnsList().forEach((c) -> System.err.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
    // }
    //
    // @DeleteListenPoint
    // public void onEvent3(CanalEntry.EventType eventType) {
    //     System.err.println("DeleteListenPoint");
    // }

    // Listen for inserts, updates, and deletes on changgou_business.tb_ad
    @ListenPoint(destination = "example", schema = "changgou_business", table = {"tb_ad"}, eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE, CanalEntry.EventType.INSERT})
    public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.err.println("Ad updated");
        // rowData.getAfterColumnsList().forEach((c) -> System.err.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));

        // DELETE events carry no "after" columns, so read the "before" columns instead
        List<CanalEntry.Column> columns = eventType == CanalEntry.EventType.DELETE
                ? rowData.getBeforeColumnsList()
                : rowData.getAfterColumnsList();

        // Find the value of the "position" column of the changed row
        String position = columns.stream()
                .filter(column -> "position".equals(column.getName()))
                .map(CanalEntry.Column::getValue)
                .findFirst()
                .orElse(null);
        if (position == null) {
            return; // nothing to refresh without a position
        }
        System.out.println("position:" + position);

        // Re-read the ads for this position through Feign and overwrite the cached JSON
        List<Ad> adByPosition = adFeign.findAdByPosition(position);
        System.out.println(adByPosition);
        String string = JSON.toJSONString(adByPosition);
        redisTemplate.boundValueOps("ad_" + position).set(string);
    }
}
Whenever the data in the database changes, canal picks up the change from the binlog, and the listener then refreshes Redis automatically.
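The refresh flow of the listener can also be sketched in plain Java, without canal, Feign, or Redis. In this sketch a HashMap stands in for Redis, a Function stands in for the AdFeign call, and the changed row is a simple column-name-to-value map. AdCacheSketch and its methods are hypothetical names, the sample values are made up, and a comma-joined string replaces the JSON serialization.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for the listener logic; not part of the canal starter.
class AdCacheSketch {

    // Same key convention as the listener: "ad_" + position.
    static String keyFor(String position) {
        return "ad_" + position;
    }

    // Looks up the "position" column of a changed row (column name -> value).
    static Optional<String> positionOf(Map<String, String> changedRow) {
        return Optional.ofNullable(changedRow.get("position"));
    }

    // Re-reads the ads for the row's position and overwrites the cached entry.
    // Rows without a position column are ignored.
    static void refresh(Map<String, String> changedRow,
                        Function<String, List<String>> findAdByPosition,
                        Map<String, String> cache) {
        positionOf(changedRow).ifPresent(position -> {
            List<String> ads = findAdByPosition.apply(position);
            cache.put(keyFor(position), String.join(",", ads));
        });
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        Map<String, String> row = new HashMap<>();
        row.put("id", "1");
        row.put("position", "web_index_lb");

        refresh(row, position -> Arrays.asList("ad1", "ad2"), cache);
        System.out.println(cache); // prints {ad_web_index_lb=ad1,ad2}
    }
}
```

Because the whole list for a position is re-read and overwritten on every change, the cached entry never has to be patched row by row.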