pkgdash/internal/worker/worker.go

227 lines
4.9 KiB
Go
Raw Normal View History

package worker
import (
"context"
"database/sql"
"fmt"
"io"
"net/url"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/filemode"
"github.com/go-git/go-git/v5/plumbing/object"
"github.com/go-git/go-git/v5/storage/memory"
"github.com/pkg/errors"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/pkgdash/internal/models"
"go.unistack.org/pkgdash/internal/modules"
"go.unistack.org/pkgdash/internal/storage"
"golang.org/x/mod/modfile"
"golang.org/x/mod/module"
)
// Run drives the background refresh loop until ctx is cancelled.
//
// Two tickers fire every five seconds. On a package tick the packages returned
// by store.PackagesProcess are parsed concurrently (one goroutine per package)
// and their last-check time is persisted afterwards. On a module tick the
// modules returned by store.ModulesProcess are handed to processModules.
//
// td is forwarded unchanged to the store's *Process queries.
// NOTE(review): presumably a staleness threshold — confirm against the
// storage implementation.
func Run(ctx context.Context, log logger.Logger, store storage.Storage, td time.Duration) {
	modTicker := time.NewTicker(5 * time.Second)
	defer modTicker.Stop()
	pkgTicker := time.NewTicker(5 * time.Second)
	defer pkgTicker.Stop()

	var wg sync.WaitGroup
	for {
		select {
		case <-ctx.Done():
			return

		case <-pkgTicker.C:
			packages, err := store.PackagesProcess(ctx, td)
			if err != nil {
				// "No rows" only means there is nothing due on this tick;
				// wait for the next one.
				if err == sql.ErrNoRows {
					continue
				}
				// Any other error is a real store failure and must be reported.
				log.Fatal(ctx, "failed to get packages to process: %v", err)
				// Do not fall through with an unusable result in case the
				// logger's Fatal does not terminate the process.
				continue
			}

			wg.Add(len(packages))
			for _, pkg := range packages {
				go func(p *models.Package) {
					defer wg.Done()
					if err := parseModFile(ctx, log, store, p); err != nil {
						log.Error(ctx, "failed to process package %s: %v", p.Name, err)
					}
					// The check time is stamped whether or not parsing succeeded.
					p.LastCheck.Time = time.Now()
				}(pkg)
			}
			// Wait for every package before persisting, so all LastCheck
			// writes made by the goroutines are visible here.
			wg.Wait()

			if err = store.PackagesUpdateLastCheck(ctx, packages); err != nil {
				log.Error(ctx, "update packages last_check %#+v, err: %v", packages, err)
			}

		case <-modTicker.C:
			// Named mods (not modules) so the imported modules package is not
			// shadowed inside this case.
			mods, err := store.ModulesProcess(ctx, td)
			if err != nil {
				if err == sql.ErrNoRows {
					continue
				}
				log.Fatal(ctx, "failed to get modules to process: %v", err)
				continue
			}

			if err := processModules(ctx, log, store, mods); err != nil {
				log.Error(ctx, "failed to process modules: %v", err)
			}
		}
	}
}
// parseModFile clones the repository at pkg.URL into memory, collects the
// requirements of every go.mod file found in the HEAD commit's tree and stores
// them for pkg via store.PackageModulesCreate.
//
// Requirements are de-duplicated by module path; when several go.mod files
// require the same path, the entry seen last during the tree walk wins. The
// resulting list is sorted by module name before it is stored.
//
// It returns an error if the URL cannot be parsed, the clone fails, HEAD or its
// tree cannot be resolved, the tree walk or any go.mod parse fails, or the
// store rejects the modules.
func parseModFile(ctx context.Context, log logger.Logger, store storage.Storage, pkg *models.Package) error {
	log.Info(ctx, "process package %v", pkg)

	u, err := url.Parse(pkg.URL)
	if err != nil {
		return err
	}

	// Text after "@" in the URL path is treated as a revision. It is only used
	// to decide between a shallow and a full clone below.
	// NOTE(review): rev is never checked out and pkg.URL (including "@rev") is
	// passed to git as-is — confirm this is intended.
	var rev string
	if idx := strings.Index(u.Path, "@"); idx > 0 {
		rev = u.Path[idx+1:]
	}

	cloneOpts := &git.CloneOptions{
		URL:      pkg.URL,
		Progress: os.Stdout,
	}
	if len(rev) == 0 {
		// No revision requested: a shallow single-branch clone is enough.
		cloneOpts.SingleBranch = true
		cloneOpts.Depth = 1
	}
	if err = cloneOpts.Validate(); err != nil {
		return err
	}

	repo, err := git.CloneContext(ctx, memory.NewStorage(), nil, cloneOpts)
	if err != nil {
		return err
	}

	ref, err := repo.Head()
	if err != nil {
		return fmt.Errorf("failed to get head: %w", err)
	}

	commit, err := repo.CommitObject(ref.Hash())
	if err != nil {
		return fmt.Errorf("failed to get commit: %w", err)
	}

	tree, err := commit.Tree()
	if err != nil {
		return err
	}

	unique := make(map[string]*models.Module)
	err = tree.Files().ForEach(func(file *object.File) error {
		if file == nil {
			ferr := errors.New("file pointer is nil")
			log.Error(ctx, "file tree error", ferr)
			return ferr
		}
		if file.Mode != filemode.Regular {
			return nil
		}
		// Match only files actually named go.mod, at the root or in any
		// sub-directory; a bare suffix test would also accept "foogo.mod".
		if file.Name != "go.mod" && !strings.HasSuffix(file.Name, "/go.mod") {
			return nil
		}

		mvs, perr := parseFile(file)
		if perr != nil {
			return perr
		}
		for i := range mvs {
			unique[mvs[i].Path] = &models.Module{
				Name:    mvs[i].Path,
				Version: mvs[i].Version,
			}
		}
		return nil
	})
	if err != nil {
		// A failed walk must not end in a partial module list being stored.
		return fmt.Errorf("failed to walk file tree: %w", err)
	}

	modules := make([]*models.Module, 0, len(unique))
	for _, v := range unique {
		modules = append(modules, v)
	}
	// Map iteration order is random; sort for a stable result.
	sort.Slice(modules, func(i, j int) bool {
		return modules[i].Name < modules[j].Name
	})

	if err = store.PackageModulesCreate(ctx, pkg, modules); err != nil {
		log.Error(ctx, "failed to set create modules: %v", err)
		return err
	}

	return nil
}
// processModules passes the given modules to modules.Updates and writes every
// version reported without an error back into the matching *models.Module,
// then persists the whole slice with store.ModuleCreate.
//
// Modules are indexed by name first, so when mods contains the same name more
// than once only the last such entry is submitted and updated. Failures
// reported per module are logged and do not abort the run; the only error
// returned is the one from store.ModuleCreate.
func processModules(ctx context.Context, log logger.Logger, store storage.Storage, mods []*models.Module) error {
	// Index by module name so the callback can find the record to update.
	byName := make(map[string]*models.Module, len(mods))
	for i := range mods {
		byName[mods[i].Name] = mods[i]
	}

	versions := make([]module.Version, 0, len(byName))
	for _, m := range byName {
		versions = append(versions, module.Version{Path: m.Name, Version: m.Version})
	}

	// onUpdate receives one result per module from modules.Updates.
	onUpdate := func(upd modules.Update) {
		if upd.Err == nil {
			byName[upd.Module.Path].Version = upd.Version
			return
		}
		log.Error(ctx, "%s: failed: %v", upd.Module.Path, upd.Err)
	}

	modules.Updates(modules.UpdateOptions{
		Pre:      false,
		Major:    false,
		Cached:   false,
		Modules:  versions,
		OnUpdate: onUpdate,
	})

	return store.ModuleCreate(ctx, mods)
}
// parseFile reads the contents of a go.mod blob from the git tree and returns
// the module versions listed in its require directives, sorted by module path.
//
// The file is parsed with modfile.ParseLax under its real in-repository name,
// so a parse error identifies which go.mod failed when a repository has
// several. Read and parse errors are returned wrapped with that same name.
func parseFile(file *object.File) ([]module.Version, error) {
	r, err := file.Reader()
	if err != nil {
		return nil, fmt.Errorf("open %s: %w", file.Name, err)
	}
	data, err := io.ReadAll(r)
	// The blob is read-only; a Close error cannot affect the data already read.
	_ = r.Close()
	if err != nil {
		return nil, fmt.Errorf("read %s: %w", file.Name, err)
	}

	// Named mf (not modfile) so the modfile package is not shadowed.
	mf, err := modfile.ParseLax(file.Name, data, nil)
	if err != nil {
		return nil, err
	}

	mods := make([]module.Version, 0, len(mf.Require))
	for _, req := range mf.Require {
		mods = append(mods, req.Mod)
	}
	sort.Slice(mods, func(i, j int) bool {
		return mods[i].Path < mods[j].Path
	})

	return mods, nil
}