Compare commits

..

8 Commits

Author SHA1 Message Date
Asim Aslam
d3140c0fc2 Merge pull request #867 from milosgajdos83/rlock-mess
Avoid recursive RLock()
2019-10-18 11:35:08 +01:00
Milos Gajdos
3d5d9be02a Avoid recursive calls to RLock()
Topology calls itsel recursively invoking RLock. This, according to go
documentation is wrong. This commit moves the body of Topology function
to a non-thread safe unexported function to keep locsk at check!
2019-10-18 11:26:43 +01:00
Asim Aslam
5c38f38dd9 No need to lock here since Topology read locks and makes copies 2019-10-18 11:26:43 +01:00
Asim Aslam
63fd8b9d1b Merge pull request #864 from micro/strip-topic
strip topic from http broker subscribe service name
2019-10-17 18:48:46 +01:00
Asim Aslam
3aedea4c56 strip topic from http broker subscribe service name 2019-10-17 18:37:37 +01:00
Asim Aslam
0da9dff077 Merge pull request #863 from micro/certmagice2e
E2E tests for certmagic ACME provider
2019-10-17 16:42:33 +01:00
Jake Sanders
05774f2c76 Don't touch go.mod 2019-10-17 16:35:09 +01:00
Jake Sanders
4885bba2ac E2E tests for certmagic ACME provider
* Actually set the CA
* Fix the certmangic.storage interface to return the correct error type
* Write an e2e test for certmagic against the let's encrypt staging CA
2019-10-17 16:31:02 +01:00
5 changed files with 106 additions and 16 deletions

View File

