Compare commits

...

7 Commits

Author SHA1 Message Date
8e09142b90 Merge pull request 'check subscribe errors' (#162) from subscribeerr into v3
Reviewed-on: #162
2023-05-13 16:09:47 +03:00
13bf2bbb1e check subscribe errors
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-13 16:09:21 +03:00
ba292901d7 fixup build
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 21:14:55 +03:00
6d1b6c7014 allow to expose some method via http.HandlerFunc
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 21:13:11 +03:00
1276f99159 export Server to allow to cast
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-09 21:12:47 +03:00
ee5cbbbcb6 Merge pull request 'issue-155: add swagger-ui handler' (#157) from swaggerui into v3
Reviewed-on: #157
2023-05-04 06:51:42 +03:00
ce0abd2de3 issue-155: add swagger-ui handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-05-04 06:48:09 +03:00
18 changed files with 636 additions and 50 deletions

View File

@@ -60,7 +60,280 @@ func (h *httpHandler) Options() server.HandlerOptions {
return h.opts return h.opts
} }
func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *Server) HTTPHandlerFunc(handler interface{}) (http.HandlerFunc, error) {
if handler == nil {
return nil, fmt.Errorf("invalid handler specified: %v", handler)
}
rtype := reflect.TypeOf(handler)
if rtype.NumIn() != 3 {
return nil, fmt.Errorf("invalid handler, NumIn != 3: %v", rtype.NumIn())
}
argType := rtype.In(1)
replyType := rtype.In(2)
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
return nil, fmt.Errorf("invalid handler, argument type not exported: %v", argType)
}
if replyType.Kind() != reflect.Ptr {
return nil, fmt.Errorf("invalid handler, reply type not a pointer: %v", replyType)
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
return nil, fmt.Errorf("invalid handler, reply type not exported: %v", replyType)
}
if rtype.NumOut() != 1 {
return nil, fmt.Errorf("invalid handler, has wrong number of outs: %v", rtype.NumOut())
}
// The return type of the method must be error.
if returnType := rtype.Out(0); returnType != typeOfError {
return nil, fmt.Errorf("invalid handler, returns %v not error", returnType.String())
}
return func(w http.ResponseWriter, r *http.Request) {
ct := DefaultContentType
if htype := r.Header.Get(metadata.HeaderContentType); htype != "" {
ct = htype
}
ctx := context.WithValue(r.Context(), rspCodeKey{}, &rspCodeVal{})
ctx = context.WithValue(ctx, rspHeaderKey{}, &rspHeaderVal{})
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(len(r.Header) + 8)
}
for k, v := range r.Header {
md[k] = strings.Join(v, ", ")
}
md["RemoteAddr"] = r.RemoteAddr
md["Method"] = r.Method
md["URL"] = r.URL.String()
md["Proto"] = r.Proto
md["ContentLength"] = fmt.Sprintf("%d", r.ContentLength)
md["TransferEncoding"] = strings.Join(r.TransferEncoding, ",")
md["Host"] = r.Host
md["RequestURI"] = r.RequestURI
ctx = metadata.NewIncomingContext(ctx, md)
path := r.URL.Path
if r.Body != nil {
defer r.Body.Close()
}
matches := make(map[string]interface{})
var match bool
var hldr *patHandler
var handler *httpHandler
for _, shdlr := range h.handlers {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(r.Method, path)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
break
} else if err == rhttp.ErrMethodNotAllowed && !h.registerRPC {
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = w.Write([]byte("not matching route found"))
return
}
}
if !match && h.registerRPC {
microMethod, mok := md.Get(metadata.HeaderEndpoint)
if mok {
serviceMethod := strings.Split(microMethod, ".")
if len(serviceMethod) == 2 {
if shdlr, ok := h.handlers[serviceMethod[0]]; ok {
hdlr := shdlr.(*httpHandler)
fh, mp, err := hdlr.handlers.Search(http.MethodPost, "/"+microMethod)
if err == nil {
match = true
for k, v := range mp {
matches[k] = v
}
hldr = fh.(*patHandler)
handler = hdlr
}
}
}
}
}
// get fields from url values
if len(r.URL.RawQuery) > 0 {
umd, cerr := rflutil.URLMap(r.URL.RawQuery)
if cerr != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(cerr.Error()))
return
}
for k, v := range umd {
matches[k] = v
}
}
cf, err := h.newCodec(ct)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
var argv, replyv reflect.Value
// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if hldr.mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(hldr.mtype.ArgType.Elem())
} else {
argv = reflect.New(hldr.mtype.ArgType)
argIsValue = true
}
if argIsValue {
argv = argv.Elem()
}
// reply value
replyv = reflect.New(hldr.mtype.ReplyType.Elem())
function := hldr.mtype.method.Func
var returnValues []reflect.Value
if r.Body != nil {
var buf []byte
buf, err = io.ReadAll(r.Body)
if err != nil && err != io.EOF {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
}
matches = rflutil.FlattenMap(matches)
if err = rflutil.Merge(argv.Interface(), matches, rflutil.SliceAppend(true), rflutil.Tags([]string{"protobuf", "json"})); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return
}
hr := &rpcRequest{
codec: cf,
service: handler.sopts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
endpoint: fmt.Sprintf("%s.%s", hldr.name, hldr.mtype.method.Name),
payload: argv.Interface(),
header: md,
}
// define the handler func
fn := func(fctx context.Context, req server.Request, rsp interface{}) (err error) {
returnValues = function.Call([]reflect.Value{hldr.rcvr, hldr.mtype.prepareContext(fctx), argv, reflect.ValueOf(rsp)})
// The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil {
err = rerr.(error)
}
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(0)
}
if nmd, ok := metadata.FromOutgoingContext(fctx); ok {
for k, v := range nmd {
md.Set(k, v)
}
}
metadata.SetOutgoingContext(ctx, md)
return err
}
// wrap the handler func
for i := len(handler.sopts.HdlrWrappers); i > 0; i-- {
fn = handler.sopts.HdlrWrappers[i-1](fn)
}
if ct == "application/x-www-form-urlencoded" {
cf, err = h.newCodec(DefaultContentType)
if err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
ct = DefaultContentType
}
scode := int(200)
appErr := fn(ctx, hr, replyv.Interface())
w.Header().Set(metadata.HeaderContentType, ct)
if md, ok := metadata.FromOutgoingContext(ctx); ok {
for k, v := range md {
w.Header().Set(k, v)
}
}
if md := getRspHeader(ctx); md != nil {
for k, v := range md {
for _, vv := range v {
w.Header().Add(k, vv)
}
}
}
if nct := w.Header().Get(metadata.HeaderContentType); nct != ct {
if cf, err = h.newCodec(nct); err != nil {
h.errorHandler(ctx, nil, w, r, err, http.StatusBadRequest)
return
}
}
var buf []byte
if appErr != nil {
switch verr := appErr.(type) {
case *errors.Error:
scode = int(verr.Code)
buf, err = cf.Marshal(verr)
case *Error:
buf, err = cf.Marshal(verr.err)
default:
buf, err = cf.Marshal(appErr)
}
} else {
buf, err = cf.Marshal(replyv.Interface())
}
if err != nil && handler.sopts.Logger.V(logger.ErrorLevel) {
handler.sopts.Logger.Errorf(handler.sopts.Context, "handler err: %v", err)
return
}
if nscode := GetRspCode(ctx); nscode != 0 {
scode = nscode
}
w.WriteHeader(scode)
if _, cerr := w.Write(buf); cerr != nil {
handler.sopts.Logger.Errorf(ctx, "write failed: %v", cerr)
}
}, nil
}
func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// check for http.HandlerFunc handlers // check for http.HandlerFunc handlers
if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil { if ph, _, err := h.pathHandlers.Search(r.Method, r.URL.Path); err == nil {
ph.(http.HandlerFunc)(w, r) ph.(http.HandlerFunc)(w, r)
@@ -91,7 +364,9 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
md["RequestURI"] = r.RequestURI md["RequestURI"] = r.RequestURI
ctx = metadata.NewIncomingContext(ctx, md) ctx = metadata.NewIncomingContext(ctx, md)
defer r.Body.Close() if r.Body != nil {
defer r.Body.Close()
}
path := r.URL.Path path := r.URL.Path
if !strings.HasPrefix(path, "/") { if !strings.HasPrefix(path, "/") {
@@ -192,15 +467,18 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
function := hldr.mtype.method.Func function := hldr.mtype.method.Func
var returnValues []reflect.Value var returnValues []reflect.Value
buf, err := io.ReadAll(r.Body) if r.Body != nil {
if err != nil && err != io.EOF { var buf []byte
h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError) buf, err = io.ReadAll(r.Body)
return if err != nil && err != io.EOF {
} h.errorHandler(ctx, handler, w, r, err, http.StatusInternalServerError)
return
}
if err = cf.Unmarshal(buf, argv.Interface()); err != nil { if err = cf.Unmarshal(buf, argv.Interface()); err != nil {
h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest) h.errorHandler(ctx, handler, w, r, err, http.StatusBadRequest)
return return
}
} }
matches = rflutil.FlattenMap(matches) matches = rflutil.FlattenMap(matches)
@@ -279,6 +557,7 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
var buf []byte
if appErr != nil { if appErr != nil {
switch verr := appErr.(type) { switch verr := appErr.(type) {
case *errors.Error: case *errors.Error:

12
handler/generate.go Normal file
View File

@@ -0,0 +1,12 @@
package handler
import (
// import required packages
_ "go.unistack.org/micro-proto/v3/openapiv3"
)
//go:generate sh -c "curl -L https://github.com/swagger-api/swagger-ui/archive/refs/tags/v4.18.3.zip -o - | bsdtar -C swagger-ui --strip-components=2 -xv swagger-ui-4.18.3/dist && rm swagger-ui/*.map swagger-ui/*-es-*.js swagger-ui/swagger-ui.js swagger-ui/swagger-initializer.js"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./meter/meter.proto"
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ ./health/health.proto"

View File

@@ -1,8 +0,0 @@
package health
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ health.proto"
import (
// import required packages
_ "go.unistack.org/micro-proto/v3/openapiv3"
)

View File

@@ -1,8 +0,0 @@
package meter
//go:generate sh -c "protoc -I./ -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) --go-micro_out='components=micro|http|server',standalone=false,debug=true,paths=source_relative:./ meter.proto"
import (
// import required packages
_ "go.unistack.org/micro-proto/v3/openapiv3"
)

Binary file not shown.

After

Width:  |  Height:  |  Size: 665 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 628 B

View File

@@ -0,0 +1,16 @@
html {
box-sizing: border-box;
overflow: -moz-scrollbars-vertical;
overflow-y: scroll;
}
*,
*:before,
*:after {
box-sizing: inherit;
}
body {
margin: 0;
background: #fafafa;
}

View File

@@ -0,0 +1,19 @@
<!-- HTML for static distribution bundle build -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Swagger UI</title>
<link rel="stylesheet" type="text/css" href="./swagger-ui.css" />
<link rel="stylesheet" type="text/css" href="index.css" />
<link rel="icon" type="image/png" href="./favicon-32x32.png" sizes="32x32" />
<link rel="icon" type="image/png" href="./favicon-16x16.png" sizes="16x16" />
</head>
<body>
<div id="swagger-ui"></div>
<script src="./swagger-ui-bundle.js" charset="UTF-8"> </script>
<script src="./swagger-ui-standalone-preset.js" charset="UTF-8"> </script>
<script src="./swagger-initializer.js" charset="UTF-8"> </script>
</body>
</html>

View File

@@ -0,0 +1,79 @@
<!doctype html>
<html lang="en-US">
<head>
<title>Swagger UI: OAuth2 Redirect</title>
</head>
<body>
<script>
'use strict';
function run () {
var oauth2 = window.opener.swaggerUIRedirectOauth2;
var sentState = oauth2.state;
var redirectUrl = oauth2.redirectUrl;
var isValid, qp, arr;
if (/code|token|error/.test(window.location.hash)) {
qp = window.location.hash.substring(1).replace('?', '&');
} else {
qp = location.search.substring(1);
}
arr = qp.split("&");
arr.forEach(function (v,i,_arr) { _arr[i] = '"' + v.replace('=', '":"') + '"';});
qp = qp ? JSON.parse('{' + arr.join() + '}',
function (key, value) {
return key === "" ? value : decodeURIComponent(value);
}
) : {};
isValid = qp.state === sentState;
if ((
oauth2.auth.schema.get("flow") === "accessCode" ||
oauth2.auth.schema.get("flow") === "authorizationCode" ||
oauth2.auth.schema.get("flow") === "authorization_code"
) && !oauth2.auth.code) {
if (!isValid) {
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "warning",
message: "Authorization may be unsafe, passed state was changed in server. The passed state wasn't returned from auth server."
});
}
if (qp.code) {
delete oauth2.state;
oauth2.auth.code = qp.code;
oauth2.callback({auth: oauth2.auth, redirectUrl: redirectUrl});
} else {
let oauthErrorMsg;
if (qp.error) {
oauthErrorMsg = "["+qp.error+"]: " +
(qp.error_description ? qp.error_description+ ". " : "no accessCode received from the server. ") +
(qp.error_uri ? "More info: "+qp.error_uri : "");
}
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "error",
message: oauthErrorMsg || "[Authorization failed]: no accessCode received from the server."
});
}
} else {
oauth2.callback({auth: oauth2.auth, token: qp, isValid: isValid, redirectUrl: redirectUrl});
}
window.close();
}
if (document.readyState !== 'loading') {
run();
} else {
document.addEventListener('DOMContentLoaded', function () {
run();
});
}
</script>
</body>
</html>

View File

@@ -0,0 +1,20 @@
window.onload = function() {
//<editor-fold desc="Changeable Configuration Block">
// the following lines will be replaced by docker/configurator, when it runs in a docker-container
window.ui = SwaggerUIBundle({
url: "{{ .SWAGGER }}",
dom_id: '#swagger-ui',
deepLinking: true,
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIStandalonePreset
],
plugins: [
SwaggerUIBundle.plugins.DownloadUrl
],
layout: "StandaloneLayout"
});
//</editor-fold>
};

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,142 @@
package swaggerui // import "go.unistack.org/micro-server-http/v3/handler/swagger-ui"
import (
"embed"
"html/template"
"net/http"
"path"
"reflect"
)
//go:embed *.js *.css *.html *.png
var assets embed.FS
var (
Handler = func(prefix string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet || path.Base(r.URL.Path) != "swagger-initializer.js" {
http.StripPrefix(prefix, http.FileServer(http.FS(assets))).ServeHTTP(w, r)
return
}
tpl := template.New("swagger-initializer.js").Funcs(TemplateFuncs)
ptpl, err := tpl.Parse(Template)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
if err := ptpl.Execute(w, Config); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
}
}
TemplateFuncs = template.FuncMap{
"isInt": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Int, reflect.Int8, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64:
return true
default:
return false
}
},
"isBool": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Bool:
return true
default:
return false
}
},
"isString": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.String:
return true
default:
return false
}
},
"isSlice": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Slice:
return true
default:
return false
}
},
"isMap": func(i interface{}) bool {
v := reflect.ValueOf(i)
switch v.Kind() {
case reflect.Map:
return true
default:
return false
}
},
}
Template = `
window.onload = function() {
//<editor-fold desc="Changeable Configuration Block">
window.ui = SwaggerUIBundle({
{{- range $k, $v := . }}
{{- if (eq (printf "%s" $v) "") -}}
{{- continue -}}
{{ end }}
{{ $k }}: {{ if isBool $v -}}
{{- $v -}},
{{- else if isInt $v -}}
{{- $v -}},
{{- else if isString $v -}}
"{{- $v -}}",
{{- else if and (isSlice $v) (or (eq (printf "%s" $k) "presets") (eq (printf "%s" $k) "plugins")) -}}
[
{{- range $v }}
{{ . }},
{{- end }}
],
{{- end -}}
{{ end }}
});
//</editor-fold>
};`
Config = map[string]interface{}{
"configUrl": "",
"dom_id": "#swagger-ui",
/*
"domNode": "",
"spec": "",
"urls": []interface{}{
map[string]interface{}{
"url": "",
"name": "",
},
},
},
*/
"url": "https://petstore.swagger.io/v2/swagger.json",
"deepLinking": true,
"displayOperationId": false,
"defaultModelsExpandDepth": 1,
"defaultModelExpandDepth": 1,
"displayRequestDuration": true,
"filter": true,
"operationsSorter": "alpha",
"showExtensions": true,
"tryItOutEnabled": true,
"presets": []string{
"SwaggerUIBundle.presets.apis",
"SwaggerUIStandalonePreset",
},
"plugins": []string{
"SwaggerUIBundle.plugins.DownloadUrl",
},
"layout": "StandaloneLayout",
}
)

