相信大家在日常工作中会遇到这种情况,一台机器上两个进程通过socket相互通信,server端重启后端口冲突直接起不来了...此时rd同学心里一紧,是不是写出bug了赶紧查日志发现是端口冲突了笔者目前在猫眼从事servicemesh相关研发工作,我们公司使用新美大的机器,无法拥有对机器的控制权,因此也就无法基于k8s来做servicemesh。简单来说,sdk通过http的方式与mesh交互做服务注册发现,那sdk如何发现mesh 就成了一个问题。
如上图所示,最开始sdk通过本地文件发现mesh的admin地址,mesh重启后为防止端口冲突问题,会选一个未占用的端口进行监听,然后把最新的admin地址写到本地文件。sdk要能及时的刷新mesh最新的admin地址,所以sdk内部要有一个定时任务定期刷新admin地址。由此来看sdk的逻辑就变得非常重了,sdk本身的定位就是序列化和反序列化数据发送给mesh。那么有什么好的方法能解决上面这么恶心的问题吗?当然有啦,通过uds就可以。
Unix domain socket 又叫 IPC(inter-process communication 进程间通信) socket,用于实现同一主机上的进程间通信。socket 原本是为网络通讯设计的,但后来在 socket 的框架上发展出一种 IPC 机制,就是 UNIX domain socket。虽然网络 socket 也可用于同一台主机的进程间通讯(通过 loopback 地址 127.0.0.1),但是 UNIX domain socket 用于 IPC 更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。这是因为,IPC 机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。有了uds进行本机通信,再也不用担心mesh重启或启动时端口冲突的问题了。使用uds之后sdk与mesh的交互方式。这样sdk直接拿到uds路径new uds客户端调用mesh就可以了,很方便
sdk的语言有多种,我拿Java来举例。mesh 使用go语言实现,很容易实现一个应用层http协议传输层uds协议的server。
func main() {
RunServer()
}
var (
// 声明 Unix 套接字的地址
serverAddr = &net.UnixAddr{Name: "/opt/test.sock", Net: "unix"}
)
func RunServer() {
// unlink 系统调用比较特殊。关于它的描述中有一点:如果这个文件是一个 unix socket,它会被移除,但是打开它的进程可以继续使用它。也就是说新旧进程都会在这个地址监听。
syscall.Unlink(serverAddr.Name)
lis, err := net.ListenUnix("unix", serverAddr)
if err != nil {
fmt.Println("ListenUnix", err)
return
}
http.HandleFunc("/get", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
query := req.URL.Query()
get := query.Get("key")
fmt.Printf("server get key = %s value = %s \n", "key", get)
builder := strings.Builder{}
for i := 0; i < 3; i++ {
builder.WriteString(strconv.Itoa(i))
}
s := req.Header.Get("sequenceid")
w.Header().Add("sequenceid", s)
w.Write([]byte(builder.String()))
}))
http.HandleFunc("/post", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
bytes, err := ioutil.ReadAll(req.Body)
req.Body.Close()
if err != nil {
return
}
h := req.Header.Get("sequenceid")
contentType := req.Header.Get("Content-Type")
fmt.Println(contentType)
w.Header().Add("sequenceid", h)
fmt.Printf("server post receive request %s \n", string(bytes))
w.Write([]byte("post success"))
}))
svr := &http.Server{Handler: http.DefaultServeMux}
err = svr.Serve(lis)
if err != nil {
fmt.Println("Serve err:", err)
}
}
而Java语言就相对麻烦了,在做这一块的时候在网上调研没有找到什么资料,最终使用netty封装了一个HTTPUdsClient的包。因为netty是异步的,所以要把异步转同步,这里只是给出一个简单的demo并没有转同步。那么异步转同步如何实现呢?我的做法是在http header中添加Sequenceid,channel中发完数据后使用CountDownLatch wait 等待,当mesh 返回数据后解码进入到Handler中,触发CountDownLatch 的countDown操作,很容易就异步转同步了。还有一点说明:每次http 响应收到后都会把uds连接关闭掉。
public class NettyUdsHttpClient {
public static void main(String[] args) throws Exception {
final NettyUdsHttpClient nettyUdsHttpClient = new NettyUdsHttpClient();
nettyUdsHttpClient.request("/opt/test.sock");
}
private Bootstrap b = null;
private static EventLoopGroup workerGroup = null;
public NettyUdsHttpClient() {
EventLoopGroup workerGroup = null;
Class domainSocketChannelClazz = null;
if (Epoll.isAvailable()) {
domainSocketChannelClazz = EpollDomainSocketChannel.class;
System.out.println("Epoll.isAvailable");
workerGroup = new EpollEventLoopGroup(1);
} else if (KQueue.isAvailable()) {
System.out.println("KQueue.isAvailable");
workerGroup = new KQueueEventLoopGroup(1);
domainSocketChannelClazz = KQueueDomainSocketChannel.class;
} else {
System.out.println("use NioEventLoopGroup");
workerGroup = new NioEventLoopGroup(1);
domainSocketChannelClazz = NioSocketChannel.class;
}
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(domainSocketChannelClazz);
b.option(ChannelOption.SO_KEEPALIVE, false);
b.handler(new ChannelInitializer<DomainSocketChannel>() {
@Override
public void initChannel(DomainSocketChannel ch) throws Exception {
// 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
ch.pipeline().addLast(new HttpResponseDecoder());
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {
private Map<String, String> headerMap = new HashMap<>();
private int statusCode;
private StringBuilder contentStr = new StringBuilder();
private int currentSequenceId;
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpResponse) {
DefaultHttpResponse response = (DefaultHttpResponse) msg;
this.statusCode = response.status().code();
HttpHeaders headers = response.headers();
Integer sequenceId = headers.getInt("Sequenceid");
if (sequenceId != null) {
this.currentSequenceId = sequenceId;
}
Iterator<Map.Entry<String, String>> headerIterator = headers.iteratorAsString();
// 封装header
while (headerIterator.hasNext()) {
Map.Entry<String, String> header = headerIterator.next();
headerMap.put(header.getKey(), header.getValue());
}
}
// 和mesh交互,没有Trailer,因此不考虑
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
contentStr.append(content.content().toString(StandardCharsets.UTF_8));
if (msg instanceof LastHttpContent) {
// http 响应已经读完
System.out.println("currentSequenceId = " + currentSequenceId + "响应码 = " + statusCode + " headerMap = " + headerMap + " content = " + contentStr.toString());
ctx.channel().close();
}
}
}
});
}
});
this.b = b;
this.workerGroup = workerGroup;
}
public void request(String path) throws Exception {
try {
// Start the client.
ChannelFuture f = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();
// get 请求
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/get?key=123", Unpooled.EMPTY_BUFFER);
request.headers().set("Sequenceid", 1);
request.headers().set(HttpHeaderNames.HOST, "daemon");
// 发送http请求
f.channel().writeAndFlush(request);
// post 请求
ChannelFuture f1 = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();
String msg = "hello";
ByteBuf byteBuf = Unpooled.wrappedBuffer(msg.getBytes("UTF-8"));
DefaultFullHttpRequest request1 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/post", byteBuf);
request1.headers().set(HttpHeaderNames.HOST, "daemon");
request1.headers().set("Sequenceid", 2);
request1.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
request1.headers().set(HttpHeaderNames.CONTENT_LENGTH, request1.content().readableBytes());
f1.channel().writeAndFlush(request1);
System.out.println("over ");
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
其实在工作中很容易遇到一些比较痛苦的事情,这时候如何考虑优化掉这个事情就很重要了,等解决一个非常棘手的事情就会有成就感。感谢您的阅读,如果感觉我写的还行,求关注~