Compare commits
6 Commits
bcc06054f1
...
v3.10.21
Author | SHA1 | Date | |
---|---|---|---|
ce125b77c1 | |||
2ee8d4ed46 | |||
f58781d076 | |||
e1af4aa3a4 | |||
1d5e795443 | |||
a3a434d923 |
@@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
@@ -85,33 +86,12 @@ type Event interface {
|
||||
SetError(err error)
|
||||
}
|
||||
|
||||
// RawMessage is a raw encoded JSON value.
|
||||
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
||||
type RawMessage []byte
|
||||
|
||||
// MarshalJSON returns m as the JSON encoding of m.
|
||||
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
||||
if m == nil {
|
||||
return []byte("null"), nil
|
||||
}
|
||||
return *m, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON sets *m to a copy of data.
|
||||
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||
if m == nil {
|
||||
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
||||
}
|
||||
*m = append((*m)[0:0], data...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Message is used to transfer data
|
||||
type Message struct {
|
||||
// Header contains message metadata
|
||||
Header metadata.Metadata
|
||||
// Body contains message body
|
||||
Body RawMessage
|
||||
Body codec.RawMessage
|
||||
}
|
||||
|
||||
// NewMessage create broker message with topic filled
|
||||
|
@@ -84,3 +84,24 @@ func MarshalAppend(buf []byte, c Codec, v interface{}, opts ...Option) ([]byte,
|
||||
|
||||
return append(buf, mbuf...), nil
|
||||
}
|
||||
|
||||
// RawMessage is a raw encoded JSON value.
|
||||
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
||||
type RawMessage []byte
|
||||
|
||||
// MarshalJSON returns m as the JSON encoding of m.
|
||||
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
||||
if m == nil {
|
||||
return []byte("null"), nil
|
||||
}
|
||||
return *m, nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON sets *m to a copy of data.
|
||||
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||
if m == nil {
|
||||
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
||||
}
|
||||
*m = append((*m)[0:0], data...)
|
||||
return nil
|
||||
}
|
||||
|
@@ -202,39 +202,6 @@ func (n *noopServer) Register() error {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
cx := config.Context
|
||||
|
||||
var sub broker.Subscriber
|
||||
|
||||
for sb := range n.subscribers {
|
||||
if sb.Options().Context != nil {
|
||||
cx = sb.Options().Context
|
||||
}
|
||||
|
||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
if sb.Options().Batch {
|
||||
// batch processing handler
|
||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
|
||||
} else {
|
||||
// single processing handler
|
||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
|
||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
n.registered = true
|
||||
if cacheService {
|
||||
n.rsvc = service
|
||||
@@ -366,6 +333,10 @@ func (n *noopServer) Start() error {
|
||||
}
|
||||
}
|
||||
|
||||
if err := n.subscribe(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
t := new(time.Ticker)
|
||||
|
||||
@@ -449,6 +420,45 @@ func (n *noopServer) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noopServer) subscribe() error {
|
||||
config := n.Options()
|
||||
|
||||
cx := config.Context
|
||||
var err error
|
||||
var sub broker.Subscriber
|
||||
|
||||
for sb := range n.subscribers {
|
||||
if sb.Options().Context != nil {
|
||||
cx = sb.Options().Context
|
||||
}
|
||||
|
||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
if sb.Options().Batch {
|
||||
// batch processing handler
|
||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
|
||||
} else {
|
||||
// single processing handler
|
||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
|
||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noopServer) Stop() error {
|
||||
n.RLock()
|
||||
if !n.started {
|
||||
|
@@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||
return func(ps broker.Events) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||
return func(p broker.Event) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
|
@@ -2,7 +2,9 @@ package time
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -13,39 +15,43 @@ func ParseDuration(s string) (time.Duration, error) {
|
||||
return 0, fmt.Errorf(`time: invalid duration "` + s + `"`)
|
||||
}
|
||||
|
||||
//var sb strings.Builder
|
||||
/*
|
||||
for i, r := range s {
|
||||
switch r {
|
||||
case 'd':
|
||||
n, err := strconv.Atoi(s[idx:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
s[idx:i] = fmt.Sprintf("%d", n*24)
|
||||
default:
|
||||
sb.WriteRune(r)
|
||||
var p int
|
||||
var hours int
|
||||
loop:
|
||||
for i, r := range s {
|
||||
switch r {
|
||||
case 's', 'm':
|
||||
p = i
|
||||
break loop
|
||||
case 'h':
|
||||
d, err := strconv.Atoi(s[p:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
}
|
||||
*/
|
||||
var td time.Duration
|
||||
var err error
|
||||
switch s[len(s)-1] {
|
||||
case 's', 'm', 'h':
|
||||
td, err = time.ParseDuration(s)
|
||||
case 'd':
|
||||
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil {
|
||||
td *= 24
|
||||
}
|
||||
case 'y':
|
||||
if td, err = time.ParseDuration(s[:len(s)-1] + "h"); err == nil {
|
||||
year := time.Date(time.Now().Year(), time.December, 31, 0, 0, 0, 0, time.Local)
|
||||
days := year.YearDay()
|
||||
td *= 24 * time.Duration(days)
|
||||
hours += d
|
||||
p = i + 1
|
||||
case 'd':
|
||||
d, err := strconv.Atoi(s[p:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
hours += d * 24
|
||||
p = i + 1
|
||||
case 'y':
|
||||
n, err := strconv.Atoi(s[p:i])
|
||||
if err != nil {
|
||||
return 0, errors.New("time: invalid duration " + s)
|
||||
}
|
||||
var d int
|
||||
for j := n - 1; j >= 0; j-- {
|
||||
d += time.Date(time.Now().Year()+j, time.December, 31, 0, 0, 0, 0, time.Local).YearDay()
|
||||
}
|
||||
hours += d * 24
|
||||
p = i + 1
|
||||
}
|
||||
}
|
||||
|
||||
return td, err
|
||||
return time.ParseDuration(fmt.Sprintf("%dh%s", hours, s[p:]))
|
||||
}
|
||||
|
||||
func (d Duration) MarshalJSON() ([]byte, error) {
|
||||
|
@@ -35,15 +35,14 @@ func TestUnmarshalJSON(t *testing.T) {
|
||||
func TestParseDuration(t *testing.T) {
|
||||
var td time.Duration
|
||||
var err error
|
||||
t.Skip()
|
||||
|
||||
td, err = ParseDuration("14d4h")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseDuration error: %v", err)
|
||||
}
|
||||
if td.String() != "336h0m0s" {
|
||||
t.Fatalf("ParseDuration 14d != 336h0m0s : %s", td.String())
|
||||
if td.String() != "340h0m0s" {
|
||||
t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String())
|
||||
}
|
||||
|
||||
td, err = ParseDuration("1y")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseDuration error: %v", err)
|
||||
|
Reference in New Issue
Block a user