Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
d3140c0fc2 | ||
|
3d5d9be02a | ||
|
5c38f38dd9 | ||
|
63fd8b9d1b | ||
|
3aedea4c56 | ||
|
0da9dff077 | ||
|
05774f2c76 | ||
|
4885bba2ac |
@@ -3,7 +3,9 @@ package certmagic
|
||||
|
||||
import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/mholt/certmagic"
|
||||
|
||||
@@ -15,6 +17,7 @@ type certmagicProvider struct {
|
||||
}
|
||||
|
||||
func (c *certmagicProvider) NewListener(ACMEHosts ...string) (net.Listener, error) {
|
||||
certmagic.Default.CA = c.opts.CA
|
||||
if c.opts.ChallengeProvider != nil {
|
||||
// Enabling DNS Challenge disables the other challenges
|
||||
certmagic.Default.DNSProvider = c.opts.ChallengeProvider
|
||||
@@ -22,6 +25,16 @@ func (c *certmagicProvider) NewListener(ACMEHosts ...string) (net.Listener, erro
|
||||
if c.opts.OnDemand {
|
||||
certmagic.Default.OnDemand = new(certmagic.OnDemandConfig)
|
||||
}
|
||||
if c.opts.Cache != nil {
|
||||
// already validated by new()
|
||||
certmagic.Default.Storage = c.opts.Cache.(certmagic.Storage)
|
||||
}
|
||||
// If multiple instances of the provider are running, inject some
|
||||
// randomness so they don't collide
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
randomDuration := (7 * 24 * time.Hour) + (time.Duration(rand.Intn(504)) * time.Hour)
|
||||
certmagic.Default.RenewDurationBefore = randomDuration
|
||||
|
||||
return certmagic.Listen(ACMEHosts)
|
||||
}
|
||||
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package certmagic
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
@@ -185,3 +186,46 @@ func TestStorageImplementation(t *testing.T) {
|
||||
// happens
|
||||
New(acme.Cache(s))
|
||||
}
|
||||
|
||||
// Full test with a real zone, with against LE staging
|
||||
func TestE2e(t *testing.T) {
|
||||
apiToken, accountID := os.Getenv("CF_API_TOKEN"), os.Getenv("CF_ACCOUNT_ID")
|
||||
kvID := os.Getenv("KV_NAMESPACE_ID")
|
||||
if len(apiToken) == 0 || len(accountID) == 0 || len(kvID) == 0 {
|
||||
t.Skip("No Cloudflare API keys available, skipping test")
|
||||
}
|
||||
|
||||
testLock := memory.NewLock()
|
||||
testStore, err := cloudflarestorage.New(
|
||||
options.WithValue("CF_API_TOKEN", apiToken),
|
||||
options.WithValue("CF_ACCOUNT_ID", accountID),
|
||||
options.WithValue("KV_NAMESPACE_ID", kvID),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
testStorage := NewStorage(testLock, testStore)
|
||||
|
||||
conf := cloudflare.NewDefaultConfig()
|
||||
conf.AuthToken = apiToken
|
||||
conf.ZoneToken = apiToken
|
||||
testChallengeProvider, err := cloudflare.NewDNSProviderConfig(conf)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
testProvider := New(
|
||||
acme.AcceptToS(true),
|
||||
acme.Cache(testStorage),
|
||||
acme.CA(acme.LetsEncryptStagingCA),
|
||||
acme.ChallengeProvider(testChallengeProvider),
|
||||
acme.OnDemand(false),
|
||||
)
|
||||
|
||||
listener, err := testProvider.NewListener("*.micro.mu", "micro.mu")
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
go http.Serve(listener, http.NotFoundHandler())
|
||||
time.Sleep(10 * time.Minute)
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@ package certmagic
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"strings"
|
||||
@@ -55,6 +56,9 @@ func (s *storage) Store(key string, value []byte) error {
|
||||
}
|
||||
|
||||
func (s *storage) Load(key string) ([]byte, error) {
|
||||
if !s.Exists(key) {
|
||||
return nil, certmagic.ErrNotExist(errors.New(key + " doesn't exist"))
|
||||
}
|
||||
records, err := s.store.Read(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -77,7 +81,7 @@ func (s *storage) Delete(key string) error {
|
||||
}
|
||||
|
||||
func (s *storage) Exists(key string) bool {
|
||||
_, err := s.store.Read()
|
||||
_, err := s.store.Read(key)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
@@ -132,3 +136,11 @@ func (s *storage) Stat(key string) (certmagic.KeyInfo, error) {
|
||||
IsTerminal: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewStorage returns a certmagic.Storage backed by a go-micro/lock and go-micro/store
|
||||
func NewStorage(lock lock.Lock, store store.Store) certmagic.Storage {
|
||||
return &storage{
|
||||
lock: lock,
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
@@ -527,7 +527,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
|
||||
// now attempt to get the service
|
||||
h.RLock()
|
||||
s, err := h.r.GetService("topic:" + topic)
|
||||
s, err := h.r.GetService(topic)
|
||||
if err != nil {
|
||||
h.RUnlock()
|
||||
// ignore error
|
||||
@@ -565,13 +565,29 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
continue
|
||||
}
|
||||
|
||||
var nodes []*registry.Node
|
||||
|
||||
for _, node := range service.Nodes {
|
||||
// only use nodes tagged with broker http
|
||||
if node.Metadata["broker"] != "http" {
|
||||
continue
|
||||
}
|
||||
|
||||
// look for nodes for the topic
|
||||
if node.Metadata["topic"] != topic {
|
||||
continue
|
||||
}
|
||||
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
|
||||
switch service.Version {
|
||||
// broadcast version means broadcast to all nodes
|
||||
case broadcastVersion:
|
||||
var success bool
|
||||
|
||||
// publish to all nodes
|
||||
for _, node := range service.Nodes {
|
||||
for _, node := range nodes {
|
||||
// publish async
|
||||
if err := pub(node, topic, b); err == nil {
|
||||
success = true
|
||||
@@ -584,7 +600,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
}
|
||||
default:
|
||||
// select node to publish to
|
||||
node := service.Nodes[rand.Int()%len(service.Nodes)]
|
||||
node := nodes[rand.Int()%len(nodes)]
|
||||
|
||||
// publish async to one node
|
||||
if err := pub(node, topic, b); err != nil {
|
||||
@@ -647,6 +663,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
Address: mnet.HostPort(addr, port),
|
||||
Metadata: map[string]string{
|
||||
"secure": fmt.Sprintf("%t", secure),
|
||||
"broker": "http",
|
||||
"topic": topic,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -657,7 +675,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
}
|
||||
|
||||
service := ®istry.Service{
|
||||
Name: "topic:" + topic,
|
||||
Name: topic,
|
||||
Version: version,
|
||||
Nodes: []*registry.Node{node},
|
||||
}
|
||||
|
@@ -170,9 +170,6 @@ func (n *node) Nodes() []Node {
|
||||
// GetPeerNode returns a node from node MaxDepth topology
|
||||
// It returns nil if the peer was not found
|
||||
func (n *node) GetPeerNode(id string) *node {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
// get node topology up to MaxDepth
|
||||
top := n.Topology(MaxDepth)
|
||||
|
||||
@@ -240,12 +237,9 @@ func (n *node) PruneStalePeerNodes(pruneTime time.Duration) map[string]*node {
|
||||
return pruned
|
||||
}
|
||||
|
||||
// Topology returns a copy of the node topology down to given depth
|
||||
// NOTE: the returned node is a node graph - not a single node
|
||||
func (n *node) Topology(depth uint) *node {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
// getTopology traverses node graph and builds node topology
|
||||
// NOTE: this function is not thread safe
|
||||
func (n *node) getTopology(depth uint) *node {
|
||||
// make a copy of yourself
|
||||
node := &node{
|
||||
id: n.id,
|
||||
@@ -265,7 +259,7 @@ func (n *node) Topology(depth uint) *node {
|
||||
|
||||
// iterate through our peers and update the node peers
|
||||
for _, peer := range n.peers {
|
||||
nodePeer := peer.Topology(depth)
|
||||
nodePeer := peer.getTopology(depth)
|
||||
if _, ok := node.peers[nodePeer.id]; !ok {
|
||||
node.peers[nodePeer.id] = nodePeer
|
||||
}
|
||||
@@ -274,6 +268,15 @@ func (n *node) Topology(depth uint) *node {
|
||||
return node
|
||||
}
|
||||
|
||||
// Topology returns a copy of the node topology down to given depth
|
||||
// NOTE: the returned node is a node graph - not a single node
|
||||
func (n *node) Topology(depth uint) *node {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
return n.getTopology(depth)
|
||||
}
|
||||
|
||||
// Peers returns node peers up to MaxDepth
|
||||
func (n *node) Peers() []Node {
|
||||
n.RLock()
|
||||
@@ -281,7 +284,7 @@ func (n *node) Peers() []Node {
|
||||
|
||||
var peers []Node
|
||||
for _, nodePeer := range n.peers {
|
||||
peer := nodePeer.Topology(MaxDepth)
|
||||
peer := nodePeer.getTopology(MaxDepth)
|
||||
peers = append(peers, peer)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user