@@ -3,7 +3,9 @@ package certmagic
import (
"log"
"math/rand"
"net"
"time"
"github.com/mholt/certmagic"
@@ -15,6 +17,7 @@ type certmagicProvider struct {
}
func (c *certmagicProvider) NewListener(ACMEHosts ...string) (net.Listener, error) {
certmagic.Default.CA = c.opts.CA
if c.opts.ChallengeProvider != nil {
// Enabling DNS Challenge disables the other challenges
certmagic.Default.DNSProvider = c.opts.ChallengeProvider
@@ -22,6 +25,16 @@ func (c *certmagicProvider) NewListener(ACMEHosts ...string) (net.Listener, erro
if c.opts.OnDemand {
certmagic.Default.OnDemand = new(certmagic.OnDemandConfig)
}
if c.opts.Cache != nil {
// already validated by new()
certmagic.Default.Storage = c.opts.Cache.(certmagic.Storage)
}
// If multiple instances of the provider are running, inject some
// randomness so they don't collide
rand.Seed(time.Now().UnixNano())
randomDuration := (7 * 24 * time.Hour) + (time.Duration(rand.Intn(504)) * time.Hour)
certmagic.Default.RenewDurationBefore = randomDuration
return certmagic.Listen(ACMEHosts)
}

View File

@@ -1,6 +1,7 @@
package certmagic
import (
"net/http"
"os"
"reflect"
"sort"
@@ -185,3 +186,46 @@ func TestStorageImplementation(t *testing.T) {
// happens
New(acme.Cache(s))
}
// Full test with a real zone, with against LE staging
func TestE2e(t *testing.T) {
apiToken, accountID := os.Getenv("CF_API_TOKEN"), os.Getenv("CF_ACCOUNT_ID")
kvID := os.Getenv("KV_NAMESPACE_ID")
if len(apiToken) == 0 || len(accountID) == 0 || len(kvID) == 0 {
t.Skip("No Cloudflare API keys available, skipping test")
}
testLock := memory.NewLock()
testStore, err := cloudflarestorage.New(
options.WithValue("CF_API_TOKEN", apiToken),
options.WithValue("CF_ACCOUNT_ID", accountID),
options.WithValue("KV_NAMESPACE_ID", kvID),
)
if err != nil {
t.Fatal(err.Error())
}
testStorage := NewStorage(testLock, testStore)
conf := cloudflare.NewDefaultConfig()
conf.AuthToken = apiToken
conf.ZoneToken = apiToken
testChallengeProvider, err := cloudflare.NewDNSProviderConfig(conf)
if err != nil {
t.Fatal(err.Error())
}
testProvider := New(
acme.AcceptToS(true),
acme.Cache(testStorage),
acme.CA(acme.LetsEncryptStagingCA),
acme.ChallengeProvider(testChallengeProvider),
acme.OnDemand(false),
)
listener, err := testProvider.NewListener("*.micro.mu", "micro.mu")
if err != nil {
t.Fatal(err.Error())
}
go http.Serve(listener, http.NotFoundHandler())
time.Sleep(10 * time.Minute)
}

View File

@@ -3,6 +3,7 @@ package certmagic
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"path"
"strings"
@@ -55,6 +56,9 @@ func (s *storage) Store(key string, value []byte) error {
}
func (s *storage) Load(key string) ([]byte, error) {
if !s.Exists(key) {
return nil, certmagic.ErrNotExist(errors.New(key + " doesn't exist"))
}
records, err := s.store.Read(key)
if err != nil {
return nil, err
@@ -77,7 +81,7 @@ func (s *storage) Delete(key string) error {
}
func (s *storage) Exists(key string) bool {
_, err := s.store.Read()
_, err := s.store.Read(key)
if err != nil {
return false
}
@@ -132,3 +136,11 @@ func (s *storage) Stat(key string) (certmagic.KeyInfo, error) {
IsTerminal: false,
}, nil
}
// NewStorage returns a certmagic.Storage backed by a go-micro/lock and go-micro/store
func NewStorage(lock lock.Lock, store store.Store) certmagic.Storage {
return &storage{
lock: lock,
store: store,
}
}

View File

@@ -527,7 +527,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService("topic:" + topic)
s, err := h.r.GetService(topic)
if err != nil {
h.RUnlock()
// ignore error
@@ -565,13 +565,29 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
continue
}
var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {
continue
}
// look for nodes for the topic
if node.Metadata["topic"] != topic {
continue
}
nodes = append(nodes, node)
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range service.Nodes {
for _, node := range nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
@@ -584,7 +600,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
node := nodes[rand.Int()%len(nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
@@ -647,6 +663,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
},
}
@@ -657,7 +675,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
}
service := &registry.Service{
Name: "topic:" + topic,
Name: topic,
Version: version,
Nodes: []*registry.Node{node},
}

View File

@@ -170,9 +170,6 @@ func (n *node) Nodes() []Node {
// GetPeerNode returns a node from node MaxDepth topology
// It returns nil if the peer was not found
func (n *node) GetPeerNode(id string) *node {
n.RLock()
defer n.RUnlock()
// get node topology up to MaxDepth
top := n.Topology(MaxDepth)
@@ -240,12 +237,9 @@ func (n *node) PruneStalePeerNodes(pruneTime time.Duration) map[string]*node {
return pruned
}
// Topology returns a copy of the node topology down to given depth
// NOTE: the returned node is a node graph - not a single node
func (n *node) Topology(depth uint) *node {
n.RLock()
defer n.RUnlock()
// getTopology traverses node graph and builds node topology
// NOTE: this function is not thread safe
func (n *node) getTopology(depth uint) *node {
// make a copy of yourself
node := &node{
id: n.id,
@@ -265,7 +259,7 @@ func (n *node) Topology(depth uint) *node {
// iterate through our peers and update the node peers
for _, peer := range n.peers {
nodePeer := peer.Topology(depth)
nodePeer := peer.getTopology(depth)
if _, ok := node.peers[nodePeer.id]; !ok {
node.peers[nodePeer.id] = nodePeer
}
@@ -274,6 +268,15 @@ func (n *node) Topology(depth uint) *node {
return node
}
// Topology returns a copy of the node topology down to given depth
// NOTE: the returned node is a node graph - not a single node
func (n *node) Topology(depth uint) *node {
n.RLock()
defer n.RUnlock()
return n.getTopology(depth)
}
// Peers returns node peers up to MaxDepth
func (n *node) Peers() []Node {
n.RLock()
@@ -281,7 +284,7 @@ func (n *node) Peers() []Node {
var peers []Node
for _, nodePeer := range n.peers {
peer := nodePeer.Topology(MaxDepth)
peer := nodePeer.getTopology(MaxDepth)
peers = append(peers, peer)
}