diff --git a/examples/client/pub/pub.go b/examples/client/pub/pub.go new file mode 100644 index 00000000..0b3fa80f --- /dev/null +++ b/examples/client/pub/pub.go @@ -0,0 +1,45 @@ +package main + +import ( + "fmt" + "time" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + c "github.com/micro/go-micro/context" + example "github.com/micro/go-micro/examples/server/proto/example" + "golang.org/x/net/context" +) + +// publishes a message +func pub(i int) { + msg := client.NewPublication("topic.go.micro.srv.example", &example.Message{ + Say: fmt.Sprintf("This is a publication %d", i), + }) + + // create context with metadata + ctx := c.WithMetadata(context.Background(), map[string]string{ + "X-User-Id": "john", + "X-From-Id": "script", + }) + + // publish message + if err := client.Publish(ctx, msg); err != nil { + fmt.Println("pub err: ", err) + return + } + + fmt.Printf("Published %d: %v\n", i, msg) +} + +func main() { + cmd.Init() + fmt.Println("\n--- Publisher example ---\n") + // for i := 0; i < 10; i++ { + i := 0 + for { + pub(i) + i++ + time.Sleep(time.Millisecond * 5) + } +} diff --git a/server/rpc_server.go b/server/rpc_server.go index 21f5cf3c..76633762 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -208,7 +208,7 @@ func (s *rpcServer) Register() error { defer s.Unlock() for sb, _ := range s.subscribers { - handler := s.createSubHandler(sb) + handler := s.createSubHandler(sb, s.opts) sub, err := config.broker.Subscribe(sb.Topic(), handler) if err != nil { return err @@ -279,7 +279,7 @@ func (s *rpcServer) Start() error { registerHealthChecker(s) config := s.Config() - ts, err := config.transport.Listen(s.opts.address) + ts, err := config.transport.Listen(config.address) if err != nil { return err } diff --git a/server/subscriber.go b/server/subscriber.go index d5583db9..8f769707 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -154,28 +154,23 @@ func validateSubscriber(sub Subscriber) error { return nil } -func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler { +func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler { return func(msg *broker.Message) { cf, err := s.newCodec(msg.Header["Content-Type"]) if err != nil { return } - b := &buffer{bytes.NewBuffer(msg.Body)} - co := cf(b) - if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { - return - } - hdr := make(map[string]string) for k, v := range msg.Header { hdr[k] = v } delete(hdr, "Content-Type") ctx := c.WithMetadata(context.Background(), hdr) - rctx := reflect.ValueOf(ctx) - for _, handler := range sb.handlers { + for i := 0; i < len(sb.handlers); i++ { + handler := sb.handlers[i] + var isVal bool var req reflect.Value @@ -185,26 +180,45 @@ func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler { req = reflect.New(handler.reqType) isVal = true } + if isVal { + req = req.Elem() + } + + b := &buffer{bytes.NewBuffer(msg.Body)} + co := cf(b) + defer co.Close() + + if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { + continue + } if err := co.ReadBody(req.Interface()); err != nil { continue } - if isVal { - req = req.Elem() + fn := func(ctx context.Context, msg interface{}) error { + var vals []reflect.Value + if sb.typ.Kind() != reflect.Func { + vals = append(vals, sb.rcvr) + } + if handler.ctxType != nil { + vals = append(vals, reflect.ValueOf(ctx)) + } + + vals = append(vals, reflect.ValueOf(msg)) + + returnValues := handler.method.Call(vals) + if err := returnValues[0].Interface(); err != nil { + return err.(error) + } + return nil } - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) + for i := len(opts.subWrappers); i > 0; i-- { + fn = opts.subWrappers[i-1](fn) } - if handler.ctxType != nil { - vals = append(vals, rctx) - } - - vals = append(vals, req) - go handler.method.Call(vals) + go fn(ctx, req.Interface()) } } }