diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 792563d7..e8e23a36 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -12,12 +12,10 @@ import ( "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" "github.com/micro/go-micro/client/selector" - "github.com/micro/go-micro/codec" raw "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/transport" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -623,45 +621,19 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption { } func newClient(opts ...client.Option) client.Client { - options := client.Options{ - Codecs: make(map[string]codec.NewCodec), - CallOptions: client.CallOptions{ - Backoff: client.DefaultBackoff, - Retry: client.DefaultRetry, - Retries: client.DefaultRetries, - RequestTimeout: client.DefaultRequestTimeout, - DialTimeout: transport.DefaultDialTimeout, - }, - PoolSize: client.DefaultPoolSize, - PoolTTL: client.DefaultPoolTTL, - } + options := client.NewOptions() + // default content type for grpc + options.ContentType = "application/grpc+proto" for _, o := range opts { o(&options) } - if len(options.ContentType) == 0 { - options.ContentType = "application/grpc+proto" - } - - if options.Broker == nil { - options.Broker = broker.DefaultBroker - } - - if options.Registry == nil { - options.Registry = registry.DefaultRegistry - } - - if options.Selector == nil { - options.Selector = selector.NewSelector( - selector.Registry(options.Registry), - ) - } - rc := &grpcClient{ once: sync.Once{}, opts: options, } + rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) c := client.Client(rc) diff --git a/client/grpc/grpc_pool.go b/client/grpc/grpc_pool.go index ba076b2a..340aecce 100644 --- a/client/grpc/grpc_pool.go +++ b/client/grpc/grpc_pool.go @@ -10,11 +10,11 @@ import ( type pool struct { size int ttl int64 - + // max streams on a *poolConn maxStreams int // max idle conns - maxIdle int + maxIdle int sync.Mutex conns map[string]*streamsPool @@ -22,20 +22,20 @@ type pool struct { type streamsPool struct { // head of list - head *poolConn + head *poolConn // busy conns list - busy *poolConn + busy *poolConn // the siza of list - count int + count int // idle conn - idle int + idle int } type poolConn struct { // grpc conn *grpc.ClientConn - err error - addr string + err error + addr string // pool and streams pool pool *pool @@ -44,9 +44,9 @@ type poolConn struct { created int64 // list - pre *poolConn - next *poolConn - in bool + pre *poolConn + next *poolConn + in bool } func newPool(size int, ttl time.Duration, idle int, ms int) *pool { @@ -57,11 +57,11 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool { idle = 0 } return &pool{ - size: size, - ttl: int64(ttl.Seconds()), + size: size, + ttl: int64(ttl.Seconds()), maxStreams: ms, - maxIdle: idle, - conns: make(map[string]*streamsPool), + maxIdle: idle, + conns: make(map[string]*streamsPool), } } @@ -70,7 +70,7 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) p.Lock() sp, ok := p.conns[addr] if !ok { - sp = &streamsPool{head:&poolConn{}, busy:&poolConn{}, count:0, idle:0} + sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0} p.conns[addr] = sp } // while we have conns check streams and then return one @@ -90,11 +90,11 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) } // a busy conn if conn.streams >= p.maxStreams { - next := conn.next - removeConn(conn) - addConnAfter(conn, sp.busy) - conn = next - continue + next := conn.next + removeConn(conn) + addConnAfter(conn, sp.busy) + conn = next + continue } // a idle conn if conn.streams == 0 { @@ -112,7 +112,7 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) if err != nil { return nil, err } - conn = &poolConn{cc,nil,addr,p,sp,1,time.Now().Unix(), nil, nil, false} + conn = &poolConn{cc, nil, addr, p, sp, 1, time.Now().Unix(), nil, nil, false} // add conn to streams pool p.Lock() @@ -148,7 +148,7 @@ func (p *pool) release(addr string, conn *poolConn, err error) { // 2. too many idle conn or // 3. conn is too old now := time.Now().Unix() - if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { + if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { removeConn(conn) p.Unlock() conn.ClientConn.Close() @@ -160,11 +160,11 @@ func (p *pool) release(addr string, conn *poolConn, err error) { return } -func (conn *poolConn)Close() { +func (conn *poolConn) Close() { conn.pool.release(conn.addr, conn, conn.err) } -func removeConn(conn *poolConn) { +func removeConn(conn *poolConn) { if conn.pre != nil { conn.pre.next = conn.next } @@ -178,7 +178,7 @@ func removeConn(conn *poolConn) { return } -func addConnAfter(conn *poolConn, after *poolConn) { +func addConnAfter(conn *poolConn, after *poolConn) { conn.next = after.next conn.pre = after if after.next != nil { diff --git a/client/grpc/options.go b/client/grpc/options.go index 8384c48a..146e42b0 100644 --- a/client/grpc/options.go +++ b/client/grpc/options.go @@ -28,8 +28,8 @@ var ( DefaultMaxSendMsgSize = 1024 * 1024 * 4 ) -type poolMaxStreams struct {} -type poolMaxIdle struct {} +type poolMaxStreams struct{} +type poolMaxIdle struct{} type codecsKey struct{} type tlsAuth struct{} type maxRecvMsgSizeKey struct{} @@ -129,4 +129,3 @@ func CallOptions(opts ...grpc.CallOption) client.CallOption { o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) } } - diff --git a/client/options.go b/client/options.go index 694a2fe9..50ec3951 100644 --- a/client/options.go +++ b/client/options.go @@ -85,9 +85,11 @@ type RequestOptions struct { Context context.Context } -func newOptions(options ...Option) Options { +func NewOptions(options ...Option) Options { opts := Options{ - Codecs: make(map[string]codec.NewCodec), + Context: context.Background(), + ContentType: DefaultContentType, + Codecs: make(map[string]codec.NewCodec), CallOptions: CallOptions{ Backoff: DefaultBackoff, Retry: DefaultRetry, @@ -95,40 +97,18 @@ func newOptions(options ...Option) Options { RequestTimeout: DefaultRequestTimeout, DialTimeout: transport.DefaultDialTimeout, }, - PoolSize: DefaultPoolSize, - PoolTTL: DefaultPoolTTL, + PoolSize: DefaultPoolSize, + PoolTTL: DefaultPoolTTL, + Broker: broker.DefaultBroker, + Selector: selector.DefaultSelector, + Registry: registry.DefaultRegistry, + Transport: transport.DefaultTransport, } for _, o := range options { o(&opts) } - if len(opts.ContentType) == 0 { - opts.ContentType = DefaultContentType - } - - if opts.Broker == nil { - opts.Broker = broker.DefaultBroker - } - - if opts.Registry == nil { - opts.Registry = registry.DefaultRegistry - } - - if opts.Selector == nil { - opts.Selector = selector.NewSelector( - selector.Registry(opts.Registry), - ) - } - - if opts.Transport == nil { - opts.Transport = transport.DefaultTransport - } - - if opts.Context == nil { - opts.Context = context.Background() - } - return opts } @@ -171,6 +151,8 @@ func PoolTTL(d time.Duration) Option { func Registry(r registry.Registry) Option { return func(o *Options) { o.Registry = r + // set in the selector + o.Selector.Init(selector.Registry(r)) } } diff --git a/client/options_test.go b/client/options_test.go index 9252994c..8d209e21 100644 --- a/client/options_test.go +++ b/client/options_test.go @@ -23,7 +23,7 @@ func TestCallOptions(t *testing.T) { var cl Client if d.set { - opts = newOptions( + opts = NewOptions( Retries(d.retries), RequestTimeout(d.rtimeout), DialTimeout(d.dtimeout), @@ -35,7 +35,7 @@ func TestCallOptions(t *testing.T) { DialTimeout(d.dtimeout), ) } else { - opts = newOptions() + opts = NewOptions() cl = NewClient() } diff --git a/client/rpc_client.go b/client/rpc_client.go index 4da6df45..34590f26 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -29,7 +29,7 @@ type rpcClient struct { } func newRpcClient(opt ...Option) Client { - opts := newOptions(opt...) + opts := NewOptions(opt...) p := pool.NewPool( pool.Size(opts.PoolSize), diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index ca02b6dd..47fb2776 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -202,7 +202,6 @@ var ( } DefaultClients = map[string]func(...client.Option) client.Client{ - "rpc": client.NewClient, "mucp": cmucp.NewClient, "grpc": cgrpc.NewClient, } @@ -224,7 +223,6 @@ var ( } DefaultServers = map[string]func(...server.Option) server.Server{ - "rpc": server.NewServer, "mucp": smucp.NewServer, "grpc": sgrpc.NewServer, } @@ -242,8 +240,8 @@ var ( } // used for default selection as the fall back - defaultClient = "rpc" - defaultServer = "rpc" + defaultClient = "grpc" + defaultServer = "grpc" defaultBroker = "http" defaultRegistry = "mdns" defaultSelector = "registry" diff --git a/options.go b/options.go index aa90c88f..673d1bfd 100644 --- a/options.go +++ b/options.go @@ -107,8 +107,6 @@ func Registry(r registry.Registry) Option { // Update Client and Server o.Client.Init(client.Registry(r)) o.Server.Init(server.Registry(r)) - // Update Selector - o.Client.Options().Selector.Init(selector.Registry(r)) // Update Broker o.Broker.Init(broker.Registry(r)) } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 7d3d7426..11f48630 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -611,7 +611,7 @@ func (g *grpcServer) Register() error { g.Unlock() if !registered { - log.Logf("Registering node: %s", node.Id) + log.Logf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) } // create registry options diff --git a/service/grpc/README.md b/service/grpc/README.md deleted file mode 100644 index ee333454..00000000 --- a/service/grpc/README.md +++ /dev/null @@ -1,28 +0,0 @@ -# gRPC Service - -A simplified experience for building gRPC services. - -## Overview - -The **gRPC service** makes use of [go-micro](https://github.com/micro/go-micro) plugins to create a simpler framework for gRPC development. -It interoperates with standard gRPC services seamlessly, including the [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway). -The grpc service uses the go-micro broker, client and server plugins which make use of -[github.com/grpc/grpc-go](https://github.com/grpc/grpc-go) internally. -This means we ignore the go-micro codec and transport but provide a native grpc experience. - - - -## Features - -- **Service Discovery** - We make use of go-micro's registry and selector interfaces to provide pluggable discovery -and client side load balancing. There's no need to dial connections, we'll do everything beneath the covers for you. - -- **PubSub Messaging** - Where gRPC only provides you synchronous communication, the **gRPC service** uses the go-micro broker -to provide asynchronous messaging while using the gRPC protocol. - -- **Micro Ecosystem** - Make use of the existing micro ecosystem of tooling including our api gateway, web dashboard, -command line interface and much more. We're enhancing gRPC with a simplified experience using micro. - -## I18n - -### [中文](README_cn.md) diff --git a/service/grpc/README_cn.md b/service/grpc/README_cn.md deleted file mode 100644 index 99cdaf51..00000000 --- a/service/grpc/README_cn.md +++ /dev/null @@ -1,25 +0,0 @@ -# Micro gRPC [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro/service/grpc?status.svg)](https://godoc.org/github.com/micro/go-micro/service/grpc) [![Travis CI](https://api.travis-ci.org/micro/go-micro/service/grpc.svg?branch=master)](https://travis-ci.org/micro/go-micro/service/grpc) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro/service/grpc)](https://goreportcard.com/report/github.com/micro/go-micro/service/grpc) - -Micro gRPC是micro的gRPC框架插件,简化开发基于gRPC的服务。 - -## 概览 - -micro提供有基于Go的gRPC插件[go-micro](https://github.com/micro/go-micro),该插件可以在内部集成gPRC,并与之无缝交互,让开发gRPC更简单,并支持[grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway)。 - -micro有面向gRPC的[客户端](https://github.com/micro/go-plugins/tree/master/client)和[服务端](https://github.com/micro/go-plugins/tree/master/server)插件,go-grpc库调用客户端/服务端插件生成micro需要的gRPC代码,而客户端/服务端插件都是从[github.com/grpc/grpc-go](https://github.com/grpc/grpc-go)扩展而来,也即是说,我们不需要去知道go-micro是如何编解码或传输就可以使用原生的gRPC。 - -## 特性 - -- **服务发现** - go-micro的服务发现基于其[注册](https://github.com/micro/go-plugins/tree/master/registry)与[选择器](https://github.com/micro/go-micro/tree/master/selector)接口,实现了可插拔的服务发现与客户端侧的负载均衡,不需要拨号连接,micro已经把所有都封装好,大家只管用。 - -- **消息发布订阅** - 因为gRPC只提供同步通信机制,而**Go gRPC**使用go-micro的[broker代理](https://github.com/micro/go-micro/tree/master/broker)提供异步消息,broker也是基于gRPC协议。 - -- **Micro生态系统** - Micro生态系统包含工具链中,比如api网关、web管理控制台、CLI命令行接口等等。我们通过使用micro来增强gRPC框架的易用性。 - -## 示例 - -示例请查看[examples/greeter](https://github.com/micro/go-micro/service/grpc/tree/master/examples/greeter)。 - -## 开始使用 - -我们提供相关文档[docs](https://micro.mu/docs/go-grpc_cn.html),以便上手。 \ No newline at end of file diff --git a/service/grpc/grpc.go b/service/grpc/grpc.go index fac9024d..f6c0effa 100644 --- a/service/grpc/grpc.go +++ b/service/grpc/grpc.go @@ -1,58 +1,124 @@ package grpc import ( - "time" - - "github.com/micro/go-micro" - broker "github.com/micro/go-micro/broker" - client "github.com/micro/go-micro/client/grpc" - server "github.com/micro/go-micro/server/grpc" + "github.com/micro/go-micro/client" + gclient "github.com/micro/go-micro/client/grpc" + "github.com/micro/go-micro/server" + gserver "github.com/micro/go-micro/server/grpc" + "github.com/micro/go-micro/service" ) +type grpcService struct { + opts service.Options +} + +func newService(opts ...service.Option) service.Service { + options := service.NewOptions(opts...) + + return &grpcService{ + opts: options, + } +} + +func (s *grpcService) Name() string { + return s.opts.Server.Options().Name +} + +// Init initialises options. Additionally it calls cmd.Init +// which parses command line flags. cmd.Init is only called +// on first Init. +func (s *grpcService) Init(opts ...service.Option) { + // process options + for _, o := range opts { + o(&s.opts) + } +} + +func (s *grpcService) Options() service.Options { + return s.opts +} + +func (s *grpcService) Client() client.Client { + return s.opts.Client +} + +func (s *grpcService) Server() server.Server { + return s.opts.Server +} + +func (s *grpcService) String() string { + return "grpc" +} + +func (s *grpcService) Start() error { + for _, fn := range s.opts.BeforeStart { + if err := fn(); err != nil { + return err + } + } + + if err := s.opts.Server.Start(); err != nil { + return err + } + + for _, fn := range s.opts.AfterStart { + if err := fn(); err != nil { + return err + } + } + + return nil +} + +func (s *grpcService) Stop() error { + var gerr error + + for _, fn := range s.opts.BeforeStop { + if err := fn(); err != nil { + gerr = err + } + } + + if err := s.opts.Server.Stop(); err != nil { + return err + } + + for _, fn := range s.opts.AfterStop { + if err := fn(); err != nil { + gerr = err + } + } + + return gerr +} + +func (s *grpcService) Run() error { + if err := s.Start(); err != nil { + return err + } + + // wait on context cancel + <-s.opts.Context.Done() + + return s.Stop() +} + // NewService returns a grpc service compatible with go-micro.Service -func NewService(opts ...micro.Option) micro.Service { +func NewService(opts ...service.Option) service.Service { // our grpc client - c := client.NewClient() + c := gclient.NewClient() // our grpc server - s := server.NewServer() - // our grpc broker - b := broker.NewBroker() + s := gserver.NewServer() // create options with priority for our opts - options := []micro.Option{ - micro.Client(c), - micro.Server(s), - micro.Broker(b), + options := []service.Option{ + service.Client(c), + service.Server(s), } // append passed in opts options = append(options, opts...) // generate and return a service - return micro.NewService(options...) -} - -// NewFunction returns a grpc service compatible with go-micro.Function -func NewFunction(opts ...micro.Option) micro.Function { - // our grpc client - c := client.NewClient() - // our grpc server - s := server.NewServer() - // our grpc broker - b := broker.NewBroker() - - // create options with priority for our opts - options := []micro.Option{ - micro.Client(c), - micro.Server(s), - micro.Broker(b), - micro.RegisterTTL(time.Minute), - micro.RegisterInterval(time.Second * 30), - } - - // append passed in opts - options = append(options, opts...) - - // generate and return a function - return micro.NewFunction(options...) + return newService(options...) } diff --git a/service/grpc/grpc_test.go b/service/grpc/grpc_test.go index 5978c4fb..4ea738dd 100644 --- a/service/grpc/grpc_test.go +++ b/service/grpc/grpc_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" - "github.com/micro/go-micro" "github.com/micro/go-micro/registry/memory" + "github.com/micro/go-micro/service" hello "github.com/micro/go-micro/service/grpc/proto" mls "github.com/micro/go-micro/util/tls" ) @@ -32,13 +32,13 @@ func TestGRPCService(t *testing.T) { // create GRPC service service := NewService( - micro.Name("test.service"), - micro.Registry(r), - micro.AfterStart(func() error { + service.Name("test.service"), + service.Registry(r), + service.AfterStart(func() error { wg.Done() return nil }), - micro.Context(ctx), + service.Context(ctx), ) // register test handler @@ -81,50 +81,6 @@ func TestGRPCService(t *testing.T) { } } -func TestGRPCFunction(t *testing.T) { - var wg sync.WaitGroup - wg.Add(1) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // create service - fn := NewFunction( - micro.Name("test.function"), - micro.Registry(memory.NewRegistry()), - micro.AfterStart(func() error { - wg.Done() - return nil - }), - micro.Context(ctx), - ) - - // register test handler - hello.RegisterTestHandler(fn.Server(), &testHandler{}) - - // run service - go fn.Run() - - // wait for start - wg.Wait() - - // create client - test := hello.NewTestService("test.function", fn.Client()) - - // call service - rsp, err := test.Call(context.Background(), &hello.Request{ - Name: "John", - }) - if err != nil { - t.Fatal(err) - } - - // check message - if rsp.Msg != "Hello John" { - t.Fatalf("unexpected response %s", rsp.Msg) - } -} - func TestGRPCTLSService(t *testing.T) { var wg sync.WaitGroup wg.Add(1) @@ -147,13 +103,13 @@ func TestGRPCTLSService(t *testing.T) { // create GRPC service service := NewService( - micro.Name("test.service"), - micro.Registry(r), - micro.AfterStart(func() error { + service.Name("test.service"), + service.Registry(r), + service.AfterStart(func() error { wg.Done() return nil }), - micro.Context(ctx), + service.Context(ctx), // set TLS config WithTLS(config), ) diff --git a/service/grpc/options.go b/service/grpc/options.go index cbbec81f..82dbb19f 100644 --- a/service/grpc/options.go +++ b/service/grpc/options.go @@ -3,14 +3,14 @@ package grpc import ( "crypto/tls" - "github.com/micro/go-micro" gc "github.com/micro/go-micro/client/grpc" gs "github.com/micro/go-micro/server/grpc" + "github.com/micro/go-micro/service" ) // WithTLS sets the TLS config for the service -func WithTLS(t *tls.Config) micro.Option { - return func(o *micro.Options) { +func WithTLS(t *tls.Config) service.Option { + return func(o *service.Options) { o.Client.Init( gc.AuthTLS(t), ) diff --git a/service/mucp/mucp.go b/service/mucp/mucp.go index 2ee454c4..9d9326d9 100644 --- a/service/mucp/mucp.go +++ b/service/mucp/mucp.go @@ -2,20 +2,116 @@ package mucp import ( - // TODO: change to go-micro/service - "github.com/micro/go-micro" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/server" cmucp "github.com/micro/go-micro/client/mucp" smucp "github.com/micro/go-micro/server/mucp" + "github.com/micro/go-micro/service" ) +type mucpService struct { + opts service.Options +} + +func newService(opts ...service.Option) service.Service { + options := service.NewOptions(opts...) + + return &mucpService{ + opts: options, + } +} + +func (s *mucpService) Name() string { + return s.opts.Server.Options().Name +} + +// Init initialises options. Additionally it calls cmd.Init +// which parses command line flags. cmd.Init is only called +// on first Init. +func (s *mucpService) Init(opts ...service.Option) { + // process options + for _, o := range opts { + o(&s.opts) + } +} + +func (s *mucpService) Options() service.Options { + return s.opts +} + +func (s *mucpService) Client() client.Client { + return s.opts.Client +} + +func (s *mucpService) Server() server.Server { + return s.opts.Server +} + +func (s *mucpService) String() string { + return "mucp" +} + +func (s *mucpService) Start() error { + for _, fn := range s.opts.BeforeStart { + if err := fn(); err != nil { + return err + } + } + + if err := s.opts.Server.Start(); err != nil { + return err + } + + for _, fn := range s.opts.AfterStart { + if err := fn(); err != nil { + return err + } + } + + return nil +} + +func (s *mucpService) Stop() error { + var gerr error + + for _, fn := range s.opts.BeforeStop { + if err := fn(); err != nil { + gerr = err + } + } + + if err := s.opts.Server.Stop(); err != nil { + return err + } + + for _, fn := range s.opts.AfterStop { + if err := fn(); err != nil { + gerr = err + } + } + + return gerr +} + +func (s *mucpService) Run() error { + if err := s.Start(); err != nil { + return err + } + + // wait on context cancel + <-s.opts.Context.Done() + + return s.Stop() +} + // NewService returns a new mucp service -func NewService(opts ...micro.Option) micro.Service { - options := []micro.Option{ - micro.Client(cmucp.NewClient()), - micro.Server(smucp.NewServer()), +func NewService(opts ...service.Option) service.Service { + options := []service.Option{ + service.Client(cmucp.NewClient()), + service.Server(smucp.NewServer()), } options = append(options, opts...) - return micro.NewService(options...) + return newService(options...) } diff --git a/service/options.go b/service/options.go index 9a077629..b3268786 100644 --- a/service/options.go +++ b/service/options.go @@ -31,7 +31,7 @@ type Options struct { type Option func(*Options) -func newOptions(opts ...Option) Options { +func NewOptions(opts ...Option) Options { opt := Options{ Broker: broker.DefaultBroker, Client: client.DefaultClient,