add handlers, storage(Postgres, sqlite) (#3)
Reviewed-on: #3 Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru> Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
This commit is contained in:
218
service/client_git/client.go
Normal file
218
service/client_git/client.go
Normal file
@@ -0,0 +1,218 @@
|
||||
package client_git
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"go.unistack.org/unistack-org/pkgdash/internal"
|
||||
"go.unistack.org/unistack-org/pkgdash/models"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/go-git/go-git/v5"
|
||||
"github.com/go-git/go-git/v5/plumbing/filemode"
|
||||
"github.com/go-git/go-git/v5/plumbing/object"
|
||||
"github.com/go-git/go-git/v5/storage/memory"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
pb "go.unistack.org/unistack-org/pkgdash/proto/go_generate"
|
||||
"go.unistack.org/unistack-org/pkgdash/storage"
|
||||
"golang.org/x/mod/modfile"
|
||||
"golang.org/x/mod/module"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
Run(ctx context.Context, st storage.Storage) chan *pb.AddPackageReq
|
||||
IsClose() bool
|
||||
Done() <-chan struct{}
|
||||
}
|
||||
|
||||
type client struct {
|
||||
worker chan *pb.AddPackageReq
|
||||
closed bool
|
||||
lock chan struct{}
|
||||
}
|
||||
|
||||
func NewClient(cap uint) Client {
|
||||
return &client{
|
||||
make(chan *pb.AddPackageReq, cap),
|
||||
false,
|
||||
make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) Run(ctx context.Context, st storage.Storage) chan *pb.AddPackageReq {
|
||||
go func() {
|
||||
defer close(c.worker)
|
||||
for {
|
||||
select {
|
||||
case data := <-c.worker:
|
||||
go func() {
|
||||
runner(ctx, st, data)
|
||||
}()
|
||||
case <-ctx.Done():
|
||||
logger.Info(ctx, ctx.Err())
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return c.worker
|
||||
}
|
||||
|
||||
func (c *client) IsClose() bool {
|
||||
return c.closed
|
||||
}
|
||||
|
||||
// Done for locked goroutine
|
||||
func (c *client) Done() <-chan struct{} {
|
||||
return c.lock
|
||||
}
|
||||
|
||||
func runner(ctx context.Context, st storage.Storage, req *pb.AddPackageReq) {
|
||||
modules, err := getGoModule(ctx, req.Url.Value)
|
||||
if err != nil {
|
||||
logger.Error(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Infof(ctx, "success get list mod", modules)
|
||||
|
||||
if req.Modules, err = st.InsertButchModules(ctx, modules); err != nil {
|
||||
logger.Error(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = st.AddPackage(ctx, req); err != nil {
|
||||
logger.Error(ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
func getGoModule(ctx context.Context, gitUrl string) ([]models.Module, error) {
|
||||
u, err := url.Parse(gitUrl)
|
||||
if err != nil {
|
||||
logger.Fatal(ctx, err)
|
||||
}
|
||||
|
||||
var rev string
|
||||
if idx := strings.Index(u.Path, "@"); idx > 0 {
|
||||
rev = u.Path[idx+1:]
|
||||
}
|
||||
|
||||
cloneOpts := &git.CloneOptions{
|
||||
URL: gitUrl,
|
||||
Progress: os.Stdout,
|
||||
}
|
||||
|
||||
if len(rev) == 0 {
|
||||
cloneOpts.SingleBranch = true
|
||||
cloneOpts.Depth = 1
|
||||
}
|
||||
|
||||
if err = cloneOpts.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
repo, err := git.CloneContext(ctx, memory.NewStorage(), nil, cloneOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ref, err := repo.Head()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get head: %v", err)
|
||||
}
|
||||
|
||||
commit, err := repo.CommitObject(ref.Hash())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get commit: %v", err)
|
||||
}
|
||||
|
||||
tree, err := commit.Tree()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
unique := make(map[string]models.Module)
|
||||
var mvs []module.Version
|
||||
err = tree.Files().ForEach(func(file *object.File) error {
|
||||
if file == nil {
|
||||
err = errors.New("file pointer is nil")
|
||||
logger.Error(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
switch file.Mode {
|
||||
case filemode.Regular:
|
||||
if strings.HasSuffix(file.Name, "go.mod") {
|
||||
if mvs, err = Direct(file); err != nil {
|
||||
return err
|
||||
}
|
||||
for i := range mvs {
|
||||
unique[mvs[i].Path] = models.Module{
|
||||
Name: mvs[i].Path,
|
||||
Version: mvs[i].Version,
|
||||
LastVersion: mvs[i].Version,
|
||||
}
|
||||
}
|
||||
internal.Updates(internal.UpdateOptions{
|
||||
Pre: false,
|
||||
Major: false,
|
||||
Cached: false,
|
||||
Modules: mvs,
|
||||
OnUpdate: func(u internal.Update) {
|
||||
if u.Err != nil {
|
||||
logger.Errorf(ctx, "%s: failed: %v\n", u.Module.Path, u.Err)
|
||||
} else {
|
||||
val := unique[u.Module.Path]
|
||||
val.LastVersion = u.Version
|
||||
unique[u.Module.Path] = val
|
||||
}
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
result := make([]models.Module, 0, len(unique))
|
||||
for _, v := range unique {
|
||||
result = append(result, v)
|
||||
}
|
||||
|
||||
sort.Slice(result, func(i, j int) bool {
|
||||
return result[i].Name < result[j].Name
|
||||
})
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
func Direct(file *object.File) ([]module.Version, error) {
|
||||
r, err := file.Reader()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
data, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
modfile, err := modfile.ParseLax("go.mod", data, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var mods []module.Version
|
||||
for _, req := range modfile.Require {
|
||||
// if !req.Indirect {
|
||||
mods = append(mods, req.Mod)
|
||||
//}
|
||||
}
|
||||
/*
|
||||
sort.Slice(mods, func(i, j int) bool {
|
||||
return mods[i].Path < mods[j].Path
|
||||
})
|
||||
*/
|
||||
return mods, nil
|
||||
}
|
86
service/client_git/client_test.go
Normal file
86
service/client_git/client_test.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package client_git
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
pb "go.unistack.org/unistack-org/pkgdash/proto/go_generate"
|
||||
"go.unistack.org/unistack-org/pkgdash/storage"
|
||||
"go.unistack.org/unistack-org/pkgdash/storage/postgres"
|
||||
"go.unistack.org/unistack-org/pkgdash/storage/sqlite"
|
||||
"google.golang.org/protobuf/types/known/wrapperspb"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestClientPG(t *testing.T) {
|
||||
dsn := fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=disable", "test", "123", "localhost", "5432", "postgres")
|
||||
conn, err := sql.Open("postgres", dsn)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
if err = conn.Ping(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
st, err := postgres.NewStorage(conn)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, ok := st.(storage.Storage)
|
||||
if !ok {
|
||||
t.Fatal("typecast error")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_ = cancel
|
||||
cli := NewClient(1)
|
||||
|
||||
ch := cli.Run(ctx, s)
|
||||
|
||||
data := &pb.AddPackageReq{
|
||||
Name: wrapperspb.String("test"),
|
||||
Url: wrapperspb.String("https://github.com/dantedenis/service_history.git"),
|
||||
}
|
||||
|
||||
ch <- data
|
||||
|
||||
<-cli.Done()
|
||||
}
|
||||
|
||||
func TestClientLite(t *testing.T) {
|
||||
conn, err := sql.Open("sqlite3", "../../identifier.sqlite")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
if err = conn.Ping(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
st, err := sqlite.NewStorage(conn)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, ok := st.(storage.Storage)
|
||||
if !ok {
|
||||
t.Fatal("typecast error")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_ = cancel
|
||||
cli := NewClient(1)
|
||||
|
||||
ch := cli.Run(ctx, s)
|
||||
|
||||
data := &pb.AddPackageReq{
|
||||
Name: wrapperspb.String("test"),
|
||||
Url: wrapperspb.String("https://github.com/dantedenis/service_history.git"),
|
||||
}
|
||||
|
||||
ch <- data
|
||||
|
||||
<-cli.Done()
|
||||
}
|
@@ -6,13 +6,13 @@ import (
|
||||
intcfg "go.unistack.org/unistack-org/pkgdash/config"
|
||||
"go.unistack.org/unistack-org/pkgdash/handler"
|
||||
"go.unistack.org/unistack-org/pkgdash/handler/encoders"
|
||||
"go.unistack.org/unistack-org/pkgdash/service/client_git"
|
||||
"net/http"
|
||||
|
||||
//pbmicro "go.unistack.org/unistack-org/pkgdash/proto/micro"
|
||||
"go.unistack.org/unistack-org/pkgdash/storage"
|
||||
|
||||
cmsservice "go.unistack.org/cms-service"
|
||||
grpcsrv "go.unistack.org/micro-server-grpc/v3"
|
||||
"go.unistack.org/micro/v3"
|
||||
"go.unistack.org/micro/v3/config"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
@@ -43,12 +43,7 @@ func NewService(ctx context.Context) (micro.Service, error) {
|
||||
router = microuter.NewRouter()
|
||||
}
|
||||
|
||||
mgsrv := grpcsrv.NewServer(
|
||||
server.Register(reg),
|
||||
)
|
||||
|
||||
svc := micro.NewService(
|
||||
micro.Server(mgsrv),
|
||||
micro.Register(reg),
|
||||
micro.Router(router),
|
||||
micro.Config(cs...),
|
||||
@@ -59,28 +54,28 @@ func NewService(ctx context.Context) (micro.Service, error) {
|
||||
logger.Fatalf(ctx, "failed init writer: %v", err)
|
||||
}
|
||||
|
||||
h := handler.NewHandler(svc, writer)
|
||||
h := handler.NewHandler(svc, writer, client_git.NewClient(5))
|
||||
|
||||
if err := svc.Init(
|
||||
if err = svc.Init(
|
||||
micro.AfterStart(func(_ context.Context) error {
|
||||
return h.Init(svc.Options().Context)
|
||||
}),
|
||||
micro.BeforeStart(func(ctx context.Context) error {
|
||||
if err := config.Load(ctx, cs, config.LoadOverride(true)); err != nil {
|
||||
if err = config.Load(ctx, cs, config.LoadOverride(true)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := config.Validate(ctx, cfg); err != nil {
|
||||
if err = config.Validate(ctx, cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := svc.Init(
|
||||
if err = svc.Init(
|
||||
micro.Name(cfg.Service.Name),
|
||||
micro.Version(cfg.Service.Version),
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := svc.Server("http").Init(
|
||||
if err = svc.Server("http").Init(
|
||||
server.Address(cfg.App.Address),
|
||||
server.Name(cfg.Service.Name),
|
||||
server.Version(cfg.Service.Version),
|
||||
@@ -133,9 +128,11 @@ func NewService(ctx context.Context) (micro.Service, error) {
|
||||
|
||||
mux := http.NewServeMux()
|
||||
|
||||
mux.HandleFunc("/listPackage", h.ListPackage)
|
||||
mux.HandleFunc("/updateInfo", h.UpdateInfo)
|
||||
mux.HandleFunc("/addComment", h.AddComment)
|
||||
mux.HandleFunc("/listPackage", handler.Methods(http.MethodGet, h.ListPackage))
|
||||
mux.HandleFunc("/updatePackage", handler.Methods(http.MethodPost, h.UpdatePackage))
|
||||
mux.HandleFunc("/addComment", handler.Methods(http.MethodPut, h.AddComment))
|
||||
mux.HandleFunc("/addPackage", handler.Methods(http.MethodPost, h.AddPackage))
|
||||
mux.HandleFunc("/getModule", handler.Methods(http.MethodGet, h.GetModule))
|
||||
|
||||
if err = svc.Server().Handle(svc.Server().NewHandler(mux)); err != nil {
|
||||
logger.Fatalf(ctx, "failed to register handler: %v", err)
|
||||
|
Reference in New Issue
Block a user