View File

@@ -0,0 +1,15 @@
package swaggerui
import (
"net/http"
"testing"
)
func TestTemplate(t *testing.T) {
t.Skip()
h := http.NewServeMux()
h.HandleFunc("/", Handler(""))
if err := http.ListenAndServe(":8080", h); err != nil {
t.Fatal(err)
}
}

53
http.go
View File

@@ -23,9 +23,9 @@ import (
"golang.org/x/net/netutil" "golang.org/x/net/netutil"
) )
var _ server.Server = &httpServer{} var _ server.Server = (*Server)(nil)
type httpServer struct { type Server struct {
hd server.Handler hd server.Handler
rsvc *register.Service rsvc *register.Service
handlers map[string]server.Handler handlers map[string]server.Handler
@@ -40,7 +40,7 @@ type httpServer struct {
init bool init bool
} }
func (h *httpServer) newCodec(ct string) (codec.Codec, error) { func (h *Server) newCodec(ct string) (codec.Codec, error) {
if idx := strings.IndexRune(ct, ';'); idx >= 0 { if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx] ct = ct[:idx]
} }
@@ -53,14 +53,14 @@ func (h *httpServer) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType return nil, codec.ErrUnknownContentType
} }
func (h *httpServer) Options() server.Options { func (h *Server) Options() server.Options {
h.Lock() h.Lock()
opts := h.opts opts := h.opts
h.Unlock() h.Unlock()
return opts return opts
} }
func (h *httpServer) Init(opts ...server.Option) error { func (h *Server) Init(opts ...server.Option) error {
if len(opts) == 0 && h.init { if len(opts) == 0 && h.init {
return nil return nil
} }
@@ -130,7 +130,7 @@ func (h *httpServer) Init(opts ...server.Option) error {
return nil return nil
} }
func (h *httpServer) Handle(handler server.Handler) error { func (h *Server) Handle(handler server.Handler) error {
// passed unknown handler // passed unknown handler
hdlr, ok := handler.(*httpHandler) hdlr, ok := handler.(*httpHandler)
if !ok { if !ok {
@@ -159,7 +159,7 @@ func (h *httpServer) Handle(handler server.Handler) error {
return nil return nil
} }
func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.NewHandlerOptions(opts...) options := server.NewHandlerOptions(opts...)
eps := make([]*register.Endpoint, 0, len(options.Metadata)) eps := make([]*register.Endpoint, 0, len(options.Metadata))
@@ -288,11 +288,11 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio
return hdlr return hdlr
} }
func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, handler, opts...) return newSubscriber(topic, handler, opts...)
} }
func (h *httpServer) Subscribe(sb server.Subscriber) error { func (h *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*httpSubscriber) sub, ok := sb.(*httpSubscriber)
if !ok { if !ok {
return fmt.Errorf("invalid subscriber: expected *httpSubscriber") return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
@@ -317,7 +317,7 @@ func (h *httpServer) Subscribe(sb server.Subscriber) error {
return nil return nil
} }
func (h *httpServer) Register() error { func (h *Server) Register() error {
var eps []*register.Endpoint var eps []*register.Endpoint
h.RLock() h.RLock()
for _, hdlr := range h.handlers { for _, hdlr := range h.handlers {
@@ -378,6 +378,17 @@ func (h *httpServer) Register() error {
} }
h.Lock() h.Lock()
h.registered = true
h.rsvc = service
h.Unlock()
return nil
}
func (h *Server) subscribe() error {
config := h.opts
for sb := range h.subscribers { for sb := range h.subscribers {
handler := h.createSubHandler(sb, config) handler := h.createSubHandler(sb, config)
var opts []broker.SubscribeOption var opts []broker.SubscribeOption
@@ -401,14 +412,10 @@ func (h *httpServer) Register() error {
h.subscribers[sb] = []broker.Subscriber{sub} h.subscribers[sb] = []broker.Subscriber{sub}
} }
h.registered = true
h.rsvc = service
h.Unlock()
return nil return nil
} }
func (h *httpServer) Deregister() error { func (h *Server) Deregister() error {
h.RLock() h.RLock()
config := h.opts config := h.opts
h.RUnlock() h.RUnlock()
@@ -456,7 +463,7 @@ func (h *httpServer) Deregister() error {
return nil return nil
} }
func (h *httpServer) Start() error { func (h *Server) Start() error {
h.RLock() h.RLock()
config := h.opts config := h.opts
h.RUnlock() h.RUnlock()
@@ -539,6 +546,10 @@ func (h *httpServer) Start() error {
} }
} }
if err := h.subscribe(); err != nil {
return err
}
fn := handler fn := handler
if h.opts.Context != nil { if h.opts.Context != nil {
@@ -636,27 +647,27 @@ func (h *httpServer) Start() error {
return nil return nil
} }
func (h *httpServer) Stop() error { func (h *Server) Stop() error {
ch := make(chan error) ch := make(chan error)
h.exit <- ch h.exit <- ch
return <-ch return <-ch
} }
func (h *httpServer) String() string { func (h *Server) String() string {
return "http" return "http"
} }
func (h *httpServer) Name() string { func (h *Server) Name() string {
return h.opts.Name return h.opts.Name
} }
func NewServer(opts ...server.Option) *httpServer { func NewServer(opts ...server.Option) *Server {
options := server.NewOptions(opts...) options := server.NewOptions(opts...)
eh := DefaultErrorHandler eh := DefaultErrorHandler
if v, ok := options.Context.Value(errorHandlerKey{}).(errorHandler); ok && v != nil { if v, ok := options.Context.Value(errorHandlerKey{}).(errorHandler); ok && v != nil {
eh = v eh = v
} }
return &httpServer{ return &Server{
opts: options, opts: options,
exit: make(chan chan error), exit: make(chan chan error),
subscribers: make(map[*httpSubscriber][]broker.Subscriber), subscribers: make(map[*httpSubscriber][]broker.Subscriber),

View File

@@ -85,8 +85,8 @@ func Middleware(mw ...func(http.Handler) http.Handler) server.Option {
type serverKey struct{} type serverKey struct{}
// Server provide ability to pass *http.Server // HTTPServer provide ability to pass *http.Server
func Server(hs *http.Server) server.Option { func HTTPServer(hs *http.Server) server.Option {
return server.SetOption(serverKey{}, hs) return server.SetOption(serverKey{}, hs)
} }

View File

@@ -101,7 +101,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
} }
} }
func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler { func (s *Server) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Event) error { return func(p broker.Event) error {
msg := p.Message() msg := p.Message()
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]