1. Redis通信协议
Jedis Client是Redis官网推荐的一个面向java客户端,库文件实现了对redis各类API进行封装调用。redis通信协议是Redis客户端与Redis Server之间交流的语言,它规定了请求和返回值的格式。redis-cli与server端使用一种专门为redis设计的协议RESP(Redis Serialization Protocol)交互,Resp本身没有指定TCP,但redis上下文只使用TCP连接。
RESP规定:
- 用 \r\n 做间隔
- 对于简单的字符串,以+开头
set hello world
+OK\r\n
- 对于错误消息,以-开头,例如:
sethx // 该命令不存在
-ERR unknown command 'sethx'
- 对于整数,以:开头,例如:
dbsize
:100\r\n
- 对于大字符串,以$开头,接着跟上字符串长度的数字:最长为512MB。例如:
get name
$6\r\nfoobar\r\n 代表一个长6的字符串, foobar
$0\r\n\r\n 长度为0 的空字符串
$-1\r\n Null
- 对于数组,以*开头,接上数组元素的个数。
*0\r\n 一个空的数组
mset name1 foo name2 bar
mget name1 name2
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n 一个有两个元素的数组 foo bar
- 我们知道redis-cli只能看到最终的执行结果,因为redis-cli本身就是按照RESP进行结果解析的,所以看不到中间结果。
- 通过RESP,执行一个命令,客户端与服务端的交互步骤如下:
输入命令->将命令编码成字节流->通过TCP发送到服务端->服务端解析字节流->服务端执行命令->
->将结果编码成字节流->通过TCP链接发送给客户端->解析字节流->得到执行结果
比如执行set hello world,根据resp协议,需要客户端解析为下面格式字节流发送给服务端
*3\r\n
$3\r\nset\r\n
$5\r\nhello\r\n
$5\r\nworld\r\n
2. jedis通信原理
试想,如果让我们自己根据上面提到的协议用java去实现一个客户端与redis服务端实现通信,该怎么做呢?
public class TSocketClient {
// 定义socket
private Socket socket;
public TSocketClient() {
try {
socket = new Socket("192.168.58.99", 6379);
} catch (IOException e) {
e.printStackTrace();
}
}
public String set(final String key, final String value) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append("*3").append("\r\n");
sb.append("$3").append("\r\n");
sb.append("set").append("\r\n");
sb.append("$").append(key.getBytes().length).append("\r\n");
sb.append(key).append("\r\n");
sb.append("$").append(value.getBytes().length).append("\r\n");
sb.append(value).append("\r\n");
socket.getOutputStream().write(sb.toString().getBytes());
byte[] b = new byte[2048];
socket.getInputStream().read(b);
return new String(b);
}
public String get(final String key) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append("*2").append("\r\n"); // *表示数组 后面数字表示数组长度
sb.append("$3").append("\r\n");
sb.append("get").append("\r\n");
sb.append("$").append(key.getBytes().length).append("\r\n"); // 美元符号表示字符串,后面的数字表示长度
sb.append(key).append("\r\n");
socket.getOutputStream().write(sb.toString().getBytes());
byte[] b = new byte[2048];
socket.getInputStream().read(b);
return new String(b);
}
public static void main(String[] args) throws IOException {
TSocketClient client = new TSocketClient();
client.set("hello", "ziyan");
}
}
上面代码通过实现resp协议实现了与redis服务端的通信,其实jedis客户端本质上也是通过建立socket按照resp协议与redis通信,下面来分析jedis具体的代码:
第一部分:jedis对象的创建:Jedis jedis = new Jedis(); 主要是创建连接Redis服务器的客户端,在Jedis基类BinaryJedis中主要有Connection对象,创建jedis对象的时候尚未连接到redis服务器,在Connection类中,主要设置了链接Redis所使用socket的参数以及操作socket所使用的工具。
//创建Redis客户端
Jedis jedis = new Jedis();
//调用set 命令,返回状态标记
String code=jedis.set("s", "s");
System.out.println("code="+code);
//调用get命令
String s =jedis.get("s");
System.out.println("s="+s);
//Jedis客户端链接,使用原始socket进行链接
public class Connection implements Closeable
{
private static final byte[][] EMPTY_ARGS = new byte[0][];
//默认主机
private String host = Protocol.DEFAULT_HOST;
//默认端口
private int port = Protocol.DEFAULT_PORT;
//原始socket
private Socket socket;
//输入输出流
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
private int soTimeout = Protocol.DEFAULT_TIMEOUT;
private boolean broken = false;
public Connection() {
}
}
从Connection的成员变量中可以看出,jedis使用了jdk的io socket来处理网络通信。
第二部分:在调用 String code=jedis.set("s", "s"); 命令的时候,才是真正创建链接的过程。Client(BinaryClient).set(byte[], byte[]) 方法参数就是把由String 字符串转换成字节数值,并调用Connection的sendCommand方法来发送Redis命令。
//每次发送命令前都判断是否链接,如果链接端口并且链接不上,则抛出异常
protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
try {
connect();//每次发送Redis命令都会调用Connect()方法来链接Redis远程服务器
Protocol.sendCommand(outputStream, cmd, args); //操作socket 的输出流来发送命令
return this;
} catch (JedisConnectionException ex) {
/*
* When client send request which formed by invalid protocol, Redis send back error message
* before close connection. We try to read it to provide reason of failure.
*/
try {
String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
if (errorMessage != null && errorMessage.length() > 0) {
ex = new JedisConnectionException(errorMessage, ex.getCause());
}
} catch (Exception e) {
/*
* Catch any IOException or JedisConnectionException occurred from InputStream#read and just
* ignore. This approach is safe because reading error message is optional and connection
* will eventually be closed.
*/
}
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}
每次调用sendCommand发送命令时候,都会调用Connnect()方法尝试链接远程端口。
//在发送命令之前连接redis服务器
public void connect() {
if (!isConnected()) {
try {
//创建新socket
socket = new Socket();
//设置socket参数
socket.setReuseAddress(true);
socket.setKeepAlive(true); // Will monitor the TCP connection is
// valid
socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
// ensure timely delivery of data
socket.setSoLinger(true, 0); // Control calls close () method,
// the underlying socket is closed
// immediately
// <-@wjw_add
//设置链接超时时间
socket.connect(new InetSocketAddress(host, port), connectionTimeout);
//设置读取超时时间
socket.setSoTimeout(soTimeout);
//获取socket原始输入输出流
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
}
每次链接到远程Redis服务器后,第一个命令就是发送密钥命令,这是一个BinaryClient的重写方法,方法里还是调用父类Connection的connect方法。
@Override
public void connect() {
if (!isConnected()) {
super.connect();
if (password != null) {
auth(password);
getStatusCodeReply();
}
if (db > 0) {
select(Long.valueOf(db).intValue());
getStatusCodeReply();
}
}
}
在每次发送一个命令后,都会去获取返回码。
public String set(final String key, String value) {
checkIsInMultiOrPipeline();
client.set(key, value);
return client.getStatusCodeReply();
}
在取状态码时,每次都去刷新通道。读取数据流最终通过SocketInputStream 类来读取。
public String getStatusCodeReply() {
flush();
pipelinedCommands--;
final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
if (null == resp) {
return null;
} else {
return SafeEncoder.encode(resp);
}
}
3. jedis的使用
jedis的使用方法很简单:
// 创建Redis客户端
Jedis jedis = new Jedis("192.168.58.99", 6379);
// 调用set 命令,返回状态标记
String code=jedis.set("s", "xxx");
System.out.println("code="+code);
// 调用get命令
String s =jedis.get("s");
System.out.println("s="+s);
上面代码中只用到了包含ip和端口两个参数的构造函数,更常用的是包含四个参数的构造函数:
public Jedis(final String host, final int port, final int connectionTimeout, final int soTimeout)
// connectionTimeout 表示客户端连接超时
// soTimeout 表示客户端读写超时
3.1 为什么要用jedis连接池?
虽然基于内存的Redis数据库有着超高的性能,但是底层的网络通信却占用了一次数据请求的大量时间,因为每次数据交互都需要先建立连接,假设一次数据交互总共用时30ms,超高性能的Redis数据库处理数据所花的时间可能不到1ms,也即是说前期的连接占用了29ms,上面介绍的jedis直连方式,也就是每次new jedis都会新建tcp连接,使用后再断开连接,这对于频繁访问redis的场景显然不是高效的使用方式。连接池则可以实现在客户端建立多个链接并且不释放,当需要使用连接的时候通过一定的算法获取已经建立的连接,使用完了以后则还给连接池,这就免去了数据库连接所占用的时间。因此,通常会使用连接池的方式对Jedis连接进行管理,所有jedis对象会预先放在池子中(JedisPool),每次要连接redis,只需要在池子中借,用完了再归还给池子。
客户端连接Redis使用的是TCP协议,直连的方式每次需要建立TCP连接,而连接池的方式是可以预先初始化好jedis连接,每次只需要从jedis连接池借用即可,借用和归还操作是在本地进行的,只有少量的并发同步开销,远远小于新建tcp连接的开销。此外,连接池的方式可以有效保护和控制资源的使用,而直连的方式无法限制jedis对象的个数,并且可能存在连接泄漏的情况。
Jedis提供了JedisPool这个类作为Jedis的连接池,同时使用Apache的通用对象池工具common-pool作为资源的管理工具。JedisPoolConfig继承了GenericObjectPoolConfig,提供了很多参数配置:
- maxActive: 控制一个pool可分配多少个jedis实例,如果pool已经分配了maxActive个jedis实例,此时pool的状态为exhausted。
- maxIdle: 控制一个pool最多有多少个状态为idle的jedis实例。(如果超出了这个阈值,会close掉超出的连接)
- whenExhaustedAction: 表示当pool中的jedis实例被allocated完时,pool要采取的操作:默认有三种:
WHEN_EXHAUSTED_FAIL : 表示无jedis实例时,直接抛出NoSuchElementException,
WHEN_EXHAUSTED_BLOCK 表示阻塞住,达到maxWait时抛出,JedisConnectionException
WHEN_EXHAUSTED_GROW 表示新建一个jedis实例,设置的maxActive无用。 - maxwait: 表示borrow一个jedis实例时,最大的等待时间,如果超出等待时间,直接抛出jedisConnectionException。
- testOnBorrow:在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则使用redisPING PONG命令测试redis连接是否有效,保证得到的jedis实例均是可用的;
- testOnReturn:在return给pool时,是否提前进行validate操作;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class TestJedis {
public static final Logger logger = LoggerFactory.getLogger(TestJedis.class);
// Jedispool
JedisCommands jedisCommands;
JedisPool jedisPool;
// common-pool 连接池配置,
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
String ip = "192.168.58.99";
int port = 6379;
int timeout = 2000;
public TestJedis() {
// 初始化jedis
// 设置配置
jedisPoolConfig.setMaxTotal(1024);
jedisPoolConfig.setMaxIdle(100);
jedisPoolConfig.setMaxWaitMillis(100);
jedisPoolConfig.setTestOnBorrow(false);
jedisPoolConfig.setTestOnReturn(true);
// 初始化JedisPool
jedisPool = new JedisPool(jedisPoolConfig, ip, port, timeout);
//
Jedis jedis = jedisPool.getResource();
jedisCommands = jedis;
}
public void setValue(String key, String value) {
this.jedisCommands.set(key, value);
}
public String getValue(String key) {
return this.jedisCommands.get(key);
}
public static void main(String[] args) {
TestJedis testJedis = new TestJedis();
testJedis.setValue("testJedisKey", "testJedisValue");
logger.info("get value from redis:{}",testJedis.getValue("testJedisKey"));
}
}
对于jedis对象池的原理可参考我的上一篇文章jedis对象池
3.2 jedis pipeline
我们知道redis提供了mget、mset方法,但没有提供mdel方法,如果要实现这个功能,可以借助Pipeline来模拟批量删除,Jedis支持Pipeline特性,可以通过jedis实现。
public void mdel(List<String> keys) {
Jedis jedis = new Jedis("192.168.58.99");
Pipeline pipeline = jedis.pipelined();
for (String key : keys) {
pipeline.del(key); // 此时命令并非真正执行
}
// 真正执行命令
pipeline.sync(); // 除了pipline.sync(),还可以使用pipeline.syncAndReturnAll()将pipeline的命令进行返回。
}