进程重启端口号冲突了,太坑了吧

  相信大家在日常工作中会遇到这种情况,一台机器上两个进程通过socket相互通信,server端重启后端口冲突直接起不来了...此时rd同学心里一紧,是不是写出bug了赶紧查日志发现是端口冲突了笔者目前在猫眼从事servicemesh相关研发工作,我们公司使用新美大的机器,无法拥有对机器的控制权,因此也就无法基于k8s来做servicemesh。简单来说,sdk通过http的方式与mesh交互做服务注册发现,那sdk如何发现mesh 就成了一个问题。

image

  如上图所示,最开始sdk通过本地文件发现mesh的admin地址,mesh重启后为防止端口冲突问题,会选一个未占用的端口进行监听,然后把最新的admin地址写到本地文件。sdk要能及时的刷新mesh最新的admin地址,所以sdk内部要有一个定时任务定期刷新admin地址。由此来看sdk的逻辑就变得非常重了,sdk本身的定位就是序列化和反序列化数据发送给mesh。那么有什么好的方法能解决上面这么恶心的问题吗?当然有啦,通过uds就可以。
  Unix domain socket 又叫 IPC(inter-process communication 进程间通信) socket,用于实现同一主机上的进程间通信。socket 原本是为网络通讯设计的,但后来在 socket 的框架上发展出一种 IPC 机制,就是 UNIX domain socket。虽然网络 socket 也可用于同一台主机的进程间通讯(通过 loopback 地址 127.0.0.1),但是 UNIX domain socket 用于 IPC 更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。这是因为,IPC 机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。有了uds进行本机通信,再也不用担心mesh重启或启动时端口冲突的问题了。使用uds之后sdk与mesh的交互方式。这样sdk直接拿到uds路径new uds客户端调用mesh就可以了,很方便

image

  sdk的语言有多种,我拿Java来举例。mesh 使用go语言实现,很容易实现一个应用层http协议传输层uds协议的server。

func main() {
  RunServer()
}
​
var (
  // 声明 Unix 套接字的地址
  serverAddr = &net.UnixAddr{Name: "/opt/test.sock", Net: "unix"}
)
​
func RunServer() {
  // unlink 系统调用比较特殊。关于它的描述中有一点:如果这个文件是一个 unix socket,它会被移除,但是打开它的进程可以继续使用它。也就是说新旧进程都会在这个地址监听。
  syscall.Unlink(serverAddr.Name)
  lis, err := net.ListenUnix("unix", serverAddr)
  if err != nil {
    fmt.Println("ListenUnix", err)
    return
  }
  http.HandleFunc("/get", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    query := req.URL.Query()
    get := query.Get("key")
    fmt.Printf("server get key = %s value = %s \n", "key", get)
    builder := strings.Builder{}
    for i := 0; i < 3; i++ {
      builder.WriteString(strconv.Itoa(i))
    }
    s := req.Header.Get("sequenceid")
    w.Header().Add("sequenceid", s)
    w.Write([]byte(builder.String()))
  }))
​
  http.HandleFunc("/post", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    bytes, err := ioutil.ReadAll(req.Body)
    req.Body.Close()
    if err != nil {
      return
    }
    h := req.Header.Get("sequenceid")
    contentType := req.Header.Get("Content-Type")
    fmt.Println(contentType)
    w.Header().Add("sequenceid", h)
    fmt.Printf("server post receive request %s \n", string(bytes))
    w.Write([]byte("post success"))
  }))
  svr := &http.Server{Handler: http.DefaultServeMux}
  err = svr.Serve(lis)
  if err != nil {
    fmt.Println("Serve err:", err)
  }
}
​

  而Java语言就相对麻烦了,在做这一块的时候在网上调研没有找到什么资料,最终使用netty封装了一个HTTPUdsClient的包。因为netty是异步的,所以要把异步转同步,这里只是给出一个简单的demo并没有转同步。那么异步转同步如何实现呢?我的做法是在http header中添加Sequenceid,channel中发完数据后使用CountDownLatch wait 等待,当mesh 返回数据后解码进入到Handler中,触发CountDownLatch 的countDown操作,很容易就异步转同步了。还有一点说明:每次http 响应收到后都会把uds连接关闭掉。

public class NettyUdsHttpClient {

