| @@ -12,12 +12,10 @@ import ( | |||||||
| 	"github.com/micro/go-micro/broker" | 	"github.com/micro/go-micro/broker" | ||||||
| 	"github.com/micro/go-micro/client" | 	"github.com/micro/go-micro/client" | ||||||
| 	"github.com/micro/go-micro/client/selector" | 	"github.com/micro/go-micro/client/selector" | ||||||
| 	"github.com/micro/go-micro/codec" |  | ||||||
| 	raw "github.com/micro/go-micro/codec/bytes" | 	raw "github.com/micro/go-micro/codec/bytes" | ||||||
| 	"github.com/micro/go-micro/errors" | 	"github.com/micro/go-micro/errors" | ||||||
| 	"github.com/micro/go-micro/metadata" | 	"github.com/micro/go-micro/metadata" | ||||||
| 	"github.com/micro/go-micro/registry" | 	"github.com/micro/go-micro/registry" | ||||||
| 	"github.com/micro/go-micro/transport" |  | ||||||
|  |  | ||||||
| 	"google.golang.org/grpc" | 	"google.golang.org/grpc" | ||||||
| 	"google.golang.org/grpc/credentials" | 	"google.golang.org/grpc/credentials" | ||||||
| @@ -623,45 +621,19 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption { | |||||||
| } | } | ||||||
|  |  | ||||||
| func newClient(opts ...client.Option) client.Client { | func newClient(opts ...client.Option) client.Client { | ||||||
| 	options := client.Options{ | 	options := client.NewOptions() | ||||||
| 		Codecs: make(map[string]codec.NewCodec), | 	// default content type for grpc | ||||||
| 		CallOptions: client.CallOptions{ | 	options.ContentType = "application/grpc+proto" | ||||||
| 			Backoff:        client.DefaultBackoff, |  | ||||||
| 			Retry:          client.DefaultRetry, |  | ||||||
| 			Retries:        client.DefaultRetries, |  | ||||||
| 			RequestTimeout: client.DefaultRequestTimeout, |  | ||||||
| 			DialTimeout:    transport.DefaultDialTimeout, |  | ||||||
| 		}, |  | ||||||
| 		PoolSize: client.DefaultPoolSize, |  | ||||||
| 		PoolTTL:  client.DefaultPoolTTL, |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for _, o := range opts { | 	for _, o := range opts { | ||||||
| 		o(&options) | 		o(&options) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if len(options.ContentType) == 0 { |  | ||||||
| 		options.ContentType = "application/grpc+proto" |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if options.Broker == nil { |  | ||||||
| 		options.Broker = broker.DefaultBroker |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if options.Registry == nil { |  | ||||||
| 		options.Registry = registry.DefaultRegistry |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if options.Selector == nil { |  | ||||||
| 		options.Selector = selector.NewSelector( |  | ||||||
| 			selector.Registry(options.Registry), |  | ||||||
| 		) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	rc := &grpcClient{ | 	rc := &grpcClient{ | ||||||
| 		once: sync.Once{}, | 		once: sync.Once{}, | ||||||
| 		opts: options, | 		opts: options, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) | 	rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) | ||||||
|  |  | ||||||
| 	c := client.Client(rc) | 	c := client.Client(rc) | ||||||
|   | |||||||
| @@ -14,7 +14,7 @@ type pool struct { | |||||||
| 	//  max streams on a *poolConn | 	//  max streams on a *poolConn | ||||||
| 	maxStreams int | 	maxStreams int | ||||||
| 	//  max idle conns | 	//  max idle conns | ||||||
| 	maxIdle    int | 	maxIdle int | ||||||
|  |  | ||||||
| 	sync.Mutex | 	sync.Mutex | ||||||
| 	conns map[string]*streamsPool | 	conns map[string]*streamsPool | ||||||
| @@ -22,20 +22,20 @@ type pool struct { | |||||||
|  |  | ||||||
| type streamsPool struct { | type streamsPool struct { | ||||||
| 	//  head of list | 	//  head of list | ||||||
| 	head   *poolConn | 	head *poolConn | ||||||
| 	//  busy conns list | 	//  busy conns list | ||||||
| 	busy   *poolConn | 	busy *poolConn | ||||||
| 	//  the siza of list | 	//  the siza of list | ||||||
| 	count  int | 	count int | ||||||
| 	//  idle conn | 	//  idle conn | ||||||
| 	idle   int | 	idle int | ||||||
| } | } | ||||||
|  |  | ||||||
| type poolConn struct { | type poolConn struct { | ||||||
| 	//  grpc conn | 	//  grpc conn | ||||||
| 	*grpc.ClientConn | 	*grpc.ClientConn | ||||||
| 	err     error | 	err  error | ||||||
| 	addr    string | 	addr string | ||||||
|  |  | ||||||
| 	//  pool and streams pool | 	//  pool and streams pool | ||||||
| 	pool    *pool | 	pool    *pool | ||||||
| @@ -44,9 +44,9 @@ type poolConn struct { | |||||||
| 	created int64 | 	created int64 | ||||||
|  |  | ||||||
| 	//  list | 	//  list | ||||||
| 	pre     *poolConn | 	pre  *poolConn | ||||||
| 	next    *poolConn | 	next *poolConn | ||||||
| 	in      bool | 	in   bool | ||||||
| } | } | ||||||
|  |  | ||||||
| func newPool(size int, ttl time.Duration, idle int, ms int) *pool { | func newPool(size int, ttl time.Duration, idle int, ms int) *pool { | ||||||
| @@ -57,11 +57,11 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool { | |||||||
| 		idle = 0 | 		idle = 0 | ||||||
| 	} | 	} | ||||||
| 	return &pool{ | 	return &pool{ | ||||||
| 		size:  size, | 		size:       size, | ||||||
| 		ttl:   int64(ttl.Seconds()), | 		ttl:        int64(ttl.Seconds()), | ||||||
| 		maxStreams: ms, | 		maxStreams: ms, | ||||||
| 		maxIdle: idle, | 		maxIdle:    idle, | ||||||
| 		conns: make(map[string]*streamsPool), | 		conns:      make(map[string]*streamsPool), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -70,7 +70,7 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) | |||||||
| 	p.Lock() | 	p.Lock() | ||||||
| 	sp, ok := p.conns[addr] | 	sp, ok := p.conns[addr] | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		sp = &streamsPool{head:&poolConn{}, busy:&poolConn{}, count:0, idle:0} | 		sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0} | ||||||
| 		p.conns[addr] = sp | 		p.conns[addr] = sp | ||||||
| 	} | 	} | ||||||
| 	//  while we have conns check streams and then return one | 	//  while we have conns check streams and then return one | ||||||
| @@ -90,11 +90,11 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) | |||||||
| 		} | 		} | ||||||
| 		//  a busy conn | 		//  a busy conn | ||||||
| 		if conn.streams >= p.maxStreams { | 		if conn.streams >= p.maxStreams { | ||||||
| 				next := conn.next | 			next := conn.next | ||||||
| 				removeConn(conn) | 			removeConn(conn) | ||||||
| 				addConnAfter(conn, sp.busy) | 			addConnAfter(conn, sp.busy) | ||||||
| 				conn = next | 			conn = next | ||||||
| 	           	continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		//  a idle conn | 		//  a idle conn | ||||||
| 		if conn.streams == 0 { | 		if conn.streams == 0 { | ||||||
| @@ -112,7 +112,7 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	conn = &poolConn{cc,nil,addr,p,sp,1,time.Now().Unix(), nil, nil, false} | 	conn = &poolConn{cc, nil, addr, p, sp, 1, time.Now().Unix(), nil, nil, false} | ||||||
|  |  | ||||||
| 	//  add conn to streams pool | 	//  add conn to streams pool | ||||||
| 	p.Lock() | 	p.Lock() | ||||||
| @@ -148,7 +148,7 @@ func (p *pool) release(addr string, conn *poolConn, err error) { | |||||||
| 		//  2. too many idle conn or | 		//  2. too many idle conn or | ||||||
| 		//  3. conn is too old | 		//  3. conn is too old | ||||||
| 		now := time.Now().Unix() | 		now := time.Now().Unix() | ||||||
| 		if  err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { | 		if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { | ||||||
| 			removeConn(conn) | 			removeConn(conn) | ||||||
| 			p.Unlock() | 			p.Unlock() | ||||||
| 			conn.ClientConn.Close() | 			conn.ClientConn.Close() | ||||||
| @@ -160,11 +160,11 @@ func (p *pool) release(addr string, conn *poolConn, err error) { | |||||||
| 	return | 	return | ||||||
| } | } | ||||||
|  |  | ||||||
| func (conn *poolConn)Close()  { | func (conn *poolConn) Close() { | ||||||
| 	conn.pool.release(conn.addr, conn, conn.err) | 	conn.pool.release(conn.addr, conn, conn.err) | ||||||
| } | } | ||||||
|  |  | ||||||
| func removeConn(conn *poolConn)  { | func removeConn(conn *poolConn) { | ||||||
| 	if conn.pre != nil { | 	if conn.pre != nil { | ||||||
| 		conn.pre.next = conn.next | 		conn.pre.next = conn.next | ||||||
| 	} | 	} | ||||||
| @@ -178,7 +178,7 @@ func removeConn(conn *poolConn)  { | |||||||
| 	return | 	return | ||||||
| } | } | ||||||
|  |  | ||||||
| func addConnAfter(conn *poolConn, after *poolConn)  { | func addConnAfter(conn *poolConn, after *poolConn) { | ||||||
| 	conn.next = after.next | 	conn.next = after.next | ||||||
| 	conn.pre = after | 	conn.pre = after | ||||||
| 	if after.next != nil { | 	if after.next != nil { | ||||||
|   | |||||||
| @@ -28,8 +28,8 @@ var ( | |||||||
| 	DefaultMaxSendMsgSize = 1024 * 1024 * 4 | 	DefaultMaxSendMsgSize = 1024 * 1024 * 4 | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type poolMaxStreams struct {} | type poolMaxStreams struct{} | ||||||
| type poolMaxIdle struct {} | type poolMaxIdle struct{} | ||||||
| type codecsKey struct{} | type codecsKey struct{} | ||||||
| type tlsAuth struct{} | type tlsAuth struct{} | ||||||
| type maxRecvMsgSizeKey struct{} | type maxRecvMsgSizeKey struct{} | ||||||
| @@ -129,4 +129,3 @@ func CallOptions(opts ...grpc.CallOption) client.CallOption { | |||||||
| 		o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) | 		o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -85,9 +85,11 @@ type RequestOptions struct { | |||||||
| 	Context context.Context | 	Context context.Context | ||||||
| } | } | ||||||
|  |  | ||||||
| func newOptions(options ...Option) Options { | func NewOptions(options ...Option) Options { | ||||||
| 	opts := Options{ | 	opts := Options{ | ||||||
| 		Codecs: make(map[string]codec.NewCodec), | 		Context:     context.Background(), | ||||||
|  | 		ContentType: DefaultContentType, | ||||||
|  | 		Codecs:      make(map[string]codec.NewCodec), | ||||||
| 		CallOptions: CallOptions{ | 		CallOptions: CallOptions{ | ||||||
| 			Backoff:        DefaultBackoff, | 			Backoff:        DefaultBackoff, | ||||||
| 			Retry:          DefaultRetry, | 			Retry:          DefaultRetry, | ||||||
| @@ -95,40 +97,18 @@ func newOptions(options ...Option) Options { | |||||||
| 			RequestTimeout: DefaultRequestTimeout, | 			RequestTimeout: DefaultRequestTimeout, | ||||||
| 			DialTimeout:    transport.DefaultDialTimeout, | 			DialTimeout:    transport.DefaultDialTimeout, | ||||||
| 		}, | 		}, | ||||||
| 		PoolSize: DefaultPoolSize, | 		PoolSize:  DefaultPoolSize, | ||||||
| 		PoolTTL:  DefaultPoolTTL, | 		PoolTTL:   DefaultPoolTTL, | ||||||
|  | 		Broker:    broker.DefaultBroker, | ||||||
|  | 		Selector:  selector.DefaultSelector, | ||||||
|  | 		Registry:  registry.DefaultRegistry, | ||||||
|  | 		Transport: transport.DefaultTransport, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, o := range options { | 	for _, o := range options { | ||||||
| 		o(&opts) | 		o(&opts) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if len(opts.ContentType) == 0 { |  | ||||||
| 		opts.ContentType = DefaultContentType |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if opts.Broker == nil { |  | ||||||
| 		opts.Broker = broker.DefaultBroker |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if opts.Registry == nil { |  | ||||||
| 		opts.Registry = registry.DefaultRegistry |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if opts.Selector == nil { |  | ||||||
| 		opts.Selector = selector.NewSelector( |  | ||||||
| 			selector.Registry(opts.Registry), |  | ||||||
| 		) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if opts.Transport == nil { |  | ||||||
| 		opts.Transport = transport.DefaultTransport |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if opts.Context == nil { |  | ||||||
| 		opts.Context = context.Background() |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return opts | 	return opts | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -171,6 +151,8 @@ func PoolTTL(d time.Duration) Option { | |||||||
| func Registry(r registry.Registry) Option { | func Registry(r registry.Registry) Option { | ||||||
| 	return func(o *Options) { | 	return func(o *Options) { | ||||||
| 		o.Registry = r | 		o.Registry = r | ||||||
|  | 		// set in the selector | ||||||
|  | 		o.Selector.Init(selector.Registry(r)) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -23,7 +23,7 @@ func TestCallOptions(t *testing.T) { | |||||||
| 		var cl Client | 		var cl Client | ||||||
|  |  | ||||||
| 		if d.set { | 		if d.set { | ||||||
| 			opts = newOptions( | 			opts = NewOptions( | ||||||
| 				Retries(d.retries), | 				Retries(d.retries), | ||||||
| 				RequestTimeout(d.rtimeout), | 				RequestTimeout(d.rtimeout), | ||||||
| 				DialTimeout(d.dtimeout), | 				DialTimeout(d.dtimeout), | ||||||
| @@ -35,7 +35,7 @@ func TestCallOptions(t *testing.T) { | |||||||
| 				DialTimeout(d.dtimeout), | 				DialTimeout(d.dtimeout), | ||||||
| 			) | 			) | ||||||
| 		} else { | 		} else { | ||||||
| 			opts = newOptions() | 			opts = NewOptions() | ||||||
| 			cl = NewClient() | 			cl = NewClient() | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -29,7 +29,7 @@ type rpcClient struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| func newRpcClient(opt ...Option) Client { | func newRpcClient(opt ...Option) Client { | ||||||
| 	opts := newOptions(opt...) | 	opts := NewOptions(opt...) | ||||||
|  |  | ||||||
| 	p := pool.NewPool( | 	p := pool.NewPool( | ||||||
| 		pool.Size(opts.PoolSize), | 		pool.Size(opts.PoolSize), | ||||||
|   | |||||||
| @@ -202,7 +202,6 @@ var ( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultClients = map[string]func(...client.Option) client.Client{ | 	DefaultClients = map[string]func(...client.Option) client.Client{ | ||||||
| 		"rpc":  client.NewClient, |  | ||||||
| 		"mucp": cmucp.NewClient, | 		"mucp": cmucp.NewClient, | ||||||
| 		"grpc": cgrpc.NewClient, | 		"grpc": cgrpc.NewClient, | ||||||
| 	} | 	} | ||||||
| @@ -224,7 +223,6 @@ var ( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultServers = map[string]func(...server.Option) server.Server{ | 	DefaultServers = map[string]func(...server.Option) server.Server{ | ||||||
| 		"rpc":  server.NewServer, |  | ||||||
| 		"mucp": smucp.NewServer, | 		"mucp": smucp.NewServer, | ||||||
| 		"grpc": sgrpc.NewServer, | 		"grpc": sgrpc.NewServer, | ||||||
| 	} | 	} | ||||||
| @@ -242,8 +240,8 @@ var ( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// used for default selection as the fall back | 	// used for default selection as the fall back | ||||||
| 	defaultClient    = "rpc" | 	defaultClient    = "grpc" | ||||||
| 	defaultServer    = "rpc" | 	defaultServer    = "grpc" | ||||||
| 	defaultBroker    = "http" | 	defaultBroker    = "http" | ||||||
| 	defaultRegistry  = "mdns" | 	defaultRegistry  = "mdns" | ||||||
| 	defaultSelector  = "registry" | 	defaultSelector  = "registry" | ||||||
|   | |||||||
| @@ -107,8 +107,6 @@ func Registry(r registry.Registry) Option { | |||||||
| 		// Update Client and Server | 		// Update Client and Server | ||||||
| 		o.Client.Init(client.Registry(r)) | 		o.Client.Init(client.Registry(r)) | ||||||
| 		o.Server.Init(server.Registry(r)) | 		o.Server.Init(server.Registry(r)) | ||||||
| 		// Update Selector |  | ||||||
| 		o.Client.Options().Selector.Init(selector.Registry(r)) |  | ||||||
| 		// Update Broker | 		// Update Broker | ||||||
| 		o.Broker.Init(broker.Registry(r)) | 		o.Broker.Init(broker.Registry(r)) | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -611,7 +611,7 @@ func (g *grpcServer) Register() error { | |||||||
| 	g.Unlock() | 	g.Unlock() | ||||||
|  |  | ||||||
| 	if !registered { | 	if !registered { | ||||||
| 		log.Logf("Registering node: %s", node.Id) | 		log.Logf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// create registry options | 	// create registry options | ||||||
|   | |||||||
| @@ -1,28 +0,0 @@ | |||||||
| # gRPC Service |  | ||||||
|  |  | ||||||
| A simplified experience for building gRPC services.  |  | ||||||
|  |  | ||||||
| ## Overview |  | ||||||
|  |  | ||||||
| The **gRPC service** makes use of [go-micro](https://github.com/micro/go-micro) plugins to create a simpler framework for gRPC development.  |  | ||||||
| It interoperates with standard gRPC services seamlessly, including the [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway).  |  | ||||||
| The grpc service uses the go-micro broker, client and server plugins which make use of  |  | ||||||
| [github.com/grpc/grpc-go](https://github.com/grpc/grpc-go) internally.  |  | ||||||
| This means we ignore the go-micro codec and transport but provide a native grpc experience. |  | ||||||
|  |  | ||||||
| <img src="https://micro.mu/docs/images/go-grpc.svg" /> |  | ||||||
|  |  | ||||||
| ## Features |  | ||||||
|  |  | ||||||
| - **Service Discovery** - We make use of go-micro's registry and selector interfaces to provide pluggable discovery  |  | ||||||
| and client side load balancing. There's no need to dial connections, we'll do everything beneath the covers for you. |  | ||||||
|  |  | ||||||
| - **PubSub Messaging** - Where gRPC only provides you synchronous communication, the **gRPC service** uses the go-micro broker  |  | ||||||
| to provide asynchronous messaging while using the gRPC protocol. |  | ||||||
|  |  | ||||||
| - **Micro Ecosystem** - Make use of the existing micro ecosystem of tooling including our api gateway, web dashboard,  |  | ||||||
| command line interface and much more. We're enhancing gRPC with a simplified experience using micro. |  | ||||||
|  |  | ||||||
| ## I18n |  | ||||||
|  |  | ||||||
| ### [中文](README_cn.md) |  | ||||||
| @@ -1,25 +0,0 @@ | |||||||
| # Micro gRPC [](https://opensource.org/licenses/Apache-2.0) [](https://godoc.org/github.com/micro/go-micro/service/grpc) [](https://travis-ci.org/micro/go-micro/service/grpc) [](https://goreportcard.com/report/github.com/micro/go-micro/service/grpc) |  | ||||||
|  |  | ||||||
| Micro gRPC是micro的gRPC框架插件,简化开发基于gRPC的服务。 |  | ||||||
|  |  | ||||||
| ## 概览 |  | ||||||
|  |  | ||||||
| micro提供有基于Go的gRPC插件[go-micro](https://github.com/micro/go-micro),该插件可以在内部集成gPRC,并与之无缝交互,让开发gRPC更简单,并支持[grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway)。 |  | ||||||
|  |  | ||||||
| micro有面向gRPC的[客户端](https://github.com/micro/go-plugins/tree/master/client)和[服务端](https://github.com/micro/go-plugins/tree/master/server)插件,go-grpc库调用客户端/服务端插件生成micro需要的gRPC代码,而客户端/服务端插件都是从[github.com/grpc/grpc-go](https://github.com/grpc/grpc-go)扩展而来,也即是说,我们不需要去知道go-micro是如何编解码或传输就可以使用原生的gRPC。 |  | ||||||
|  |  | ||||||
| ## 特性 |  | ||||||
|  |  | ||||||
| - **服务发现** - go-micro的服务发现基于其[注册](https://github.com/micro/go-plugins/tree/master/registry)与[选择器](https://github.com/micro/go-micro/tree/master/selector)接口,实现了可插拔的服务发现与客户端侧的负载均衡,不需要拨号连接,micro已经把所有都封装好,大家只管用。 |  | ||||||
|  |  | ||||||
| - **消息发布订阅** - 因为gRPC只提供同步通信机制,而**Go gRPC**使用go-micro的[broker代理](https://github.com/micro/go-micro/tree/master/broker)提供异步消息,broker也是基于gRPC协议。 |  | ||||||
|  |  | ||||||
| - **Micro生态系统** - Micro生态系统包含工具链中,比如api网关、web管理控制台、CLI命令行接口等等。我们通过使用micro来增强gRPC框架的易用性。 |  | ||||||
|  |  | ||||||
| ## 示例 |  | ||||||
|  |  | ||||||
| 示例请查看[examples/greeter](https://github.com/micro/go-micro/service/grpc/tree/master/examples/greeter)。 |  | ||||||
|  |  | ||||||
| ## 开始使用 |  | ||||||
|  |  | ||||||
| 我们提供相关文档[docs](https://micro.mu/docs/go-grpc_cn.html),以便上手。 |  | ||||||
| @@ -1,58 +1,124 @@ | |||||||
| package grpc | package grpc | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"time" | 	"github.com/micro/go-micro/client" | ||||||
|  | 	gclient "github.com/micro/go-micro/client/grpc" | ||||||
| 	"github.com/micro/go-micro" | 	"github.com/micro/go-micro/server" | ||||||
| 	broker "github.com/micro/go-micro/broker" | 	gserver "github.com/micro/go-micro/server/grpc" | ||||||
| 	client "github.com/micro/go-micro/client/grpc" | 	"github.com/micro/go-micro/service" | ||||||
| 	server "github.com/micro/go-micro/server/grpc" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | type grpcService struct { | ||||||
|  | 	opts service.Options | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func newService(opts ...service.Option) service.Service { | ||||||
|  | 	options := service.NewOptions(opts...) | ||||||
|  |  | ||||||
|  | 	return &grpcService{ | ||||||
|  | 		opts: options, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *grpcService) Name() string { | ||||||
|  | 	return s.opts.Server.Options().Name | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Init initialises options. Additionally it calls cmd.Init | ||||||
|  | // which parses command line flags. cmd.Init is only called | ||||||
|  | // on first Init. | ||||||
|  | func (s *grpcService) Init(opts ...service.Option) { | ||||||
|  | 	// process options | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&s.opts) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *grpcService) Options() service.Options { | ||||||
|  | 	return s.opts | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *grpcService) Client() client.Client { | ||||||
|  | 	return s.opts.Client | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *grpcService) Server() server.Server { | ||||||
|  | 	return s.opts.Server | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *grpcService) String() string { | ||||||
|  | 	return "grpc" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *grpcService) Start() error { | ||||||
|  | 	for _, fn := range s.opts.BeforeStart { | ||||||
|  | 		if err := fn(); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := s.opts.Server.Start(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, fn := range s.opts.AfterStart { | ||||||
|  | 		if err := fn(); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *grpcService) Stop() error { | ||||||
|  | 	var gerr error | ||||||
|  |  | ||||||
|  | 	for _, fn := range s.opts.BeforeStop { | ||||||
|  | 		if err := fn(); err != nil { | ||||||
|  | 			gerr = err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := s.opts.Server.Stop(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, fn := range s.opts.AfterStop { | ||||||
|  | 		if err := fn(); err != nil { | ||||||
|  | 			gerr = err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return gerr | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *grpcService) Run() error { | ||||||
|  | 	if err := s.Start(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// wait on context cancel | ||||||
|  | 	<-s.opts.Context.Done() | ||||||
|  |  | ||||||
|  | 	return s.Stop() | ||||||
|  | } | ||||||
|  |  | ||||||
| // NewService returns a grpc service compatible with go-micro.Service | // NewService returns a grpc service compatible with go-micro.Service | ||||||
| func NewService(opts ...micro.Option) micro.Service { | func NewService(opts ...service.Option) service.Service { | ||||||
| 	// our grpc client | 	// our grpc client | ||||||
| 	c := client.NewClient() | 	c := gclient.NewClient() | ||||||
| 	// our grpc server | 	// our grpc server | ||||||
| 	s := server.NewServer() | 	s := gserver.NewServer() | ||||||
| 	// our grpc broker |  | ||||||
| 	b := broker.NewBroker() |  | ||||||
|  |  | ||||||
| 	// create options with priority for our opts | 	// create options with priority for our opts | ||||||
| 	options := []micro.Option{ | 	options := []service.Option{ | ||||||
| 		micro.Client(c), | 		service.Client(c), | ||||||
| 		micro.Server(s), | 		service.Server(s), | ||||||
| 		micro.Broker(b), |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// append passed in opts | 	// append passed in opts | ||||||
| 	options = append(options, opts...) | 	options = append(options, opts...) | ||||||
|  |  | ||||||
| 	// generate and return a service | 	// generate and return a service | ||||||
| 	return micro.NewService(options...) | 	return newService(options...) | ||||||
| } |  | ||||||
|  |  | ||||||
| // NewFunction returns a grpc service compatible with go-micro.Function |  | ||||||
| func NewFunction(opts ...micro.Option) micro.Function { |  | ||||||
| 	// our grpc client |  | ||||||
| 	c := client.NewClient() |  | ||||||
| 	// our grpc server |  | ||||||
| 	s := server.NewServer() |  | ||||||
| 	// our grpc broker |  | ||||||
| 	b := broker.NewBroker() |  | ||||||
|  |  | ||||||
| 	// create options with priority for our opts |  | ||||||
| 	options := []micro.Option{ |  | ||||||
| 		micro.Client(c), |  | ||||||
| 		micro.Server(s), |  | ||||||
| 		micro.Broker(b), |  | ||||||
| 		micro.RegisterTTL(time.Minute), |  | ||||||
| 		micro.RegisterInterval(time.Second * 30), |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// append passed in opts |  | ||||||
| 	options = append(options, opts...) |  | ||||||
|  |  | ||||||
| 	// generate and return a function |  | ||||||
| 	return micro.NewFunction(options...) |  | ||||||
| } | } | ||||||
|   | |||||||
| @@ -7,8 +7,8 @@ import ( | |||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro" |  | ||||||
| 	"github.com/micro/go-micro/registry/memory" | 	"github.com/micro/go-micro/registry/memory" | ||||||
|  | 	"github.com/micro/go-micro/service" | ||||||
| 	hello "github.com/micro/go-micro/service/grpc/proto" | 	hello "github.com/micro/go-micro/service/grpc/proto" | ||||||
| 	mls "github.com/micro/go-micro/util/tls" | 	mls "github.com/micro/go-micro/util/tls" | ||||||
| ) | ) | ||||||
| @@ -32,13 +32,13 @@ func TestGRPCService(t *testing.T) { | |||||||
|  |  | ||||||
| 	// create GRPC service | 	// create GRPC service | ||||||
| 	service := NewService( | 	service := NewService( | ||||||
| 		micro.Name("test.service"), | 		service.Name("test.service"), | ||||||
| 		micro.Registry(r), | 		service.Registry(r), | ||||||
| 		micro.AfterStart(func() error { | 		service.AfterStart(func() error { | ||||||
| 			wg.Done() | 			wg.Done() | ||||||
| 			return nil | 			return nil | ||||||
| 		}), | 		}), | ||||||
| 		micro.Context(ctx), | 		service.Context(ctx), | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
| 	// register test handler | 	// register test handler | ||||||
| @@ -81,50 +81,6 @@ func TestGRPCService(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestGRPCFunction(t *testing.T) { |  | ||||||
| 	var wg sync.WaitGroup |  | ||||||
| 	wg.Add(1) |  | ||||||
|  |  | ||||||
| 	ctx, cancel := context.WithCancel(context.Background()) |  | ||||||
| 	defer cancel() |  | ||||||
|  |  | ||||||
| 	// create service |  | ||||||
| 	fn := NewFunction( |  | ||||||
| 		micro.Name("test.function"), |  | ||||||
| 		micro.Registry(memory.NewRegistry()), |  | ||||||
| 		micro.AfterStart(func() error { |  | ||||||
| 			wg.Done() |  | ||||||
| 			return nil |  | ||||||
| 		}), |  | ||||||
| 		micro.Context(ctx), |  | ||||||
| 	) |  | ||||||
|  |  | ||||||
| 	// register test handler |  | ||||||
| 	hello.RegisterTestHandler(fn.Server(), &testHandler{}) |  | ||||||
|  |  | ||||||
| 	// run service |  | ||||||
| 	go fn.Run() |  | ||||||
|  |  | ||||||
| 	// wait for start |  | ||||||
| 	wg.Wait() |  | ||||||
|  |  | ||||||
| 	// create client |  | ||||||
| 	test := hello.NewTestService("test.function", fn.Client()) |  | ||||||
|  |  | ||||||
| 	// call service |  | ||||||
| 	rsp, err := test.Call(context.Background(), &hello.Request{ |  | ||||||
| 		Name: "John", |  | ||||||
| 	}) |  | ||||||
| 	if err != nil { |  | ||||||
| 		t.Fatal(err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// check message |  | ||||||
| 	if rsp.Msg != "Hello John" { |  | ||||||
| 		t.Fatalf("unexpected response %s", rsp.Msg) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestGRPCTLSService(t *testing.T) { | func TestGRPCTLSService(t *testing.T) { | ||||||
| 	var wg sync.WaitGroup | 	var wg sync.WaitGroup | ||||||
| 	wg.Add(1) | 	wg.Add(1) | ||||||
| @@ -147,13 +103,13 @@ func TestGRPCTLSService(t *testing.T) { | |||||||
|  |  | ||||||
| 	// create GRPC service | 	// create GRPC service | ||||||
| 	service := NewService( | 	service := NewService( | ||||||
| 		micro.Name("test.service"), | 		service.Name("test.service"), | ||||||
| 		micro.Registry(r), | 		service.Registry(r), | ||||||
| 		micro.AfterStart(func() error { | 		service.AfterStart(func() error { | ||||||
| 			wg.Done() | 			wg.Done() | ||||||
| 			return nil | 			return nil | ||||||
| 		}), | 		}), | ||||||
| 		micro.Context(ctx), | 		service.Context(ctx), | ||||||
| 		// set TLS config | 		// set TLS config | ||||||
| 		WithTLS(config), | 		WithTLS(config), | ||||||
| 	) | 	) | ||||||
|   | |||||||
| @@ -3,14 +3,14 @@ package grpc | |||||||
| import ( | import ( | ||||||
| 	"crypto/tls" | 	"crypto/tls" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro" |  | ||||||
| 	gc "github.com/micro/go-micro/client/grpc" | 	gc "github.com/micro/go-micro/client/grpc" | ||||||
| 	gs "github.com/micro/go-micro/server/grpc" | 	gs "github.com/micro/go-micro/server/grpc" | ||||||
|  | 	"github.com/micro/go-micro/service" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // WithTLS sets the TLS config for the service | // WithTLS sets the TLS config for the service | ||||||
| func WithTLS(t *tls.Config) micro.Option { | func WithTLS(t *tls.Config) service.Option { | ||||||
| 	return func(o *micro.Options) { | 	return func(o *service.Options) { | ||||||
| 		o.Client.Init( | 		o.Client.Init( | ||||||
| 			gc.AuthTLS(t), | 			gc.AuthTLS(t), | ||||||
| 		) | 		) | ||||||
|   | |||||||
| @@ -2,20 +2,116 @@ | |||||||
| package mucp | package mucp | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	// TODO: change to go-micro/service | 	"github.com/micro/go-micro/client" | ||||||
| 	"github.com/micro/go-micro" | 	"github.com/micro/go-micro/server" | ||||||
| 	cmucp "github.com/micro/go-micro/client/mucp" | 	cmucp "github.com/micro/go-micro/client/mucp" | ||||||
| 	smucp "github.com/micro/go-micro/server/mucp" | 	smucp "github.com/micro/go-micro/server/mucp" | ||||||
|  | 	"github.com/micro/go-micro/service" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | type mucpService struct { | ||||||
|  | 	opts service.Options | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func newService(opts ...service.Option) service.Service { | ||||||
|  | 	options := service.NewOptions(opts...) | ||||||
|  |  | ||||||
|  | 	return &mucpService{ | ||||||
|  | 		opts: options, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *mucpService) Name() string { | ||||||
|  | 	return s.opts.Server.Options().Name | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Init initialises options. Additionally it calls cmd.Init | ||||||
|  | // which parses command line flags. cmd.Init is only called | ||||||
|  | // on first Init. | ||||||
|  | func (s *mucpService) Init(opts ...service.Option) { | ||||||
|  | 	// process options | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&s.opts) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *mucpService) Options() service.Options { | ||||||
|  | 	return s.opts | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *mucpService) Client() client.Client { | ||||||
|  | 	return s.opts.Client | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *mucpService) Server() server.Server { | ||||||
|  | 	return s.opts.Server | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *mucpService) String() string { | ||||||
|  | 	return "mucp" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *mucpService) Start() error { | ||||||
|  | 	for _, fn := range s.opts.BeforeStart { | ||||||
|  | 		if err := fn(); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := s.opts.Server.Start(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, fn := range s.opts.AfterStart { | ||||||
|  | 		if err := fn(); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *mucpService) Stop() error { | ||||||
|  | 	var gerr error | ||||||
|  |  | ||||||
|  | 	for _, fn := range s.opts.BeforeStop { | ||||||
|  | 		if err := fn(); err != nil { | ||||||
|  | 			gerr = err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := s.opts.Server.Stop(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, fn := range s.opts.AfterStop { | ||||||
|  | 		if err := fn(); err != nil { | ||||||
|  | 			gerr = err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return gerr | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *mucpService) Run() error { | ||||||
|  | 	if err := s.Start(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// wait on context cancel | ||||||
|  | 	<-s.opts.Context.Done() | ||||||
|  |  | ||||||
|  | 	return s.Stop() | ||||||
|  | } | ||||||
|  |  | ||||||
| // NewService returns a new mucp service | // NewService returns a new mucp service | ||||||
| func NewService(opts ...micro.Option) micro.Service { | func NewService(opts ...service.Option) service.Service { | ||||||
| 	options := []micro.Option{ | 	options := []service.Option{ | ||||||
| 		micro.Client(cmucp.NewClient()), | 		service.Client(cmucp.NewClient()), | ||||||
| 		micro.Server(smucp.NewServer()), | 		service.Server(smucp.NewServer()), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	options = append(options, opts...) | 	options = append(options, opts...) | ||||||
|  |  | ||||||
| 	return micro.NewService(options...) | 	return newService(options...) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -31,7 +31,7 @@ type Options struct { | |||||||
|  |  | ||||||
| type Option func(*Options) | type Option func(*Options) | ||||||
|  |  | ||||||
| func newOptions(opts ...Option) Options { | func NewOptions(opts ...Option) Options { | ||||||
| 	opt := Options{ | 	opt := Options{ | ||||||
| 		Broker:    broker.DefaultBroker, | 		Broker:    broker.DefaultBroker, | ||||||
| 		Client:    client.DefaultClient, | 		Client:    client.DefaultClient, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user