commit
c73a88e801
@ -105,7 +105,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r
|
|||||||
defer c.Close()
|
defer c.Close()
|
||||||
|
|
||||||
client := newClientWithCodec(newRpcPlusCodec(msg, c, cf))
|
client := newClientWithCodec(newRpcPlusCodec(msg, c, cf))
|
||||||
err = client.Call(ctx, request.Method(), request.Request(), response)
|
err = client.Call(ctx, request.Service(), request.Method(), request.Request(), response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -137,7 +137,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request,
|
|||||||
}
|
}
|
||||||
|
|
||||||
client := newClientWithCodec(newRpcPlusCodec(msg, c, cf))
|
client := newClientWithCodec(newRpcPlusCodec(msg, c, cf))
|
||||||
call := client.StreamGo(request.Method(), request.Request(), responseChan)
|
call := client.StreamGo(request.Service(), request.Method(), request.Request(), responseChan)
|
||||||
|
|
||||||
return &rpcStream{
|
return &rpcStream{
|
||||||
request: request,
|
request: request,
|
||||||
|
@ -64,14 +64,19 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.Ne
|
|||||||
|
|
||||||
func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error {
|
func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error {
|
||||||
m := &codec.Message{
|
m := &codec.Message{
|
||||||
Id: req.Seq,
|
Id: req.Seq,
|
||||||
Method: req.ServiceMethod,
|
Target: req.Service,
|
||||||
Type: codec.Request,
|
Method: req.ServiceMethod,
|
||||||
|
Type: codec.Request,
|
||||||
|
Headers: map[string]string{},
|
||||||
}
|
}
|
||||||
if err := c.codec.Write(m, body); err != nil {
|
if err := c.codec.Write(m, body); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.req.Body = c.buf.wbuf.Bytes()
|
c.req.Body = c.buf.wbuf.Bytes()
|
||||||
|
for k, v := range m.Headers {
|
||||||
|
c.req.Header[k] = v
|
||||||
|
}
|
||||||
return c.client.Send(c.req)
|
return c.client.Send(c.req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@ var errShutdown = errors.New("connection is shut down")
|
|||||||
|
|
||||||
// call represents an active RPC.
|
// call represents an active RPC.
|
||||||
type call struct {
|
type call struct {
|
||||||
|
Service string
|
||||||
ServiceMethod string // The name of the service and method to call.
|
ServiceMethod string // The name of the service and method to call.
|
||||||
Args interface{} // The argument to the function (*struct).
|
Args interface{} // The argument to the function (*struct).
|
||||||
Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming).
|
Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming).
|
||||||
@ -65,6 +66,7 @@ type clientCodec interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type request struct {
|
type request struct {
|
||||||
|
Service string
|
||||||
ServiceMethod string // format: "Service.Method"
|
ServiceMethod string // format: "Service.Method"
|
||||||
Seq uint64 // sequence number chosen by client
|
Seq uint64 // sequence number chosen by client
|
||||||
next *request // for free list in Server
|
next *request // for free list in Server
|
||||||
@ -95,6 +97,7 @@ func (client *client) send(call *call) {
|
|||||||
client.mutex.Unlock()
|
client.mutex.Unlock()
|
||||||
|
|
||||||
// Encode and send the request.
|
// Encode and send the request.
|
||||||
|
client.request.Service = call.Service
|
||||||
client.request.Seq = seq
|
client.request.Seq = seq
|
||||||
client.request.ServiceMethod = call.ServiceMethod
|
client.request.ServiceMethod = call.ServiceMethod
|
||||||
err := client.codec.WriteRequest(&client.request, call.Args)
|
err := client.codec.WriteRequest(&client.request, call.Args)
|
||||||
@ -241,12 +244,13 @@ func (client *client) Close() error {
|
|||||||
// the invocation. The done channel will signal when the call is complete by returning
|
// the invocation. The done channel will signal when the call is complete by returning
|
||||||
// the same call object. If done is nil, Go will allocate a new channel.
|
// the same call object. If done is nil, Go will allocate a new channel.
|
||||||
// If non-nil, done must be buffered or Go will deliberately crash.
|
// If non-nil, done must be buffered or Go will deliberately crash.
|
||||||
func (client *client) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *call) *call {
|
func (client *client) Go(ctx context.Context, service, serviceMethod string, args interface{}, reply interface{}, done chan *call) *call {
|
||||||
span := trace.NewSpanFromContext(ctx)
|
span := trace.NewSpanFromContext(ctx)
|
||||||
span.StartClient(serviceMethod)
|
span.StartClient(serviceMethod)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
cal := new(call)
|
cal := new(call)
|
||||||
|
cal.Service = service
|
||||||
cal.ServiceMethod = serviceMethod
|
cal.ServiceMethod = serviceMethod
|
||||||
cal.Args = args
|
cal.Args = args
|
||||||
cal.Reply = reply
|
cal.Reply = reply
|
||||||
@ -268,7 +272,7 @@ func (client *client) Go(ctx context.Context, serviceMethod string, args interfa
|
|||||||
|
|
||||||
// StreamGo invokes the streaming function asynchronously. It returns the call structure representing
|
// StreamGo invokes the streaming function asynchronously. It returns the call structure representing
|
||||||
// the invocation.
|
// the invocation.
|
||||||
func (client *client) StreamGo(serviceMethod string, args interface{}, replyStream interface{}) *call {
|
func (client *client) StreamGo(service string, serviceMethod string, args interface{}, replyStream interface{}) *call {
|
||||||
// first check the replyStream object is a stream of pointers to a data structure
|
// first check the replyStream object is a stream of pointers to a data structure
|
||||||
typ := reflect.TypeOf(replyStream)
|
typ := reflect.TypeOf(replyStream)
|
||||||
// FIXME: check the direction of the channel, maybe?
|
// FIXME: check the direction of the channel, maybe?
|
||||||
@ -278,6 +282,7 @@ func (client *client) StreamGo(serviceMethod string, args interface{}, replyStre
|
|||||||
}
|
}
|
||||||
|
|
||||||
call := new(call)
|
call := new(call)
|
||||||
|
call.Service = service
|
||||||
call.ServiceMethod = serviceMethod
|
call.ServiceMethod = serviceMethod
|
||||||
call.Args = args
|
call.Args = args
|
||||||
call.Reply = replyStream
|
call.Reply = replyStream
|
||||||
@ -288,7 +293,7 @@ func (client *client) StreamGo(serviceMethod string, args interface{}, replyStre
|
|||||||
}
|
}
|
||||||
|
|
||||||
// call invokes the named function, waits for it to complete, and returns its error status.
|
// call invokes the named function, waits for it to complete, and returns its error status.
|
||||||
func (client *client) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error {
|
func (client *client) Call(ctx context.Context, service string, serviceMethod string, args interface{}, reply interface{}) error {
|
||||||
call := <-client.Go(ctx, serviceMethod, args, reply, make(chan *call, 1)).Done
|
call := <-client.Go(ctx, service, serviceMethod, args, reply, make(chan *call, 1)).Done
|
||||||
return call.Error
|
return call.Error
|
||||||
}
|
}
|
||||||
|
7
examples/mercury/README.md
Normal file
7
examples/mercury/README.md
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
# Mercury
|
||||||
|
|
||||||
|
An **experimental** integration for [mondo/mercury](https://github.com/mondough/mercury)
|
||||||
|
|
||||||
|
mercury/{client,server} are standard mercury implementations for compatibility testing sake.
|
||||||
|
|
||||||
|
micro/{client,server} are micro implementations of mercury's request/response system.
|
30
examples/mercury/mercury/client/client.go
Normal file
30
examples/mercury/mercury/client/client.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
hello "github.com/micro/micro/examples/greeter/server/proto/hello"
|
||||||
|
"github.com/mondough/mercury"
|
||||||
|
tmsg "github.com/mondough/typhon/message"
|
||||||
|
"github.com/mondough/typhon/rabbit"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
req := mercury.NewRequest()
|
||||||
|
req.SetService("foo")
|
||||||
|
req.SetEndpoint("Say.Hello")
|
||||||
|
req.SetBody(&hello.Request{
|
||||||
|
Name: "John",
|
||||||
|
})
|
||||||
|
tmsg.ProtoMarshaler().MarshalBody(req)
|
||||||
|
trans := rabbit.NewTransport()
|
||||||
|
rsp, err := trans.Send(req, time.Second)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tmsg.ProtoUnmarshaler(new(hello.Response)).UnmarshalPayload(rsp)
|
||||||
|
|
||||||
|
fmt.Println(rsp.Body())
|
||||||
|
}
|
34
examples/mercury/mercury/server/server.go
Normal file
34
examples/mercury/mercury/server/server.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/mondough/mercury"
|
||||||
|
"github.com/mondough/mercury/server"
|
||||||
|
"github.com/mondough/mercury/service"
|
||||||
|
"github.com/mondough/typhon/rabbit"
|
||||||
|
|
||||||
|
hello "github.com/micro/micro/examples/greeter/server/proto/hello"
|
||||||
|
)
|
||||||
|
|
||||||
|
func handler(req mercury.Request) (mercury.Response, error) {
|
||||||
|
request := req.Body().(*hello.Request)
|
||||||
|
rsp := req.Response(&hello.Response{
|
||||||
|
Msg: "Hey " + request.Name,
|
||||||
|
})
|
||||||
|
return rsp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
s := service.Init(service.Config{
|
||||||
|
Name: "foo",
|
||||||
|
Transport: rabbit.NewTransport(),
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Server().AddEndpoints(server.Endpoint{
|
||||||
|
Name: "Say.Hello",
|
||||||
|
Handler: handler,
|
||||||
|
Request: new(hello.Request),
|
||||||
|
Response: new(hello.Response),
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Run()
|
||||||
|
}
|
37
examples/mercury/micro/client/client.go
Normal file
37
examples/mercury/micro/client/client.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/client"
|
||||||
|
mcodec "github.com/micro/go-plugins/codec/mercury"
|
||||||
|
"github.com/micro/go-plugins/selector/mercury"
|
||||||
|
"github.com/micro/go-plugins/transport/rabbitmq"
|
||||||
|
hello "github.com/micro/micro/examples/greeter/server/proto/hello"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
rabbitmq.DefaultExchange = "b2a"
|
||||||
|
rabbitmq.DefaultRabbitURL = "amqp://localhost:5672"
|
||||||
|
|
||||||
|
c := client.NewClient(
|
||||||
|
client.Selector(mercury.NewSelector()),
|
||||||
|
client.Transport(rabbitmq.NewTransport([]string{})),
|
||||||
|
client.Codec("application/x-protobuf", mcodec.NewCodec),
|
||||||
|
client.ContentType("application/x-protobuf"),
|
||||||
|
)
|
||||||
|
|
||||||
|
req := c.NewRequest("foo", "Say.Hello", &hello.Request{
|
||||||
|
Name: "John",
|
||||||
|
})
|
||||||
|
|
||||||
|
rsp := &hello.Response{}
|
||||||
|
|
||||||
|
if err := c.Call(context.Background(), req, rsp); err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(rsp)
|
||||||
|
}
|
38
examples/mercury/micro/server/server.go
Normal file
38
examples/mercury/micro/server/server.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"github.com/micro/go-micro/server"
|
||||||
|
mcodec "github.com/micro/go-plugins/codec/mercury"
|
||||||
|
"github.com/micro/go-plugins/transport/rabbitmq"
|
||||||
|
hello "github.com/micro/micro/examples/greeter/server/proto/hello"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Say struct{}
|
||||||
|
|
||||||
|
func (s *Say) Hello(ctx context.Context, req *hello.Request, rsp *hello.Response) error {
|
||||||
|
rsp.Msg = "Hey " + req.Name
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
rabbitmq.DefaultExchange = "b2a"
|
||||||
|
rabbitmq.DefaultRabbitURL = "amqp://localhost:5672"
|
||||||
|
|
||||||
|
s := server.NewServer(
|
||||||
|
server.Name("foo"),
|
||||||
|
server.Id("foo"),
|
||||||
|
server.Address("foo"),
|
||||||
|
server.Transport(rabbitmq.NewTransport([]string{})),
|
||||||
|
server.Codec("application/x-protobuf", mcodec.NewCodec),
|
||||||
|
)
|
||||||
|
s.Handle(
|
||||||
|
s.NewHandler(&Say{}),
|
||||||
|
)
|
||||||
|
|
||||||
|
s.Start()
|
||||||
|
select {}
|
||||||
|
}
|
@ -61,7 +61,10 @@ func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.Ne
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *rpcPlusCodec) ReadRequestHeader(r *request) error {
|
func (c *rpcPlusCodec) ReadRequestHeader(r *request) error {
|
||||||
var m codec.Message
|
m := codec.Message{
|
||||||
|
Headers: c.req.Header,
|
||||||
|
}
|
||||||
|
|
||||||
err := c.codec.ReadHeader(&m, codec.Request)
|
err := c.codec.ReadHeader(&m, codec.Request)
|
||||||
r.ServiceMethod = m.Method
|
r.ServiceMethod = m.Method
|
||||||
r.Seq = m.Id
|
r.Seq = m.Id
|
||||||
@ -75,16 +78,19 @@ func (c *rpcPlusCodec) ReadRequestBody(b interface{}) error {
|
|||||||
func (c *rpcPlusCodec) WriteResponse(r *response, body interface{}, last bool) error {
|
func (c *rpcPlusCodec) WriteResponse(r *response, body interface{}, last bool) error {
|
||||||
c.buf.wbuf.Reset()
|
c.buf.wbuf.Reset()
|
||||||
m := &codec.Message{
|
m := &codec.Message{
|
||||||
Method: r.ServiceMethod,
|
Method: r.ServiceMethod,
|
||||||
Id: r.Seq,
|
Id: r.Seq,
|
||||||
Error: r.Error,
|
Error: r.Error,
|
||||||
Type: codec.Response,
|
Type: codec.Response,
|
||||||
|
Headers: map[string]string{},
|
||||||
}
|
}
|
||||||
if err := c.codec.Write(m, body); err != nil {
|
if err := c.codec.Write(m, body); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.Headers["Content-Type"] = c.req.Header["Content-Type"]
|
||||||
return c.socket.Send(&transport.Message{
|
return c.socket.Send(&transport.Message{
|
||||||
Header: map[string]string{"Content-Type": c.req.Header["Content-Type"]},
|
Header: m.Headers,
|
||||||
Body: c.buf.wbuf.Bytes(),
|
Body: c.buf.wbuf.Bytes(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user