首页>>后端>>Golang->了解go

了解go

时间:2023-11-30 本站 点击:0

这篇文章介绍的transport不是我们学习的重点,因为你可能用不到他,但是作为学习go-micro的一部分,还是得提一下。

transport用于服务间通信,基于socket的send/recv语义。其接口的方法集如下:

typeTransportinterface{Init(...Option)errorOptions()OptionsDial(addrstring,opts...DialOption)(Client,error)Listen(addrstring,opts...ListenOption)(Listener,error)String()string}var(DefaultTransportTransport=NewHTTPTransport()DefaultDialTimeout=time.Second*5)

主要的作用体现在DialListen方法中,

func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}

net/http源码中,我们也学习过Dial方法,他的作用是连接服务端,并返回一个代表连接的conn.

其实这里也是差不多的,只不过代表连接的conn放在一个client结构体中,这个结构体实现了transport.Client接口

typeSocketinterface{Recv(*Message)errorSend(*Message)errorClose()errorLocal()stringRemote()string}typeClientinterface{Socket}

Listen方法根据addr和ListenOption参数进行了处理,然后组装成httpTransportListener。该结构体实现了

typeListenerinterface{Addr()stringClose()errorAccept(func(Socket))error}

func(h*httpTransport)Listen(addrstring,opts...ListenOption)(Listener,error){varoptionsListenOptionsfor_,o:=rangeopts{o(&options)}varlnet.Listenervarerrerror//TODO:supportuseoflistenoptionsifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigfn:=func(addrstring)(net.Listener,error){ifconfig==nil{hosts:=[]string{addr}//checkifitsavalidhost:portifhost,_,err:=net.SplitHostPort(addr);err==nil{iflen(host)==0{hosts=maddr.IPs()}else{hosts=[]string{host}}}//generateacertificatecert,err:=mls.Certificate(hosts...)iferr!=nil{returnnil,err}config=&tls.Config{Certificates:[]tls.Certificate{cert}}}returntls.Listen("tcp",addr,config)}l,err=mnet.Listen(addr,fn)}else{fn:=func(addrstring)(net.Listener,error){returnnet.Listen("tcp",addr)}l,err=mnet.Listen(addr,fn)}iferr!=nil{returnnil,err}return&httpTransportListener{ht:h,listener:l,},nil}

httpTransportListenerAccept方法中,启动了一个http server来监听请求并处理。

func(h*httpTransportListener)Accept(fnfunc(Socket))error{//createhandlermuxmux:=http.NewServeMux()//registerourtransporthandlermux.HandleFunc("/",func(whttp.ResponseWriter,r*http.Request){varbuf*bufio.ReadWritervarconnet.Conn//readaregularrequestifr.ProtoMajor==1{b,err:=ioutil.ReadAll(r.Body)iferr!=nil{http.Error(w,err.Error(),http.StatusInternalServerError)return}r.Body=ioutil.NopCloser(bytes.NewReader(b))//hijacktheconnhj,ok:=w.(http.Hijacker)if!ok{//we'rescrewedhttp.Error(w,"cannotserveconn",http.StatusInternalServerError)return}conn,bufrw,err:=hj.Hijack()iferr!=nil{http.Error(w,err.Error(),http.StatusInternalServerError)return}deferconn.Close()buf=bufrwcon=conn}//bufferedreaderbufr:=bufio.NewReader(r.Body)//savetherequestch:=make(chan*http.Request,1)ch<-r//createanewtransportsocketsock:=&httpTransportSocket{ht:h.ht,w:w,r:r,rw:buf,buf:bufr,ch:ch,conn:con,local:h.Addr(),remote:r.RemoteAddr,closed:make(chanbool),}//executethesocketfn(sock)})//getoptionalhandlersifh.ht.opts.Context!=nil{handlers,ok:=h.ht.opts.Context.Value("http_handlers").(map[string]http.Handler)ifok{forpattern,handler:=rangehandlers{mux.Handle(pattern,handler)}}}//defaulthttp2serversrv:=&http.Server{Handler:mux,}//insecureconnectionuseh2cif!(h.ht.opts.Secure||h.ht.opts.TLSConfig!=nil){srv.Handler=h2c.NewHandler(mux,&http2.Server{})}//beginservingreturnsrv.Serve(h.listener)}

Accept方法中,启动的Server劫持了客户端请求,并重新创建了一个httpTransportSocket,然后根据Accept方法的函数参数去执行该socket.

我们自定义一个服务,来测试一下,如何使用transport.

//Packagemainpackagemainimport("context""time"hello"github.com/asim/go-micro/examples/v3/greeter/srv/proto/hello""github.com/asim/go-micro/v3""github.com/asim/go-micro/v3/util/log""google.golang.org/grpc")typeSaystruct{}func(s*Say)Hello(ctxcontext.Context,req*hello.Request,rsp*hello.Response)error{log.Log("ReceivedSay.Hellorequest")rsp.Msg="Hello"+req.Namereturnnil}funcmain(){gofunc(){for{grpc.DialContext(context.TODO(),"127.0.0.1:9091")time.Sleep(time.Second)}}()service:=micro.NewService(micro.Name("go.micro.srv.greeter"),)//optionallysetupcommandlineusageservice.Init()//RegisterHandlershello.RegisterSayHandler(service.Server(),new(Say))//Runserveriferr:=service.Run();err!=nil{log.Fatal(err)}}

有关服务间通信使用的proto结构体

syntax="proto3";packagego.micro.srv.greeter;serviceSay{rpcHello(Request)returns(Response){}}messageRequest{stringname=1;}messageResponse{stringmsg=1;}

启动服务的时候,通过日志输出,可以大致的了解启动过程

2021-09-2715:47:40file=v3@v3.5.2-0.20210630062103-c13bb07171bc/service.go:199level=infoStarting[service]go.micro.srv.greeter2021-09-2715:47:40file=server/rpc_server.go:820level=infoTransport[http]Listeningon[::]:648732021-09-2715:47:40file=server/rpc_server.go:840level=infoBroker[http]Connectedto127.0.0.1:648742021-09-2715:47:40file=server/rpc_server.go:654level=infoRegistry[mdns]Registeringnode:go.micro.srv.greeter-042f3737-1410-4a86-9fe5-8f23fc5cc05b

service.Init()做初始化的时候,会把所有的无提供Options的服务进行默认的处理 service.Run()启动service,

iflogger.V(logger.InfoLevel,logger.DefaultLogger){logger.Infof("Starting[service]%s",s.Name())}

然后启动server.默认的server是newRpcServer.rpcServer启动的过程如下:

在transport上进行监听

swap address

连接broker

注册前检查

然后就是for循环监听conn上的请求,并处理

ts.Accept方法我们在之前的内容中说过,在我们的server劫持到请求后,组装新的socket让rpcServer.ServeConn方法来进行处理,在服务移除的时候,会关闭transport的监听

func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}0

ServeConn中涉及的sock参数的接收和发送都是我们httpTransportSocket中定义的动作,具体操作,可以详细的看看源码。

然后实现一个简单的客户端请求

func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}1