    public static void main(String[] args) throws Exception {
        final NettyUdsHttpClient nettyUdsHttpClient = new NettyUdsHttpClient();
        nettyUdsHttpClient.request("/opt/test.sock");
    }

    private Bootstrap b = null;

    private static EventLoopGroup workerGroup = null;

    public NettyUdsHttpClient() {
        EventLoopGroup workerGroup = null;
        Class domainSocketChannelClazz = null;
        if (Epoll.isAvailable()) {
            domainSocketChannelClazz = EpollDomainSocketChannel.class;
            System.out.println("Epoll.isAvailable");
            workerGroup = new EpollEventLoopGroup(1);
        } else if (KQueue.isAvailable()) {
            System.out.println("KQueue.isAvailable");
            workerGroup = new KQueueEventLoopGroup(1);
            domainSocketChannelClazz = KQueueDomainSocketChannel.class;
        } else {
            System.out.println("use NioEventLoopGroup");
            workerGroup = new NioEventLoopGroup(1);
            domainSocketChannelClazz = NioSocketChannel.class;
        }

        Bootstrap b = new Bootstrap();
        b.group(workerGroup);
        b.channel(domainSocketChannelClazz);
        b.option(ChannelOption.SO_KEEPALIVE, false);
        b.handler(new ChannelInitializer<DomainSocketChannel>() {
            @Override
            public void initChannel(DomainSocketChannel ch) throws Exception {
                // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
                ch.pipeline().addLast(new HttpResponseDecoder());

                // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
                ch.pipeline().addLast(new HttpRequestEncoder());
                ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {

                    private Map<String, String> headerMap = new HashMap<>();

                    private int statusCode;

                    private StringBuilder contentStr = new StringBuilder();

                    private int currentSequenceId;

                    @Override
                    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
                        if (msg instanceof HttpResponse) {
                            DefaultHttpResponse response = (DefaultHttpResponse) msg;
                            this.statusCode = response.status().code();
                            HttpHeaders headers = response.headers();
                            Integer sequenceId = headers.getInt("Sequenceid");
                            if (sequenceId != null) {
                                this.currentSequenceId = sequenceId;
                            }
                            Iterator<Map.Entry<String, String>> headerIterator = headers.iteratorAsString();
                            // 封装header
                            while (headerIterator.hasNext()) {
                                Map.Entry<String, String> header = headerIterator.next();
                                headerMap.put(header.getKey(), header.getValue());
                            }
                        }
                        // 和mesh交互,没有Trailer,因此不考虑
                        if (msg instanceof HttpContent) {
                            HttpContent content = (HttpContent) msg;
                            contentStr.append(content.content().toString(StandardCharsets.UTF_8));
                            if (msg instanceof LastHttpContent) {
                                // http 响应已经读完
                                System.out.println("currentSequenceId = " + currentSequenceId + "响应码 = " + statusCode + " headerMap = " + headerMap + " content = " + contentStr.toString());
                                ctx.channel().close();
                            }
                        }
                    }
                });
            }
        });
        this.b = b;
        this.workerGroup = workerGroup;
    }

    public void request(String path) throws Exception {
        try {
            // Start the client.
            ChannelFuture f = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();

            // get 请求
            DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                    "/get?key=123", Unpooled.EMPTY_BUFFER);

            request.headers().set("Sequenceid", 1);
            request.headers().set(HttpHeaderNames.HOST, "daemon");
            // 发送http请求
            f.channel().writeAndFlush(request);

            // post 请求
            ChannelFuture f1 = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();

            String msg = "hello";
            ByteBuf byteBuf = Unpooled.wrappedBuffer(msg.getBytes("UTF-8"));

            DefaultFullHttpRequest request1 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/post", byteBuf);
            request1.headers().set(HttpHeaderNames.HOST, "daemon");
            request1.headers().set("Sequenceid", 2);
            request1.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
            request1.headers().set(HttpHeaderNames.CONTENT_LENGTH, request1.content().readableBytes());
            f1.channel().writeAndFlush(request1);
            System.out.println("over ");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

  其实在工作中很容易遇到一些比较痛苦的事情,这时候如何考虑优化掉这个事情就很重要了,等解决一个非常棘手的事情就会有成就感。感谢您的阅读,如果感觉我写的还行,求关注~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343