From ee43cf634964330f4ebf26a2a52407fb7b6ce673 Mon Sep 17 00:00:00 2001
From: Vasiliy Tolstov <v.tolstov@unistack.org>
Date: Fri, 16 Oct 2020 16:48:52 +0300
Subject: [PATCH] fixup for never micro

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
---
 go.mod |  2 +-
 go.sum |  4 ++--
 tcp.go | 31 +++++++++++++++++++++----------
 3 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/go.mod b/go.mod
index aa0a142..cb393a2 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,6 @@ module github.com/unistack-org/micro-server-tcp
 go 1.13
 
 require (
-	github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47
+	github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201016063857-14c97d59c15f
 	golang.org/x/net v0.0.0-20200904194848-62affa334b73
 )
diff --git a/go.sum b/go.sum
index aa09c2c..2de06cf 100644
--- a/go.sum
+++ b/go.sum
@@ -289,8 +289,8 @@ github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.m
 github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4=
 github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c=
 github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE=
-github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47 h1:3d/HgT7Iq/UIw5OGyzfUeZPJwydhBohh9shyGJH14EA=
-github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM=
+github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201016063857-14c97d59c15f h1:9PLnkfb9vdn1yHlKLIGo5AiSNzqerZscsm9R+uW+DAw=
+github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201016063857-14c97d59c15f/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM=
 github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
 github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA=
 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
diff --git a/tcp.go b/tcp.go
index 711bbf8..6b94fd1 100644
--- a/tcp.go
+++ b/tcp.go
@@ -16,7 +16,6 @@ import (
 	"github.com/unistack-org/micro/v3/logger"
 	"github.com/unistack-org/micro/v3/registry"
 	"github.com/unistack-org/micro/v3/server"
-	regutil "github.com/unistack-org/micro/v3/util/registry"
 	"golang.org/x/net/netutil"
 )
 
@@ -139,12 +138,12 @@ func (h *tcpServer) Register() error {
 	eps := h.hd.Endpoints()
 	h.Unlock()
 
-	service, err := regutil.NewService(h)
+	service, err := server.NewRegistryService(h)
 	if err != nil {
 		return err
 	}
-	service.Metadata["protocol"] = "tcp"
-	service.Metadata["transport"] = "tcp"
+	service.Nodes[0].Metadata["protocol"] = "tcp"
+	service.Nodes[0].Metadata["transport"] = "tcp"
 	service.Endpoints = eps
 
 	h.Lock()
@@ -183,18 +182,22 @@ func (h *tcpServer) Register() error {
 	}
 	h.registered = true
 
+	subCtx := h.opts.Context
+
 	for sb := range h.subscribers {
 		handler := h.createSubHandler(sb, opts)
 		var subOpts []broker.SubscribeOption
 		if queue := sb.Options().Queue; len(queue) > 0 {
 			subOpts = append(subOpts, broker.Queue(queue))
 		}
-
+		if cx := sb.Options().Context; cx != nil {
+			subCtx = cx
+		}
 		if !sb.Options().AutoAck {
 			subOpts = append(subOpts, broker.DisableAutoAck())
 		}
 
-		sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...)
+		sub, err := opts.Broker.Subscribe(subCtx, sb.Topic(), handler, subOpts...)
 		if err != nil {
 			return err
 		}
@@ -210,7 +213,7 @@ func (h *tcpServer) Deregister() error {
 
 	logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id)
 
-	service, err := regutil.NewService(h)
+	service, err := server.NewRegistryService(h)
 	if err != nil {
 		return err
 	}
@@ -225,10 +228,18 @@ func (h *tcpServer) Deregister() error {
 	}
 	h.registered = false
 
+	subCtx := h.opts.Context
+
 	for sb, subs := range h.subscribers {
+		if cx := sb.Options().Context; cx != nil {
+			subCtx = cx
+		}
 		for _, sub := range subs {
 			logger.Infof("Unsubscribing from topic: %s", sub.Topic())
-			sub.Unsubscribe()
+			if err := sub.Unsubscribe(subCtx); err != nil {
+				logger.Errorf("failed to unsubscribe topic: %s error: %v", sb.Topic(), err)
+				return err
+			}
 		}
 		h.subscribers[sb] = nil
 	}
@@ -285,7 +296,7 @@ func (h *tcpServer) Start() error {
 	h.opts.Address = ts.Addr().String()
 	h.Unlock()
 
-	if err = opts.Broker.Connect(); err != nil {
+	if err = opts.Broker.Connect(h.opts.Context); err != nil {
 		return err
 	}
 
@@ -355,7 +366,7 @@ func (h *tcpServer) Start() error {
 		// deregister
 		h.Deregister()
 
-		opts.Broker.Disconnect()
+		opts.Broker.Disconnect(h.opts.Context)
 	}()
 
 	return nil