Added service.Router Route CRUD. Outlined watcher and run()

This commit is contained in:
Milos Gajdos 2019-07-26 14:05:03 +01:00
parent b6fb969ab9
commit ddad43bd77
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
3 changed files with 139 additions and 11 deletions

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"sync" "sync"
"github.com/google/uuid"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/network/router" "github.com/micro/go-micro/network/router"
pb "github.com/micro/go-micro/network/router/proto" pb "github.com/micro/go-micro/network/router/proto"
@ -16,9 +17,10 @@ var (
) )
type svc struct { type svc struct {
router pb.RouterService opts router.Options
opts router.Options router pb.RouterService
status router.Status status router.Status
watchers map[string]*svcWatcher
sync.RWMutex sync.RWMutex
} }
@ -37,9 +39,10 @@ func NewRouter(opts ...router.Option) router.Router {
// NOTE: should we have Client/Service option in router.Options? // NOTE: should we have Client/Service option in router.Options?
s := &svc{ s := &svc{
opts: options, opts: options,
status: router.Status{Code: router.Stopped, Error: nil}, router: pb.NewRouterService(router.DefaultName, client),
router: pb.NewRouterService(router.DefaultName, client), status: router.Status{Code: router.Stopped, Error: nil},
watchers: make(map[string]*svcWatcher),
} }
return s return s
@ -58,8 +61,22 @@ func (s *svc) Options() router.Options {
return s.opts return s.opts
} }
// Run runs the router.
// It returns error if the router is already running.
func (s *svc) run() {
s.Lock()
defer s.Unlock()
switch s.status.Code {
case router.Stopped, router.Error:
// TODO: start event stream watcher
// TODO: start watchError monitor
}
}
// Advertise advertises routes to the network // Advertise advertises routes to the network
func (s *svc) Advertise() (<-chan *router.Advert, error) { func (s *svc) Advertise() (<-chan *router.Advert, error) {
// TODO: start advert stream watcher
return nil, nil return nil, nil
} }
@ -70,17 +87,56 @@ func (s *svc) Process(a *router.Advert) error {
// Create new route in the routing table // Create new route in the routing table
func (s *svc) Create(r router.Route) error { func (s *svc) Create(r router.Route) error {
return ErrNotImplemented route := &pb.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: int64(r.Metric),
}
if _, err := s.router.Create(context.Background(), route); err != nil {
return err
}
return nil
} }
// Delete deletes existing route from the routing table // Delete deletes existing route from the routing table
func (s *svc) Delete(r router.Route) error { func (s *svc) Delete(r router.Route) error {
return ErrNotImplemented route := &pb.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: int64(r.Metric),
}
if _, err := s.router.Delete(context.Background(), route); err != nil {
return err
}
return nil
} }
// Update updates route in the routing table // Update updates route in the routing table
func (s *svc) Update(r router.Route) error { func (s *svc) Update(r router.Route) error {
return ErrNotImplemented route := &pb.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: int64(r.Metric),
}
if _, err := s.router.Update(context.Background(), route); err != nil {
return err
}
return nil
} }
// List returns the list of all routes in the table // List returns the list of all routes in the table
@ -138,7 +194,25 @@ func (s *svc) Lookup(q router.Query) ([]router.Route, error) {
// Watch returns a watcher which allows to track updates to the routing table // Watch returns a watcher which allows to track updates to the routing table
func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) { func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return nil, nil wopts := router.WatchOptions{
Service: "*",
}
for _, o := range opts {
o(&wopts)
}
w := &svcWatcher{
opts: wopts,
resChan: make(chan *router.Event, 10),
done: make(chan struct{}),
}
s.Lock()
s.watchers[uuid.New().String()] = w
s.Unlock()
return w, nil
} }
// Status returns router status // Status returns router status

View File

@ -0,0 +1,49 @@
package service
import (
"sync"
"github.com/micro/go-micro/network/router"
)
type svcWatcher struct {
opts router.WatchOptions
resChan chan *router.Event
done chan struct{}
sync.RWMutex
}
// Next is a blocking call that returns watch result
func (w *svcWatcher) Next() (*router.Event, error) {
for {
select {
case res := <-w.resChan:
switch w.opts.Service {
case res.Route.Service, "*":
return res, nil
default:
continue
}
case <-w.done:
return nil, router.ErrWatcherStopped
}
}
}
// Chan returns event channel
func (w *svcWatcher) Chan() (<-chan *router.Event, error) {
return w.resChan, nil
}
// Stop stops watcher
func (w *svcWatcher) Stop() {
w.Lock()
defer w.Unlock()
select {
case <-w.done:
return
default:
close(w.done)
}
}

View File

@ -2,6 +2,7 @@ package router
import ( import (
"errors" "errors"
"sync"
"time" "time"
) )
@ -78,10 +79,11 @@ type tableWatcher struct {
opts WatchOptions opts WatchOptions
resChan chan *Event resChan chan *Event
done chan struct{} done chan struct{}
sync.RWMutex
} }
// Next returns the next noticed action taken on table // Next returns the next noticed action taken on table
// TODO: think this through properly; right now we only watch service // TODO: right now we only allow to watch particular service
func (w *tableWatcher) Next() (*Event, error) { func (w *tableWatcher) Next() (*Event, error) {
for { for {
select { select {
@ -105,6 +107,9 @@ func (w *tableWatcher) Chan() (<-chan *Event, error) {
// Stop stops routing table watcher // Stop stops routing table watcher
func (w *tableWatcher) Stop() { func (w *tableWatcher) Stop() {
w.Lock()
defer w.Unlock()
select { select {
case <-w.done: case <-w.done:
return return