redis 异步消息队列Redis 的 list(列表) 数据结构常用来作为异步消息队列使用,使用rpush/lpush操作入队列,使用lpop 和 rpop来出队列。队列空了怎么办 ?消费端不断轮询,redis 的 QPS 也会被拉高,导致查询变慢 。 通常sleep 来解决这个问题 Thread.sleep(1000) # java 睡 1s但是还会导致延迟增加, 使用 blpop/brpop ,这两个指令的前缀字符b代表的是blocking,也就是阻塞读。阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。空闲连接的问题 :如果线程一直阻塞在哪里,Redis 的客户端连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用。这个时候blpop/brpop会抛出异常来锁冲突处理:客户端在处理请求时加锁没加成功怎么办。一般有 3 种策略来处理加锁失败:直接抛出异常,通知用户稍后重试; 跟前端交互需要sleep 一会再重试;将请求转移至延时队列,过一会再试; 适合不跟前端交互的场景redis 延时队列的实现延时队列可以通过 Redis 的 zset(有序列表) 来实现。我们将消息序列化成一个字符串作为 zset 的value,这个消息的到期处理时间作为score,然后用多个线程轮询 zset 获取到期的任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其它线程可以继续处理。因为有多个线程,所以需要考虑并发争抢任务,确保任务不能被多次执行。jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); 只取一条
java 代码示例
import java.lang.reflect.Type;import java.util.Set;import java.util.UUID;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.TypeReference;import redis.clients.jedis.Jedis;public class RedisDelayingQueue{ static class TaskItem{ public String id; public T msg; } // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference private Type TaskType = new TypeReference>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItemtask = new TaskItem(); task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid task.msg = msg; String s = JSON.toJSONString(task); // fastjson 序列化 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试 } public void loop() { while (!Thread.interrupted()) { // 只取一条 Setvalues = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); // 歇会继续 } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // 抢到了 TaskItemtask = JSON.parseObject(s, TaskType); // fastjson 反序列化 this.handleMsg(task.msg); } } } public void handleMsg(T msg) { System.out.println(msg); } public static void main(String[] args) { Jedis jedis = new Jedis(); RedisDelayingQueue queue = new RedisDelayingQueue<>(jedis, "q-demo");
Thread producer = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}
};
Thread consumer = new Thread() {
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
}
}
}
Redis 作为消息队列为什么不能保证 100% 的可靠性 ?
消息不保证可靠,应该是消息被发送出去,消费者是否接收到消息redis不做保证,不像一般的mq,会有ack机制,要求消费者收到消息进行ack确认,超时未确认mq会再次投递消息,而redis没有这个机制。为什么redis不考虑加上这个特性呢?我觉得还是其主要用途来决定的,毕竟不是专业消息中间件,同时消费者要做好幂等处理。