diff --git a/handler/handlers.go b/handler/handlers.go index d8a6695..0ce644b 100644 --- a/handler/handlers.go +++ b/handler/handlers.go @@ -2,16 +2,16 @@ package handler import ( "context" - "io" - "net/http" - cmsstorage "go.unistack.org/cms-service/storage" "go.unistack.org/micro/v3" "go.unistack.org/micro/v3/errors" "go.unistack.org/unistack-org/pkgdash/config" pb "go.unistack.org/unistack-org/pkgdash/proto/go_generate" + "go.unistack.org/unistack-org/pkgdash/service/client_git" "go.unistack.org/unistack-org/pkgdash/storage" "google.golang.org/protobuf/encoding/protojson" + "io" + "net/http" ) type Handler struct { @@ -21,6 +21,9 @@ type Handler struct { writer writer protojson.MarshalOptions protojson.UnmarshalOptions + + git client_git.Client + chanUrl chan *pb.AddPackageRsp } func (h *Handler) ListPackage(w http.ResponseWriter, r *http.Request) { @@ -42,12 +45,39 @@ func (h *Handler) ListPackage(w http.ResponseWriter, r *http.Request) { h.writer.Response(ctx, w, rsp) } -func (h *Handler) UpdateInfo(w http.ResponseWriter, r *http.Request) { +func (h *Handler) UpdatePackage(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := h.svc.Logger() - logger.Debug(ctx, "Start UpdateInfo") + logger.Debug(ctx, "Start UpdatePackage") - // TODO + defer r.Body.Close() + all, err := io.ReadAll(r.Body) + if err != nil { + logger.Error(ctx, err) + h.writer.Response(ctx, w, NewInternalError(err)) + return + } + + rsp := new(pb.UpdatePackageRsp) + if err = h.Unmarshal(all, rsp); err != nil { + logger.Error(ctx, err) + h.writer.Response(ctx, w, NewUnmarshalError(err)) + return + } + + if err = rsp.Validate(); err != nil { + logger.Error(ctx, err) + h.writer.Response(ctx, w, NewValidationError(err)) + return + } + + if err = h.store.UpdatePackage(ctx, rsp); err != nil { + logger.Error(ctx, err) + h.writer.Response(ctx, w, NewInternalError(err)) + return + } + + logger.Debug(ctx, "Success finish UpdatePackage") } func (h *Handler) AddComment(w http.ResponseWriter, r *http.Request) { @@ -111,21 +141,24 @@ func (h *Handler) AddPackage(w http.ResponseWriter, r *http.Request) { return } - // TODO: - // need logic for add module from go.mod - // err := setModules(req) - - if err = h.store.AddPackage(ctx, req); err != nil { - logger.Error(ctx, err) - h.writer.Response(ctx, w, NewInternalError(err)) - return + if h.git.IsClose() { + logger.Error(ctx, "chan is closed") + } else { + h.chanUrl <- req } logger.Debug(ctx, "Success finish addPackage") } -func NewHandler(svc micro.Service, w writer) *Handler { - return &Handler{svc: svc, writer: w} +func NewHandler(svc micro.Service, w writer, client client_git.Client) *Handler { + h := &Handler{ + svc: svc, + writer: w, + git: client, + } + h.EmitUnpopulated = true + h.UseProtoNames = false + return h } func (h *Handler) Init(ctx context.Context) error { @@ -138,6 +171,8 @@ func (h *Handler) Init(ctx context.Context) error { return errors.New(config.ServiceName, "error init storage", 1) } + h.chanUrl = h.git.Run(ctx, st) + h.store = st return nil diff --git a/main.go b/main.go index 2961783..779e647 100644 --- a/main.go +++ b/main.go @@ -65,7 +65,7 @@ func main() { cloneOpts.Depth = 1 } - if err := cloneOpts.Validate(); err != nil { + if err = cloneOpts.Validate(); err != nil { logger.Fatal(ctx, err) } diff --git a/models/entities.go b/models/entities.go index b1a5ea4..9ec3f86 100644 --- a/models/entities.go +++ b/models/entities.go @@ -7,19 +7,20 @@ import ( ) type Package struct { - ID int64 `db:"id" json:"id"` // package id - Name string `db:"name" json:"name"` // service name, last component path - URL string `db:"url" json:"url"` // scm url - Modules []int64 `db:"modules" json:"modules"` // parsed go.mod modules - Issues []int64 `db:"issues" json:"issues,omitempty"` // issues list - Comments []int64 `db:"comments" json:"comments,omitempty"` + ID int64 `db:"id" json:"id"` // package id + Name string `db:"name" json:"name"` // service name, last component path + URL string `db:"url" json:"url"` // scm url + Modules []uint64 `db:"modules" json:"modules"` // parsed go.mod modules + Issues []int64 `db:"issues" json:"issues,omitempty"` // issues list + Comments []int64 `db:"comments" json:"comments,omitempty"` } type Module struct { - ID int64 `db:"id"` - Name string `db:"name"` // module name - Version string `db:"version"` // module - Package int64 `db:"package"` + ID int64 `db:"id"` + Name string `db:"name"` // module name + Version string `db:"version"` // module + Package int64 `db:"package"` + LastVersion string `db:"last_version"` } type Issue struct { diff --git a/service/client_git/client.go b/service/client_git/client.go new file mode 100644 index 0000000..664d221 --- /dev/null +++ b/service/client_git/client.go @@ -0,0 +1,218 @@ +package client_git + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "go.unistack.org/unistack-org/pkgdash/internal" + "go.unistack.org/unistack-org/pkgdash/models" + "io" + "net/url" + "os" + "sort" + "strings" + + "github.com/go-git/go-git/v5" + "github.com/go-git/go-git/v5/plumbing/filemode" + "github.com/go-git/go-git/v5/plumbing/object" + "github.com/go-git/go-git/v5/storage/memory" + "go.unistack.org/micro/v3/logger" + pb "go.unistack.org/unistack-org/pkgdash/proto/go_generate" + "go.unistack.org/unistack-org/pkgdash/storage" + "golang.org/x/mod/modfile" + "golang.org/x/mod/module" +) + +type Client interface { + Run(ctx context.Context, st storage.Storage) chan *pb.AddPackageRsp + IsClose() bool + Done() <-chan struct{} +} + +type client struct { + worker chan *pb.AddPackageRsp + closed bool + lock chan struct{} +} + +func NewClient(cap uint) Client { + return &client{ + make(chan *pb.AddPackageRsp, cap), + false, + make(chan struct{}), + } +} + +func (c *client) Run(ctx context.Context, st storage.Storage) chan *pb.AddPackageRsp { + go func() { + defer close(c.worker) + for { + select { + case data := <-c.worker: + go func() { + runner(ctx, st, data) + }() + case <-ctx.Done(): + logger.Info(ctx, ctx.Err()) + return + } + } + }() + + return c.worker +} + +func (c *client) IsClose() bool { + return c.closed +} + +// Done for locked goroutine +func (c *client) Done() <-chan struct{} { + return c.lock +} + +func runner(ctx context.Context, st storage.Storage, rsp *pb.AddPackageRsp) { + modules, err := getGoModule(ctx, rsp.Url.Value) + if err != nil { + logger.Error(ctx, err) + return + } + + logger.Infof(ctx, "success get list mod", modules) + + if rsp.Modules, err = st.InsertButchModules(ctx, modules); err != nil { + logger.Error(ctx, err) + return + } + + if err = st.AddPackage(ctx, rsp); err != nil { + logger.Error(ctx, err) + } +} + +func getGoModule(ctx context.Context, gitUrl string) ([]models.Module, error) { + u, err := url.Parse(gitUrl) + if err != nil { + logger.Fatal(ctx, err) + } + + var rev string + if idx := strings.Index(u.Path, "@"); idx > 0 { + rev = u.Path[idx+1:] + } + + cloneOpts := &git.CloneOptions{ + URL: gitUrl, + Progress: os.Stdout, + } + + if len(rev) == 0 { + cloneOpts.SingleBranch = true + cloneOpts.Depth = 1 + } + + if err = cloneOpts.Validate(); err != nil { + return nil, err + } + + repo, err := git.CloneContext(ctx, memory.NewStorage(), nil, cloneOpts) + if err != nil { + return nil, err + } + + ref, err := repo.Head() + if err != nil { + return nil, fmt.Errorf("failed to get head: %v", err) + } + + commit, err := repo.CommitObject(ref.Hash()) + if err != nil { + return nil, fmt.Errorf("failed to get commit: %v", err) + } + + tree, err := commit.Tree() + if err != nil { + return nil, err + } + + unique := make(map[string]models.Module) + var mvs []module.Version + err = tree.Files().ForEach(func(file *object.File) error { + if file == nil { + err = errors.New("file pointer is nil") + logger.Error(ctx, err) + return err + } + + switch file.Mode { + case filemode.Regular: + if strings.HasSuffix(file.Name, "go.mod") { + if mvs, err = Direct(file); err != nil { + return err + } + for i := range mvs { + unique[mvs[i].Path] = models.Module{ + Name: mvs[i].Path, + Version: mvs[i].Version, + LastVersion: mvs[i].Version, + } + } + internal.Updates(internal.UpdateOptions{ + Pre: false, + Major: false, + Cached: false, + Modules: mvs, + OnUpdate: func(u internal.Update) { + if u.Err != nil { + logger.Errorf(ctx, "%s: failed: %v\n", u.Module.Path, u.Err) + } else { + val := unique[u.Module.Path] + val.LastVersion = u.Version + unique[u.Module.Path] = val + } + }, + }) + } + } + return nil + }) + + result := make([]models.Module, 0, len(unique)) + for _, v := range unique { + result = append(result, v) + } + + sort.Slice(result, func(i, j int) bool { + return result[i].Name < result[j].Name + }) + + return result, err +} + +func Direct(file *object.File) ([]module.Version, error) { + r, err := file.Reader() + if err != nil { + return nil, err + } + defer r.Close() + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + modfile, err := modfile.ParseLax("go.mod", data, nil) + if err != nil { + return nil, err + } + var mods []module.Version + for _, req := range modfile.Require { + // if !req.Indirect { + mods = append(mods, req.Mod) + //} + } + /* + sort.Slice(mods, func(i, j int) bool { + return mods[i].Path < mods[j].Path + }) + */ + return mods, nil +} diff --git a/service/client_git/client_test.go b/service/client_git/client_test.go new file mode 100644 index 0000000..f6b07c1 --- /dev/null +++ b/service/client_git/client_test.go @@ -0,0 +1,49 @@ +package client_git + +import ( + "context" + "database/sql" + "fmt" + pb "go.unistack.org/unistack-org/pkgdash/proto/go_generate" + "go.unistack.org/unistack-org/pkgdash/storage" + "go.unistack.org/unistack-org/pkgdash/storage/postgres" + "google.golang.org/protobuf/types/known/wrapperspb" + "testing" +) + +func TestClient(t *testing.T) { + dsn := fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=disable", "test", "123", "localhost", "5432", "postgres") + conn, err := sql.Open("postgres", dsn) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + if err = conn.Ping(); err != nil { + t.Fatal(err) + } + + st, err := postgres.NewStorage(conn) + if err != nil { + t.Fatal(err) + } + + s, ok := st.(storage.Storage) + if !ok { + t.Fatal("typecast error") + } + + ctx, cancel := context.WithCancel(context.Background()) + _ = cancel + cli := NewClient(1) + + ch := cli.Run(ctx, s) + + data := &pb.AddPackageRsp{ + Name: wrapperspb.String("test"), + Url: wrapperspb.String("https://github.com/dantedenis/service_history.git"), + } + + ch <- data + + <-cli.Done() +} diff --git a/service/service.go b/service/service.go index b275939..a80be6d 100644 --- a/service/service.go +++ b/service/service.go @@ -6,6 +6,7 @@ import ( intcfg "go.unistack.org/unistack-org/pkgdash/config" "go.unistack.org/unistack-org/pkgdash/handler" "go.unistack.org/unistack-org/pkgdash/handler/encoders" + "go.unistack.org/unistack-org/pkgdash/service/client_git" "net/http" //pbmicro "go.unistack.org/unistack-org/pkgdash/proto/micro" @@ -53,7 +54,7 @@ func NewService(ctx context.Context) (micro.Service, error) { logger.Fatalf(ctx, "failed init writer: %v", err) } - h := handler.NewHandler(svc, writer) + h := handler.NewHandler(svc, writer, client_git.NewClient(5)) if err := svc.Init( micro.AfterStart(func(_ context.Context) error { @@ -128,7 +129,7 @@ func NewService(ctx context.Context) (micro.Service, error) { mux := http.NewServeMux() mux.HandleFunc("/listPackage", handler.Methods(http.MethodGet, h.ListPackage)) - mux.HandleFunc("/updateInfo", handler.Methods(http.MethodPost, h.UpdateInfo)) + mux.HandleFunc("/updateInfo", handler.Methods(http.MethodPost, h.UpdatePackage)) mux.HandleFunc("/addComment", handler.Methods(http.MethodPut, h.AddComment)) mux.HandleFunc("/addPackage", handler.Methods(http.MethodPost, h.AddPackage)) diff --git a/storage/migrations/postgres/000001_init_schema.up.sql b/storage/migrations/postgres/000001_init_schema.up.sql index a8ad351..2e7fdd1 100644 --- a/storage/migrations/postgres/000001_init_schema.up.sql +++ b/storage/migrations/postgres/000001_init_schema.up.sql @@ -14,7 +14,8 @@ create table if not exists comment ( create table if not exists module ( id serial not null unique primary key , name varchar not null , - version varchar not null + version varchar not null , + last_version varchar not null ); create table if not exists issue ( @@ -34,4 +35,5 @@ create table if not exists package ( comments integer[] default '{}'::integer[] ); +create unique index module_info on module(name, version); diff --git a/storage/postgres/quries.go b/storage/postgres/quries.go index ad6f232..aa0e3c8 100644 --- a/storage/postgres/quries.go +++ b/storage/postgres/quries.go @@ -19,5 +19,10 @@ update package set comments = array_append(comments, (select * from insert_comm) ` queryAddPackage = ` insert into package(name, url, modules) values ($1, $2, $3); +` + queryInsMsgGetIDs = ` +insert into module(name, version, last_version) values +%s +returning id; ` ) diff --git a/storage/postgres/storage.go b/storage/postgres/storage.go index 84685fe..81e62ff 100644 --- a/storage/postgres/storage.go +++ b/storage/postgres/storage.go @@ -5,7 +5,9 @@ import ( "database/sql" "embed" "errors" + "fmt" pb "go.unistack.org/unistack-org/pkgdash/proto/go_generate" + "strings" "github.com/golang-migrate/migrate/v4" mpgx "github.com/golang-migrate/migrate/v4/database/pgx" @@ -87,6 +89,10 @@ func (s *Postgres) MigrateDown() error { return nil } +func (s *Postgres) UpdatePackage(ctx context.Context, rsp *pb.UpdatePackageRsp) error { + panic("need implement") +} + func (s *Postgres) ListPackage(ctx context.Context) (models.ListPackage, error) { rows, err := s.db.QueryContext(ctx, queryListPackage) if err != nil { @@ -175,3 +181,57 @@ func (s *Postgres) AddPackage(ctx context.Context, rsp *pb.AddPackageRsp) error return err } + +func (s *Postgres) InsertButchModules(ctx context.Context, rsp []models.Module) ([]uint64, error) { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + if rollbackErr := tx.Rollback(); rollbackErr != nil { + logger.Errorf(ctx, "AddPackage: unable to rollback: %v", rollbackErr) + } + } else { + err = tx.Commit() + } + }() + + query := generateQuery(rsp) + + rows, err := tx.QueryContext(ctx, query) + if err != nil { + return nil, err + } + defer func() { + if err = rows.Close(); err != nil { + return + } + err = rows.Err() + }() + + result := make([]uint64, 0) + for rows.Next() { + tmp := uint64(0) + if err = rows.Scan(&tmp); err != nil { + return nil, err + } + + result = append(result, tmp) + } + + return result, err +} + +func generateQuery(rsp []models.Module) string { + const pattern = `%c('%s', '%s', '%s')` + build := strings.Builder{} + comma := ' ' + for i := range rsp { + str := fmt.Sprintf(pattern, comma, rsp[i].Name, rsp[i].Version, rsp[i].LastVersion) + build.WriteString(str) + comma = ',' + } + + return fmt.Sprintf(queryInsMsgGetIDs, build.String()) +} diff --git a/storage/postgres/storage_test.go b/storage/postgres/storage_test.go new file mode 100644 index 0000000..9f228b1 --- /dev/null +++ b/storage/postgres/storage_test.go @@ -0,0 +1,25 @@ +package postgres + +import ( + "fmt" + "go.unistack.org/unistack-org/pkgdash/models" + "testing" +) + +func TestGenerate(t *testing.T) { + m := []models.Module{ + { + 1, "test", "1.2.3", 2, "23.31", + }, + { + 1, "321test", "1.3", 4, "2111.31", + }, + { + 1, "testabcd", "1.2.3", 2, "153453.31", + }, + } + + str := generateQuery(m) + + fmt.Println(str) +} diff --git a/storage/storage.go b/storage/storage.go index 2833e6a..616cb8c 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -26,8 +26,10 @@ type Storage interface { cmsstorage.Migrator ListPackage(ctx context.Context) (models.ListPackage, error) + UpdatePackage(ctx context.Context, rsp *pb.UpdatePackageRsp) error AddComment(ctx context.Context, rsp *pb.AddCommentRsp) error AddPackage(ctx context.Context, rsp *pb.AddPackageRsp) error + InsertButchModules(ctx context.Context, rsp []models.Module) ([]uint64, error) } func NewStorage(name string, db *sql.DB) (interface{}, error) {