Checkpoint the world
This commit is contained in:
		| @@ -36,3 +36,6 @@ Example usage: | |||||||
|  |  | ||||||
| // Wrapper wraps a client and returns a client | // Wrapper wraps a client and returns a client | ||||||
| type Wrapper func(Client) Client | type Wrapper func(Client) Client | ||||||
|  |  | ||||||
|  | // StreamWrapper wraps a Stream and returns the equivalent | ||||||
|  | type StreamWrapper func(Streamer) Streamer | ||||||
|   | |||||||
| @@ -9,6 +9,7 @@ import ( | |||||||
| 	"golang.org/x/net/context" | 	"golang.org/x/net/context" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // Implements the streamer interface | ||||||
| type rpcStream struct { | type rpcStream struct { | ||||||
| 	sync.RWMutex | 	sync.RWMutex | ||||||
| 	seq     uint64 | 	seq     uint64 | ||||||
|   | |||||||
| @@ -56,9 +56,7 @@ func call(i int) { | |||||||
|  |  | ||||||
| func stream() { | func stream() { | ||||||
| 	// Create new request to service go.micro.srv.example, method Example.Call | 	// Create new request to service go.micro.srv.example, method Example.Call | ||||||
| 	req := client.NewRequest("go.micro.srv.example", "Example.Stream", &example.StreamingRequest{ | 	req := client.NewRequest("go.micro.srv.example", "Example.Stream", &example.StreamingRequest{}) | ||||||
| 		Count: int64(10), |  | ||||||
| 	}) |  | ||||||
|  |  | ||||||
| 	stream, err := client.Stream(context.Background(), req) | 	stream, err := client.Stream(context.Background(), req) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -66,6 +64,15 @@ func stream() { | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	fmt.Println("sending request") | ||||||
|  | 	if err := stream.Send(&example.StreamingRequest{ | ||||||
|  | 		Count: int64(10), | ||||||
|  | 	}); err != nil { | ||||||
|  | 		fmt.Println("err", err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	fmt.Println("sent request") | ||||||
|  |  | ||||||
| 	for stream.Error() == nil { | 	for stream.Error() == nil { | ||||||
| 		rsp := &example.StreamingResponse{} | 		rsp := &example.StreamingResponse{} | ||||||
| 		err := stream.Recv(rsp) | 		err := stream.Recv(rsp) | ||||||
| @@ -88,14 +95,14 @@ func stream() { | |||||||
|  |  | ||||||
| func main() { | func main() { | ||||||
| 	cmd.Init() | 	cmd.Init() | ||||||
| 	fmt.Println("\n--- Call example ---\n") | 	//	fmt.Println("\n--- Call example ---\n") | ||||||
| 	for i := 0; i < 10; i++ { | 	//	for i := 0; i < 10; i++ { | ||||||
| 		call(i) | 	//		call(i) | ||||||
| 	} | 	//	} | ||||||
|  |  | ||||||
| 	fmt.Println("\n--- Streamer example ---\n") | 	fmt.Println("\n--- Streamer example ---\n") | ||||||
| 	stream() | 	stream() | ||||||
|  |  | ||||||
| 	fmt.Println("\n--- Publisher example ---\n") | 	//	fmt.Println("\n--- Publisher example ---\n") | ||||||
| 	pub() | 	//	pub() | ||||||
| } | } | ||||||
|   | |||||||
| @@ -18,16 +18,24 @@ func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.R | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (e *Example) Stream(ctx context.Context, req *example.StreamingRequest, response func(interface{}) error) error { | func (e *Example) Stream(ctx context.Context, stream server.Streamer) error { | ||||||
|  | 	log.Info("Executing streaming handler") | ||||||
|  | 	req := &example.StreamingRequest{} | ||||||
|  |  | ||||||
|  | 	// We just want to receive 1 request and then process here | ||||||
|  | 	if err := stream.Recv(req); err != nil { | ||||||
|  | 		log.Errorf("Error receiving streaming request: %v", err) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	log.Infof("Received Example.Stream request with count: %d", req.Count) | 	log.Infof("Received Example.Stream request with count: %d", req.Count) | ||||||
|  |  | ||||||
| 	for i := 0; i < int(req.Count); i++ { | 	for i := 0; i < int(req.Count); i++ { | ||||||
| 		log.Infof("Responding: %d", i) | 		log.Infof("Responding: %d", i) | ||||||
|  |  | ||||||
| 		r := &example.StreamingResponse{ | 		if err := stream.Send(&example.StreamingResponse{ | ||||||
| 			Count: int64(i), | 			Count: int64(i), | ||||||
| 		} | 		}); err != nil { | ||||||
|  |  | ||||||
| 		if err := response(r); err != nil { |  | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -1,14 +1,13 @@ | |||||||
| package server | package server | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"errors" |  | ||||||
| 	"io" |  | ||||||
| 	"log" | 	"log" | ||||||
| 	"sync" | 	"sync" | ||||||
|  |  | ||||||
| 	"golang.org/x/net/context" | 	"golang.org/x/net/context" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // Implements the Streamer interface | ||||||
| type rpcStream struct { | type rpcStream struct { | ||||||
| 	sync.RWMutex | 	sync.RWMutex | ||||||
| 	seq     uint64 | 	seq     uint64 | ||||||
| @@ -39,7 +38,7 @@ func (r *rpcStream) Send(msg interface{}) error { | |||||||
| 		Seq:           seq, | 		Seq:           seq, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	err := codec.WriteResponse(&resp, msg, false) | 	err := r.codec.WriteResponse(&resp, msg, false) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Println("rpc: writing response:", err) | 		log.Println("rpc: writing response:", err) | ||||||
| 	} | 	} | ||||||
| @@ -52,13 +51,13 @@ func (r *rpcStream) Recv(msg interface{}) error { | |||||||
|  |  | ||||||
| 	req := request{} | 	req := request{} | ||||||
|  |  | ||||||
| 	if err := codec.ReadRequestHeader(&req); err != nil { | 	if err := r.codec.ReadRequestHeader(&req); err != nil { | ||||||
| 		// discard body | 		// discard body | ||||||
| 		codec.ReadRequestBody(nil) | 		r.codec.ReadRequestBody(nil) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err = codec.ReadRequestBody(msg); err != nil { | 	if err := r.codec.ReadRequestBody(msg); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -102,14 +102,19 @@ func prepareMethod(method reflect.Method) *methodType { | |||||||
| 	mtype := method.Type | 	mtype := method.Type | ||||||
| 	mname := method.Name | 	mname := method.Name | ||||||
| 	var replyType, argType, contextType reflect.Type | 	var replyType, argType, contextType reflect.Type | ||||||
|  | 	var stream bool | ||||||
|  |  | ||||||
| 	stream := false |  | ||||||
| 	// Method must be exported. | 	// Method must be exported. | ||||||
| 	if method.PkgPath != "" { | 	if method.PkgPath != "" { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	switch mtype.NumIn() { | 	switch mtype.NumIn() { | ||||||
|  | 	case 3: | ||||||
|  | 		// assuming streaming | ||||||
|  | 		argType = mtype.In(2) | ||||||
|  | 		contextType = mtype.In(1) | ||||||
|  | 		stream = true | ||||||
| 	case 4: | 	case 4: | ||||||
| 		// method that takes a context | 		// method that takes a context | ||||||
| 		argType = mtype.In(2) | 		argType = mtype.In(2) | ||||||
| @@ -120,44 +125,34 @@ func prepareMethod(method reflect.Method) *methodType { | |||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// First arg need not be a pointer. | 	if stream { | ||||||
| 	if !isExportedOrBuiltinType(argType) { | 		// check stream type | ||||||
| 		log.Println(mname, "argument type not exported:", argType) | 		streamType := reflect.TypeOf((*Streamer)(nil)).Elem() | ||||||
| 		return nil | 		if !argType.Implements(streamType) { | ||||||
| 	} | 			log.Println(mname, "argument does not implement Streamer interface:", argType) | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		// if not stream check the replyType | ||||||
|  |  | ||||||
| 	// the second argument will tell us if it's a streaming call | 		// First arg need not be a pointer. | ||||||
| 	// or a regular call | 		if !isExportedOrBuiltinType(argType) { | ||||||
| 	if replyType.Kind() == reflect.Func { | 			log.Println(mname, "argument type not exported:", argType) | ||||||
| 		// this is a streaming call |  | ||||||
| 		stream = true |  | ||||||
| 		if replyType.NumIn() != 1 { |  | ||||||
| 			log.Println("method", mname, "sendReply has wrong number of ins:", replyType.NumIn()) |  | ||||||
| 			return nil |  | ||||||
| 		} |  | ||||||
| 		if replyType.In(0).Kind() != reflect.Interface { |  | ||||||
| 			log.Println("method", mname, "sendReply parameter type not an interface:", replyType.In(0)) |  | ||||||
| 			return nil |  | ||||||
| 		} |  | ||||||
| 		if replyType.NumOut() != 1 { |  | ||||||
| 			log.Println("method", mname, "sendReply has wrong number of outs:", replyType.NumOut()) |  | ||||||
| 			return nil |  | ||||||
| 		} |  | ||||||
| 		if returnType := replyType.Out(0); returnType != typeOfError { |  | ||||||
| 			log.Println("method", mname, "sendReply returns", returnType.String(), "not error") |  | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 	} else if replyType.Kind() != reflect.Ptr { | 		if replyType.Kind() != reflect.Ptr { | ||||||
| 		log.Println("method", mname, "reply type not a pointer:", replyType) | 			log.Println("method", mname, "reply type not a pointer:", replyType) | ||||||
| 		return nil | 			return nil | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// Reply type must be exported. | ||||||
|  | 		if !isExportedOrBuiltinType(replyType) { | ||||||
|  | 			log.Println("method", mname, "reply type not exported:", replyType) | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Reply type must be exported. |  | ||||||
| 	if !isExportedOrBuiltinType(replyType) { |  | ||||||
| 		log.Println("method", mname, "reply type not exported:", replyType) |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 	// Method needs one out. | 	// Method needs one out. | ||||||
| 	if mtype.NumOut() != 1 { | 	if mtype.NumOut() != 1 { | ||||||
| 		log.Println("method", mname, "has wrong number of outs:", mtype.NumOut()) | 		log.Println("method", mname, "has wrong number of outs:", mtype.NumOut()) | ||||||
| @@ -242,10 +237,11 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex, | |||||||
| 		service:     s.name, | 		service:     s.name, | ||||||
| 		contentType: ct, | 		contentType: ct, | ||||||
| 		method:      req.ServiceMethod, | 		method:      req.ServiceMethod, | ||||||
| 		request:     argv.Interface(), |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if !mtype.stream { | 	if !mtype.stream { | ||||||
|  | 		r.request = argv.Interface() | ||||||
|  |  | ||||||
| 		fn := func(ctx context.Context, req Request, rsp interface{}) error { | 		fn := func(ctx context.Context, req Request, rsp interface{}) error { | ||||||
| 			returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req.Request()), reflect.ValueOf(rsp)}) | 			returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req.Request()), reflect.ValueOf(rsp)}) | ||||||
|  |  | ||||||
| @@ -276,40 +272,16 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex, | |||||||
| 	// keep track of the type, to make sure we return | 	// keep track of the type, to make sure we return | ||||||
| 	// the same one consistently | 	// the same one consistently | ||||||
| 	var lastError error | 	var lastError error | ||||||
| 	var firstType reflect.Type |  | ||||||
|  |  | ||||||
| 	sendReply := func(oneReply interface{}) error { | 	stream := &rpcStream{ | ||||||
|  | 		context: ctx, | ||||||
| 		// we already triggered an error, we're done | 		codec:   codec, | ||||||
| 		if lastError != nil { | 		request: r, | ||||||
| 			return lastError |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		// check the oneReply has the right type using reflection |  | ||||||
| 		typ := reflect.TypeOf(oneReply) |  | ||||||
| 		if firstType == nil { |  | ||||||
| 			firstType = typ |  | ||||||
| 		} else { |  | ||||||
| 			if firstType != typ { |  | ||||||
| 				log.Println("passing wrong type to sendReply", |  | ||||||
| 					firstType, "!=", typ) |  | ||||||
| 				lastError = errors.New("rpc: passing wrong type to sendReply") |  | ||||||
| 				return lastError |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		lastError = server.sendResponse(sending, req, oneReply, codec, "", false) |  | ||||||
| 		if lastError != nil { |  | ||||||
| 			return lastError |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		// we manage to send, we're good |  | ||||||
| 		return nil |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Invoke the method, providing a new value for the reply. | 	// Invoke the method, providing a new value for the reply. | ||||||
| 	fn := func(ctx context.Context, req Request, rspFn interface{}) error { | 	fn := func(ctx context.Context, req Request, stream interface{}) error { | ||||||
| 		returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(req.Request()), reflect.ValueOf(rspFn)}) | 		returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(stream)}) | ||||||
| 		if err := returnValues[0].Interface(); err != nil { | 		if err := returnValues[0].Interface(); err != nil { | ||||||
| 			// the function returned an error, we use that | 			// the function returned an error, we use that | ||||||
| 			return err.(error) | 			return err.(error) | ||||||
| @@ -331,7 +303,7 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex, | |||||||
| 	r.stream = true | 	r.stream = true | ||||||
|  |  | ||||||
| 	errmsg := "" | 	errmsg := "" | ||||||
| 	if err := fn(ctx, r, reflect.ValueOf(sendReply).Interface()); err != nil { | 	if err := fn(ctx, r, stream); err != nil { | ||||||
| 		errmsg = err.Error() | 		errmsg = err.Error() | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -418,6 +390,12 @@ func (server *server) readRequest(codec serverCodec) (service *service, mtype *m | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// is it a streaming request? then we don't read the body | ||||||
|  | 	if mtype.stream { | ||||||
|  | 		codec.ReadRequestBody(nil) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// Decode the argument value. | 	// Decode the argument value. | ||||||
| 	argIsValue := false // if true, need to indirect before calling. | 	argIsValue := false // if true, need to indirect before calling. | ||||||
| 	if mtype.ArgType.Kind() == reflect.Ptr { | 	if mtype.ArgType.Kind() == reflect.Ptr { | ||||||
|   | |||||||
| @@ -20,4 +20,8 @@ type HandlerWrapper func(HandlerFunc) HandlerFunc | |||||||
| // SubscriberWrapper wraps the SubscriberFunc and returns the equivalent | // SubscriberWrapper wraps the SubscriberFunc and returns the equivalent | ||||||
| type SubscriberWrapper func(SubscriberFunc) SubscriberFunc | type SubscriberWrapper func(SubscriberFunc) SubscriberFunc | ||||||
|  |  | ||||||
| type StreamWrapper func(Streamer) Streamer | // StreamerWrapper wraps a Streamer interface and returns the equivalent. | ||||||
|  | // Because streams exist for the lifetime of a method invocation this | ||||||
|  | // is a convenient way to wrap a Stream as its in use for trace, monitoring, | ||||||
|  | // metrics, etc. | ||||||
|  | type StreamerWrapper func(Streamer) Streamer | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user