Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2023-08-19 16:55:52 +03:00
parent 0e18a63f10
commit 59e28e5a86
18 changed files with 801 additions and 804 deletions

View File

@@ -15,8 +15,7 @@ create table if not exists comment (
create table if not exists module (
id serial not null unique primary key ,
name varchar not null ,
version varchar not null ,
last_version varchar not null
version varchar not null
);
create table if not exists issue (

View File

@@ -6,15 +6,6 @@ create table if not exists comments (
updated timestamp not null default current_timestamp
);
create table if not exists modules (
id integer primary key autoincrement not null,
name varchar not null ,
version varchar not null,
package integer not null,
last_version varchar not null,
created timestamp not null default current_timestamp,
updated timestamp not null default current_timestamp
);
create table if not exists issues (
id integer primary key autoincrement not null,
@@ -36,3 +27,22 @@ create table if not exists packages (
status integer default 1,
last_check timestamp
);
CREATE UNIQUE INDEX IF NOT EXISTS unique_idx_url on packages (url);
create table if not exists modules (
id integer primary key autoincrement not null,
name varchar not null,
version varchar not null,
last_check timestamp not null default current_timestamp
);
CREATE UNIQUE INDEX IF NOT EXISTS unique_idx_name_version on modules (name,version);
create table if not exists packages_modules (
id integer primary key autoincrement not null,
package integer,
module integer not null
);
CREATE UNIQUE INDEX IF NOT EXISTS unique_idx_package_module on packages_modules (package,module);

View File

@@ -0,0 +1,28 @@
package handler
import (
"context"
"net/http"
"git.unistack.org/unistack-org/pkgdash/internal/models"
pb "git.unistack.org/unistack-org/pkgdash/proto"
httpsrv "go.unistack.org/micro-server-http/v4"
"go.unistack.org/micro/v4/logger"
)
func (h *Handler) PackagesModules(ctx context.Context, req *pb.PackagesModulesReq, rsp *pb.PackagesModulesRsp) error {
logger.Debug(ctx, "PackagesModuleshandler start")
modules, err := h.store.PackagesModules(ctx, req)
if err != nil {
logger.Errorf(ctx, "error db response: %v", err)
httpsrv.SetRspCode(ctx, http.StatusInternalServerError)
return httpsrv.SetError(NewInternalError(err))
}
for _, mod := range modules {
rsp.Modules = append(rsp.Modules, models.NewModule(mod))
}
logger.Debug(ctx, "PackagesModules handler stop")
return nil
}

View File

@@ -9,6 +9,7 @@ import (
)
type Package struct {
LastCheck sql.NullTime `db:"last_check"`
Created time.Time `db:"created"`
Updated time.Time `db:"updated"`
Name string `db:"name"`
@@ -18,7 +19,6 @@ type Package struct {
Comments uint64 `db:"comments"`
ID uint64 `db:"id"`
Status uint64 `db:"status"`
LastCheck sql.NullTime `db:"last_check"`
}
func NewPackage(pkg *Package) *pb.Package {
@@ -39,25 +39,22 @@ func NewPackage(pkg *Package) *pb.Package {
}
type Module struct {
Created time.Time `db:"created"`
Updated time.Time `db:"updated"`
Name string `db:"name"`
Version string `db:"version"`
LastVersion string `db:"last_version"`
ID uint64 `db:"id"`
Package uint64 `db:"package"`
LastCheck sql.NullTime `db:"last_check"`
Name string `db:"name"`
Version string `db:"version"`
ID uint64 `db:"id"`
}
func NewModule(mod *Module) *pb.Module {
return &pb.Module{
Name: mod.Name,
Version: mod.Version,
LastVersion: mod.LastVersion,
Package: mod.Package,
Id: mod.ID,
Created: timestamppb.New(mod.Created),
Updated: timestamppb.New(mod.Updated),
rsp := &pb.Module{
Name: mod.Name,
Version: mod.Version,
Id: mod.ID,
}
if mod.LastCheck.Valid {
rsp.LastCheck = timestamppb.New(mod.LastCheck.Time)
}
return rsp
}
type Issue struct {

View File

@@ -1,4 +1,4 @@
package internal
package modules
import (
"bufio"
@@ -237,19 +237,19 @@ func QueryPackage(pkgpath string, cached bool) (*Module, error) {
// Update reports a newer version of a module.
// The Err field will be set if an error occured.
type Update struct {
Err error
Module module.Version
Version string
Err error
}
// UpdateOptions specifies a set of modules to check for updates.
// The OnUpdate callback will be invoked with any updates found.
type UpdateOptions struct {
OnUpdate func(Update)
Modules []module.Version
Pre bool
Cached bool
Major bool
Modules []module.Version
OnUpdate func(Update)
}
// Updates finds updates for a set of specified modules.

View File

@@ -1,4 +1,4 @@
package internal
package modules
import (
"fmt"

View File

@@ -1,14 +1,17 @@
package sqlite
const (
queryPackagesProcess = `select id, name, url, comments, modules, issues, created, updated from packages where ROUND((JULIANDAY(CURRENT_TIMESTAMP) - JULIANDAY(last_check)) * 86400) > $1 or last_check is NULL`
queryPackagesModulesCount = `update packages set modules = $2, last_check = CURRENT_TIMESTAMP where id = $1;`
queryPackagesList = `select id, name, url, comments, modules, issues, created, updated from packages;`
queryPackagesLookup = `select id, name, url, comments, modules, issues, created, updated from packages where id = $1;`
queryCommentsCreate = `insert into comments (comment) values ($1) returning id;`
queryPackagesCreate = `insert into packages (name, url) values ($1, $2) returning *;`
queryInsMsgGetIDs = `insert into modules(name, version, last_version) values %s returning id;`
queryModulesList = `select id, name, version, last_version, created, updated from modules;`
queryModulesCreate = `insert into modules (name, version, last_version, package) values ($1, $2, $3, $4) returning *;`
queryCommentsList = `select id, text, created, updated from comments;`
queryPackagesModulesCreate = `insert into packages_modules as pm (package, module) values ($1, $2) on conflict (package,module) do nothing;`
queryPackagesUpdateLastCheck = `update packages set last_check = CURRENT_TIMESTAMP where id = $1;`
queryPackagesModules = `select modules.id, modules.name, modules.version from modules left join packages_modules on modules.id = packages_modules.module left join packages on packages.id = packages_modules.package where packages_modules.package = $1;`
queryPackagesProcess = `select id, name, url, comments, modules, issues, created, updated, last_check from packages where ROUND((JULIANDAY(CURRENT_TIMESTAMP) - JULIANDAY(last_check)) * 86400) > $1 or last_check is NULL`
queryModulesProcess = `select id, name, version, last_check from modules where ROUND((JULIANDAY(CURRENT_TIMESTAMP) - JULIANDAY(last_check)) * 86400) > $1 or last_check is NULL`
queryPackagesModulesCount = `update packages set modules = $2, last_check = CURRENT_TIMESTAMP where id = $1;`
queryPackagesList = `select id, name, url, comments, modules, issues, created, updated from packages;`
queryPackagesLookup = `select id, name, url, comments, modules, issues, created, updated from packages where id = $1;`
queryCommentsCreate = `insert into comments (comment) values ($1) returning id;`
queryPackagesCreate = `insert into packages as p (name, url) values ($1, $2) on conflict (url) do update set name = p.name returning *;`
queryModulesList = `select id, name, version from modules;`
queryModulesCreate = `insert into modules as m (name, version) values ($1, $2) on conflict (name,version) do update set last_check = CURRENT_TIMESTAMP returning *;`
queryCommentsList = `select id, text, created, updated from comments;`
)

View File

@@ -3,7 +3,6 @@ package sqlite
import (
"context"
"fmt"
"strings"
"time"
"git.unistack.org/unistack-org/pkgdash/internal/models"
@@ -29,6 +28,34 @@ func NewStorage() func(*sqlx.DB) interface{} {
}
}
func (s *Sqlite) PackagesModulesCreate(ctx context.Context, pkg *models.Package, modules []*models.Module) error {
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return err
}
for _, mod := range modules {
err = tx.GetContext(ctx, mod, queryModulesCreate, mod.Name, mod.Version)
if err != nil {
_ = tx.Rollback()
return err
}
_, err = tx.ExecContext(ctx, queryPackagesModulesCreate, pkg.ID, mod.ID)
if err != nil {
_ = tx.Rollback()
return err
}
}
if err = tx.Commit(); err != nil {
_ = tx.Rollback()
return err
}
return nil
}
func (s *Sqlite) PackagesDelete(ctx context.Context, req *pb.PackagesDeleteReq) error {
return fmt.Errorf("need implement")
}
@@ -59,6 +86,17 @@ func (s *Sqlite) PackagesList(ctx context.Context, req *pb.PackagesListReq) ([]*
return packages, nil
}
func (s *Sqlite) PackagesModules(ctx context.Context, req *pb.PackagesModulesReq) ([]*models.Module, error) {
var modules []*models.Module
err := s.db.SelectContext(ctx, &modules, queryPackagesModules, req.Package)
if err != nil {
return nil, err
}
return modules, nil
}
func (s *Sqlite) CommentsDelete(ctx context.Context, req *pb.CommentsDeleteReq) error {
return nil
}
@@ -96,6 +134,36 @@ func (s *Sqlite) PackagesProcess(ctx context.Context, td time.Duration) ([]*mode
return packages, nil
}
func (s *Sqlite) PackagesUpdateLastCheck(ctx context.Context, packages []*models.Package) error {
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return err
}
for _, pkg := range packages {
if _, err = tx.ExecContext(ctx, queryPackagesUpdateLastCheck, pkg.ID); err != nil {
tx.Rollback()
return err
}
}
if err = tx.Commit(); err != nil {
tx.Rollback()
return err
}
return nil
}
func (s *Sqlite) ModulesProcess(ctx context.Context, td time.Duration) ([]*models.Module, error) {
var modules []*models.Module
err := s.db.SelectContext(ctx, &modules, queryModulesProcess, td.Seconds())
if err != nil {
return nil, err
}
return modules, nil
}
func (s *Sqlite) PackagesCreate(ctx context.Context, req *pb.PackagesCreateReq) (*models.Package, error) {
pkg := &models.Package{}
err := s.db.GetContext(ctx, pkg, queryPackagesCreate, req.Name, req.Url)
@@ -106,25 +174,20 @@ func (s *Sqlite) PackagesCreate(ctx context.Context, req *pb.PackagesCreateReq)
return pkg, nil
}
func (s *Sqlite) PackagesModulesCreate(ctx context.Context, pkg *models.Package, modules []*models.Module) error {
func (s *Sqlite) ModulesCreate(ctx context.Context, modules []*models.Module) error {
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return err
}
for _, mod := range modules {
err = tx.GetContext(ctx, mod, queryModulesCreate, mod.Name, mod.Version, mod.LastVersion, mod.Package)
err = tx.GetContext(ctx, mod, queryModulesCreate, mod.Name, mod.Version)
if err != nil {
_ = tx.Rollback()
return err
}
}
if _, err = tx.ExecContext(ctx, queryPackagesModulesCount, pkg.ID, len(modules)); err != nil {
_ = tx.Rollback()
return err
}
if err = tx.Commit(); err != nil {
_ = tx.Rollback()
return err
@@ -134,41 +197,12 @@ func (s *Sqlite) PackagesModulesCreate(ctx context.Context, pkg *models.Package,
}
func (s *Sqlite) ModulesList(ctx context.Context, req *pb.ModulesListReq) ([]*models.Module, error) {
var err error
var modules []*models.Module
rows, err := s.db.QueryContext(ctx, queryModulesList)
err := s.db.SelectContext(ctx, &modules, queryModulesList)
if err != nil {
return nil, err
}
defer func() {
if err = rows.Close(); err != nil {
return
}
err = rows.Err()
}()
for ; rows.Err() == nil; rows.Next() {
mod := &models.Module{}
if err = rows.Scan(
&mod.ID,
&mod.Name,
&mod.Version,
&mod.LastVersion,
); err != nil {
return nil, err
}
modules = append(modules, mod)
}
if err = rows.Err(); err != nil {
return nil, err
}
if err = rows.Close(); err != nil {
return nil, err
}
return modules, nil
}
@@ -183,16 +217,3 @@ func (s *Sqlite) CommentsList(ctx context.Context, req *pb.CommentsListReq) ([]*
return comments, nil
}
func generateQuery(rsp []models.Module) string {
const pattern = `%c('%s', '%s', '%s')`
build := strings.Builder{}
comma := ' '
for i := range rsp {
str := fmt.Sprintf(pattern, comma, rsp[i].Name, rsp[i].Version, rsp[i].LastVersion)
build.WriteString(str)
comma = ','
}
return fmt.Sprintf(queryInsMsgGetIDs, build.String())
}

View File

@@ -17,6 +17,10 @@ func RegisterStorage(name string, fn func(*sqlx.DB) interface{}) {
var storages = map[string]func(*sqlx.DB) interface{}{}
type Storage interface {
PackagesModulesCreate(ctx context.Context, pkg *models.Package, modules []*models.Module) error
PackagesUpdateLastCheck(ctx context.Context, packages []*models.Package) error
PackagesModules(ctx context.Context, req *pb.PackagesModulesReq) ([]*models.Module, error)
ModulesProcess(ctx context.Context, td time.Duration) ([]*models.Module, error)
PackagesProcess(ctx context.Context, td time.Duration) ([]*models.Package, error)
PackagesCreate(ctx context.Context, req *pb.PackagesCreateReq) (*models.Package, error)
PackagesList(ctx context.Context, req *pb.PackagesListReq) ([]*models.Package, error)
@@ -27,7 +31,7 @@ type Storage interface {
CommentsDelete(ctx context.Context, req *pb.CommentsDeleteReq) error
CommentsList(ctx context.Context, req *pb.CommentsListReq) ([]*models.Comment, error)
ModulesList(ctx context.Context, req *pb.ModulesListReq) ([]*models.Module, error)
PackagesModulesCreate(ctx context.Context, pkg *models.Package, modules []*models.Module) error
ModulesCreate(ctx context.Context, modules []*models.Module) error
}
func NewStorage(name string, db *sqlx.DB) (Storage, error) {

View File

@@ -12,8 +12,8 @@ import (
"sync"
"time"
"git.unistack.org/unistack-org/pkgdash/internal"
"git.unistack.org/unistack-org/pkgdash/internal/models"
"git.unistack.org/unistack-org/pkgdash/internal/modules"
"git.unistack.org/unistack-org/pkgdash/internal/storage"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/filemode"
@@ -26,63 +26,68 @@ import (
)
func Run(ctx context.Context, store storage.Storage, td time.Duration) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
modTicker := time.NewTicker(5 * time.Second)
defer modTicker.Stop()
pkgTicker := time.NewTicker(5 * time.Second)
defer pkgTicker.Stop()
var wg sync.WaitGroup
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
logger.Infof(ctx, "check packages to process")
case <-pkgTicker.C:
packages, err := store.PackagesProcess(ctx, td)
logger.Infof(ctx, "check packages to process %#+v, err: %v", packages, err)
if err != nil && err != sql.ErrNoRows {
if err != nil {
if err != sql.ErrNoRows {
continue
}
logger.Fatalf(ctx, "failed to get packages to process: %v", err)
}
wg.Add(len(packages))
for _, pkg := range packages {
go func(p *models.Package) {
if err := process(ctx, store, p); err != nil {
if err := parseModFile(ctx, store, p); err != nil {
logger.Errorf(ctx, "failed to process package %s: %v", p.Name, err)
}
p.LastCheck.Time = time.Now()
wg.Done()
}(pkg)
}
wg.Wait()
if err = store.PackagesUpdateLastCheck(ctx, packages); err != nil {
logger.Errorf(ctx, "update packages last_check %#+v, err: %v", packages, err)
}
case <-modTicker.C:
modules, err := store.ModulesProcess(ctx, td)
if err != nil {
if err != sql.ErrNoRows {
continue
}
logger.Fatalf(ctx, "failed to get modules to process: %v", err)
}
if err := processModules(ctx, store, modules); err != nil {
logger.Errorf(ctx, "failed to process modules: %v", err)
}
}
}
}
func process(ctx context.Context, store storage.Storage, pkg *models.Package) error {
func parseModFile(ctx context.Context, store storage.Storage, pkg *models.Package) error {
logger.Infof(ctx, "process package %v", pkg)
modules, err := getGoModule(ctx, pkg.ID, pkg.URL)
u, err := url.Parse(pkg.URL)
if err != nil {
logger.Errorf(ctx, "failed to get modules: %v", err)
return err
}
if err = store.PackagesModulesCreate(ctx, pkg, modules); err != nil {
logger.Errorf(ctx, "failed to set create modules: %v", err)
return err
}
return nil
}
func getGoModule(ctx context.Context, pkgID uint64, gitUrl string) ([]*models.Module, error) {
u, err := url.Parse(gitUrl)
if err != nil {
logger.Fatal(ctx, err)
}
var rev string
if idx := strings.Index(u.Path, "@"); idx > 0 {
rev = u.Path[idx+1:]
}
cloneOpts := &git.CloneOptions{
URL: gitUrl,
URL: pkg.URL,
Progress: os.Stdout,
}
@@ -92,27 +97,27 @@ func getGoModule(ctx context.Context, pkgID uint64, gitUrl string) ([]*models.Mo
}
if err = cloneOpts.Validate(); err != nil {
return nil, err
return err
}
repo, err := git.CloneContext(ctx, memory.NewStorage(), nil, cloneOpts)
if err != nil {
return nil, err
return err
}
ref, err := repo.Head()
if err != nil {
return nil, fmt.Errorf("failed to get head: %v", err)
return fmt.Errorf("failed to get head: %v", err)
}
commit, err := repo.CommitObject(ref.Hash())
if err != nil {
return nil, fmt.Errorf("failed to get commit: %v", err)
return fmt.Errorf("failed to get commit: %v", err)
}
tree, err := commit.Tree()
if err != nil {
return nil, err
return err
}
unique := make(map[string]*models.Module)
@@ -127,73 +132,95 @@ func getGoModule(ctx context.Context, pkgID uint64, gitUrl string) ([]*models.Mo
switch file.Mode {
case filemode.Regular:
if strings.HasSuffix(file.Name, "go.mod") {
if mvs, err = Direct(file); err != nil {
if mvs, err = parseFile(file); err != nil {
return err
}
for i := range mvs {
unique[mvs[i].Path] = &models.Module{
Package: pkgID,
Name: mvs[i].Path,
Version: mvs[i].Version,
LastVersion: mvs[i].Version,
Name: mvs[i].Path,
Version: mvs[i].Version,
}
}
internal.Updates(internal.UpdateOptions{
Pre: false,
Major: false,
Cached: false,
Modules: mvs,
OnUpdate: func(u internal.Update) {
if u.Err != nil {
logger.Errorf(ctx, "%s: failed: %v\n", u.Module.Path, u.Err)
} else {
val := unique[u.Module.Path]
val.LastVersion = u.Version
unique[u.Module.Path] = val
}
},
})
}
}
return nil
})
result := make([]*models.Module, 0, len(unique))
modules := make([]*models.Module, 0, len(unique))
for _, v := range unique {
result = append(result, v)
modules = append(modules, v)
}
sort.Slice(result, func(i, j int) bool {
return result[i].Name < result[j].Name
sort.Slice(modules, func(i, j int) bool {
return modules[i].Name < modules[j].Name
})
return result, err
if err = store.PackagesModulesCreate(ctx, pkg, modules); err != nil {
logger.Errorf(ctx, "failed to set create modules: %v", err)
return err
}
return nil
}
func Direct(file *object.File) ([]module.Version, error) {
func processModules(ctx context.Context, store storage.Storage, mods []*models.Module) error {
mvs := make(map[string]*models.Module, len(mods))
for _, mod := range mods {
mvs[mod.Name] = mod
}
mvsu := make([]module.Version, 0, len(mvs))
for _, mv := range mvs {
mvsu = append(mvsu, module.Version{Path: mv.Name, Version: mv.Version})
}
modules.Updates(modules.UpdateOptions{
Pre: false,
Major: false,
Cached: false,
Modules: mvsu,
OnUpdate: func(u modules.Update) {
if u.Err != nil {
logger.Errorf(ctx, "%s: failed: %v", u.Module.Path, u.Err)
} else {
mvs[u.Module.Path].Version = u.Version
}
},
})
if err := store.ModulesCreate(ctx, mods); err != nil {
return err
}
return nil
}
func parseFile(file *object.File) ([]module.Version, error) {
r, err := file.Reader()
if err != nil {
return nil, err
}
defer r.Close()
data, err := io.ReadAll(r)
r.Close()
if err != nil {
return nil, err
}
modfile, err := modfile.ParseLax("go.mod", data, nil)
if err != nil {
return nil, err
}
var mods []module.Version
mods := make([]module.Version, 0, len(modfile.Require))
for _, req := range modfile.Require {
// if !req.Indirect {
mods = append(mods, req.Mod)
//}
}
/*
sort.Slice(mods, func(i, j int) bool {
return mods[i].Path < mods[j].Path
})
*/
sort.Slice(mods, func(i, j int) bool {
return mods[i].Path < mods[j].Path
})
return mods, nil
}