store handler implementation
This commit is contained in:
		
							
								
								
									
										89
									
								
								store/service/handler/handler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										89
									
								
								store/service/handler/handler.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,89 @@ | ||||
| package handler | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"io" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/micro/go-micro/errors" | ||||
| 	"github.com/micro/go-micro/store" | ||||
| 	pb "github.com/micro/go-micro/store/service/proto" | ||||
| ) | ||||
|  | ||||
| type Store struct { | ||||
| 	Store store.Store | ||||
| } | ||||
|  | ||||
| func (s *Store) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error { | ||||
| 	vals, err := s.Store.Read(req.Keys...) | ||||
| 	if err != nil { | ||||
| 		return errors.InternalServerError("go.micro.store", err.Error()) | ||||
| 	} | ||||
| 	for _, val := range vals { | ||||
| 		rsp.Records = append(rsp.Records, &pb.Record{ | ||||
| 			Key:    val.Key, | ||||
| 			Value:  val.Value, | ||||
| 			Expiry: int64(val.Expiry.Seconds()), | ||||
| 		}) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *Store) Write(ctx context.Context, req *pb.WriteRequest, rsp *pb.WriteResponse) error { | ||||
| 	var records []*store.Record | ||||
|  | ||||
| 	for _, record := range req.Records { | ||||
| 		records = append(records, &store.Record{ | ||||
| 			Key:    record.Key, | ||||
| 			Value:  record.Value, | ||||
| 			Expiry: time.Duration(record.Expiry) * time.Second, | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	err := s.Store.Write(records...) | ||||
| 	if err != nil { | ||||
| 		return errors.InternalServerError("go.micro.store", err.Error()) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *Store) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error { | ||||
| 	err := s.Store.Delete(req.Keys...) | ||||
| 	if err != nil { | ||||
| 		return errors.InternalServerError("go.micro.store", err.Error()) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *Store) Sync(ctx context.Context, req *pb.SyncRequest, stream pb.Store_SyncStream) error { | ||||
| 	var vals []*store.Record | ||||
| 	var err error | ||||
|  | ||||
| 	if len(req.Key) > 0 { | ||||
| 		vals, err = s.Store.Read(req.Key) | ||||
| 	} else { | ||||
| 		vals, err = s.Store.Sync() | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return errors.InternalServerError("go.micro.store", err.Error()) | ||||
| 	} | ||||
| 	rsp := new(pb.SyncResponse) | ||||
|  | ||||
| 	// TODO: batch sync | ||||
| 	for _, val := range vals { | ||||
| 		rsp.Records = append(rsp.Records, &pb.Record{ | ||||
| 			Key:    val.Key, | ||||
| 			Value:  val.Value, | ||||
| 			Expiry: int64(val.Expiry.Seconds()), | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	err = stream.Send(rsp) | ||||
| 	if err == io.EOF { | ||||
| 		return nil | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return errors.InternalServerError("go.micro.store", err.Error()) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
		Reference in New Issue
	
	Block a user