Spring Boot整合redis实现队列存储微服务

本文介绍Spring Boot整合Redis实现队列存储。队列存储通常以Rest微服务形式提供服务接口,所以Spring Boot+Redis是一个理想选型。

典型的应用场景,比如爬虫系统中任务列表的存储,各个爬虫子进(线)程独立、主动访问该队列获取URLs,并支持批量获取。

  • Step1:
    Spring Boot工程的Maven中添加依赖:
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
</dependency>

本文使用SpringBoot 1.5.2.RELEASE。

  • Step2
    Application.java入口定义必要的Bean:
@SpringBootApplication(scanBasePackages = { 
        "", "" })
public class Application implements CommandLineRunner {

    @Autowired private JedisConnectionFactory jedisConnFactory;
    
    @Bean
    public StringRedisTemplate redisTemplate() {

        StringRedisTemplate redisTemplate = new StringRedisTemplate();
        redisTemplate.setConnectionFactory(jedisConnFactory);
        return redisTemplate;
    }
    
    @Bean
    public QueueService queueService() {
        return new QueueServiceSDRImpl(redisTemplate());
    }
    
    public static void main(String[] args) throws InterruptedException {

        SpringApplication app = new SpringApplication(Application.class);
        app.setBannerMode(Banner.Mode.CONSOLE);
        app.setWebEnvironment(true);
        app.run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Project: running...");
    }
}

在此只定义一个StringRedisTemplate,至于保存对象的需求可以手动转成json存储。

  • Step3:定义QueueService接口:
public interface QueueService {

    /**
     * 取N条URL队列数据
     * @param fullTaskName
     * @param numbersOfURL
     * @return
     */
    public List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL);
    
    /**
     * URL队列入队
     * @param webURLList
     * @return
     */
    public Long enQueue(String fullTaskName, String... webURLJSONStringArray);
    
    /**
     * URL队列长度
     * @param fullTaskName
     * @return
     */
    public Long queueSize(String fullTaskName);
    
    /**
     * 清空URL队列
     * @param fullTaskName
     * @return
     */
    public void queueDump(String fullTaskName);
    
    /**
     * 是否已访问过
     * @param fullTaskName
     * @param url
     * @return
     */
    public Boolean hasVisit(String fullTaskName, String url);
    
    /**
     * 保存链接对象
     * @param fullTaskName
     * @param url
     */
    public Long saveURL(String fullTaskName, String... visitedLinkArray);
    
}
  • Step4:QueueServiceSDRImpl.java的具体实现:
public class QueueServiceSDRImpl implements QueueService {

    private StringRedisTemplate redisTemplate;

    private static String HEAD_HISTORY = "HIST:";
    private static String HEAD_QUEUE = "QUEUE:";

    public QueueServiceSDRImpl(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL) {

        List<Object> results = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
                for (int i = 0; i < numbersOfURL; i++) {
                    stringRedisConn.lPop(HEAD_QUEUE.concat(fullTaskName));
                }
                return null; 
            }
        });
        return results.stream().filter(obj -> obj != null).map(obj -> JSONObject.parseObject(obj.toString(), BasicWebURL.class)).collect(Collectors.toList());
    }

    @Override
    public Long enQueue(String fullTaskName, String... webURLJSONStringArray) {
        Long result = -1L;
        BoundListOperations<String, String> opt = redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName));
        try {
            opt.rightPushAll(webURLJSONStringArray);
        } catch (JedisException e) {
            e.printStackTrace();
        }
        return result;
    }

    @Override
    public Long queueSize(String fullTaskName) {
        return redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName)).size();
    }

    @Override
    public void queueDump(String fullTaskName) {
        redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName)).expire(1, TimeUnit.MILLISECONDS);
        redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).expire(1, TimeUnit.MILLISECONDS);
    }

    @Override
    public Boolean hasVisit(String fullTaskName, String url) {
    
        Boolean hasVisit = false;
        try {
            hasVisit = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).isMember(url);
        } catch (JedisException e) {
            e.printStackTrace();
        }
        return hasVisit;
    }

    @Override
    public Long saveURL(String fullTaskName, String... visitedLinkArray) {
    
        Long result = -1L;
        try {
            result = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).add(visitedLinkArray);
        } catch (JedisException e) {
            e.printStackTrace();
        }
        return result;
    }

}