在客户端调用服务端方法的时候,这就涉及到client方法调用流程,默认的Client实现是rpcClient,最主要的部分是它的call方法

func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}2

该方法首先组装transport.Message消息体,该消息体包含两部分,一部分为Header,一部分为Body. 组装完成后进行消息编码,然后通过rpcStream方式进行发送请求和接收响应。 以rpcStreamRecv方法为例,我们可以发现,其读取请求的消息是通过codec,

func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}3

codec读取消息其实就是调用clientRecv,这个client就是我们开头提到的httpTransportClient.

func(h*httpTransport)Dial(addrstring,opts...DialOption)(Client,error){dopts:=DialOptions{Timeout:DefaultDialTimeout,}for_,opt:=rangeopts{opt(&dopts)}varconnnet.Connvarerrerror//TODO:supportdialoptionhereratherthanusinginternalconfigifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigifconfig==nil{config=&tls.Config{InsecureSkipVerify:true,}}config.NextProtos=[]string{"http/1.1"}conn,err=newConn(func(addrstring)(net.Conn,error){returntls.DialWithDialer(&net.Dialer{Timeout:dopts.Timeout},"tcp",addr,config)})(addr)}else{conn,err=newConn(func(addrstring)(net.Conn,error){returnnet.DialTimeout("tcp",addr,dopts.Timeout)})(addr)}iferr!=nil{returnnil,err}return&httpTransportClient{ht:h,addr:addr,conn:conn,buff:bufio.NewReader(conn),dialOpts:dopts,r:make(chan*http.Request,1),local:conn.LocalAddr().String(),remote:conn.RemoteAddr().String(),},nil}4


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/Golang/4527.html