在QueueServiceSDRImpl中实现了两种队列(库),库的value分别是List和Set,特性对应java中的List(有序)和Set(查重)各自特性。fullTaskName为Spring封装的Redis存储中的key对象。

  • Step5,最后看一下QueueController如何暴露服务接口:
@RestController
@RequestMapping()
public class QueueController {

    @Autowired QueueService queueService;
    
    /**
     * Fetch n BasicWebURLs.
     * @param request
     * @param fullTaskName
     * @param numbersOfURL
     * @return
     */
    @GetMapping("/queue/{fullTaskName}")
    public JSONObject webURL(HttpServletRequest request, 
            @PathVariable String fullTaskName, 
            @RequestParam(defaultValue="10", required=false) Long numbersOfURL) {
        
        JSONObject jo = new JSONObject();
        if(numbersOfURL > 0) {
            jo.put("popLength", numbersOfURL);
            jo.put("data", queueService.fetchN(fullTaskName, numbersOfURL));
        }else{
            jo.put("popLength", 0);
            jo.put("data", Lists.newArrayList());
        }
        jo.put("stillHas", queueService.queueSize(fullTaskName));
        return jo;
    }
    
    /**
     * 入队
     * @param request
     * @param fullTaskName
     * @param body
     * @return
     */
    @PostMapping("/queue/{fullTaskName}")
    public Long enQueue(HttpServletRequest request, @PathVariable String fullTaskName, @RequestBody String body) {

        JSONObject jo = JSONObject.parseObject(body);
        if(jo != null){
            JSONArray webURLList = jo.getJSONArray("webURLs");
            if(!webURLList.isEmpty()) {
                String [] jsonArray = webURLList.stream().map(item -> item.toString()).toArray(String[]::new);
                return queueService.enQueue(fullTaskName, jsonArray);
            }
        }
        return -1L;
    }
    
    /**
     * 
     * @param request
     * @param fullTaskName
     * @return
     */
    @DeleteMapping("/queue/{fullTaskName}")
    public Integer queueDump(HttpServletRequest request, @PathVariable String fullTaskName) {
        queueService.queueDump(fullTaskName);
        return 1;
    }
    
}

以及在前文所述爬虫系统场景中,用作查重的接口:

@RestController
@RequestMapping("/link")
@Getter
@Setter
public class VisitedLinkController {

    @Autowired QueueService queueService;
    
    @GetMapping("/{fullTaskName}")
    public String webURL(HttpServletRequest request, 
            @PathVariable String fullTaskName, 
            @RequestParam(defaultValue="", required = false) String link) {
        return queueService.hasVisit(fullTaskName, link) ? "y" : "n";
    }

    /**
     * 加入访问历史
     * @param request
     * @param fullTaskName
     * @param body
     * @return
     */
    @PostMapping("/{fullTaskName}")
    public Boolean visitLinks(HttpServletRequest request, 
            @PathVariable String fullTaskName, @RequestBody String body) {
        
        JSONObject jo = JSONObject.parseObject(body);
        if(jo != null){
            JSONArray webURLList = jo.getJSONArray("visitedLinks");
            if(!webURLList.isEmpty()) {
                String [] jsonArray = webURLList.stream().map(item -> item.toString()).toArray(String[]::new);
                queueService.saveURL(fullTaskName, jsonArray);
            }
        }
        return true;
    }
    
}

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,497评论 18 139
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,679评论 6 342
  • 昨晚和今晚把《小女生,职场修行记》看完,昨晚看到1.00。(熊猫眼)。我这人吧就是看书特死劲(或者说特投入吧)。...
    seven小小柒阅读 214评论 0 1
  • 两个人若能成为好朋友,那一定是两个心灵之间的互相认可。 1. 小A和小B是室友,一起上学、一起吃饭、一起逛街、一起...
    一颗小香猪阅读 346评论 0 2
  • 然后输入@"[^"]*[\u4E00-\u9FA5]+[^"\n]*" 就可方便查找的所有字符串.
    一个开发者_阅读 1,479评论 1 2