use worker

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2023-08-18 23:59:15 +03:00
parent 78f0ae14d7
commit 0e18a63f10
46 changed files with 2195 additions and 1181 deletions

View File

@@ -1 +0,0 @@
drop table if exists dashboard, package, module, issue, comment;

View File

@@ -1,40 +0,0 @@
create table if not exists dashboard (
id serial not null unique primary key ,
"uuid" uuid not null unique default gen_random_uuid() ,
package integer[] default '{}'::integer[]
);
create table if not exists comment (
id serial not null unique primary key ,
"text" text ,
package integer not null,
created timestamp not null default current_timestamp ,
updated timestamp default current_timestamp
);
create table if not exists module (
id serial not null unique primary key ,
name varchar not null ,
version varchar not null ,
last_version varchar not null
);
create table if not exists issue (
id serial not null unique primary key ,
--package integer references package(id) ,
modules integer[] default '{}'::integer[],
status integer default 0 ,
"desc" varchar
);
create table if not exists package (
id serial not null unique primary key ,
name varchar not null ,
url varchar ,
modules integer[] default '{}'::integer[],
issues integer[] default '{}'::integer[],
comments integer[] default '{}'::integer[]
);
create unique index module_info on module(name, version);

View File

@@ -1,5 +0,0 @@
drop table if exists dashboard ;
drop table if exists package ;
drop table if exists module ;
drop table if exists issue ;
drop table if exists comment;

View File

@@ -1,39 +0,0 @@
create table if not exists dashboard (
id integer primary key autoincrement not null ,
"uuid" uuid not null unique ,
package integer[] default '{}'
);
create table if not exists comment (
id integer primary key autoincrement not null ,
"text" text ,
package integer not null,
created timestamp not null default current_timestamp ,
updated timestamp default current_timestamp
);
create table if not exists module (
id integer primary key autoincrement not null ,
name varchar not null ,
version varchar not null ,
last_version varchar not null
);
create table if not exists issue (
id serial not null unique primary key ,
--package integer references package(id) ,
modules integer[] default '[]',
status integer default 0 ,
"desc" varchar
);
create table if not exists package (
id integer primary key autoincrement not null ,
name varchar not null ,
url varchar ,
modules integer[] default '[]',
issues integer[] default '[]',
comments integer[] default '[]'
);

View File

@@ -1,34 +1,14 @@
package sqlite
const (
queryPackagesList = `select
id,
name,
url,
comments
modules,
issues,
from package;
`
queryCommentsCreate = `
insert into comment(text) values ($1) returning id;
update package
set comments = json_insert(comments, '$[#]', ( select last_insert_rowid() as id from comment ))
where id = $2 ;
`
queryPackagesCreate = `
insert into package(name, url, modules) values ($1, $2, $3);
`
queryInsMsgGetIDs = `
insert into module(name, version, last_version) values
%s
returning id;
`
queryModulesList = `
select id, name, version, last_version, created, updated from modules;
`
queryCommentsList = `
select id, text, created, updated from comments;
`
queryPackagesProcess = `select id, name, url, comments, modules, issues, created, updated from packages where ROUND((JULIANDAY(CURRENT_TIMESTAMP) - JULIANDAY(last_check)) * 86400) > $1 or last_check is NULL`
queryPackagesModulesCount = `update packages set modules = $2, last_check = CURRENT_TIMESTAMP where id = $1;`
queryPackagesList = `select id, name, url, comments, modules, issues, created, updated from packages;`
queryPackagesLookup = `select id, name, url, comments, modules, issues, created, updated from packages where id = $1;`
queryCommentsCreate = `insert into comments (comment) values ($1) returning id;`
queryPackagesCreate = `insert into packages (name, url) values ($1, $2) returning *;`
queryInsMsgGetIDs = `insert into modules(name, version, last_version) values %s returning id;`
queryModulesList = `select id, name, version, last_version, created, updated from modules;`
queryModulesCreate = `insert into modules (name, version, last_version, package) values ($1, $2, $3, $4) returning *;`
queryCommentsList = `select id, text, created, updated from comments;`
)

View File

@@ -2,130 +2,71 @@ package sqlite
import (
"context"
"database/sql"
"embed"
"errors"
"fmt"
"strings"
"time"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/sqlite"
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"go.unistack.org/micro/v4/logger"
"git.unistack.org/unistack-org/pkgdash/internal/models"
"git.unistack.org/unistack-org/pkgdash/internal/storage"
pb "git.unistack.org/unistack-org/pkgdash/proto"
"github.com/jmoiron/sqlx"
"go.unistack.org/micro/v4/logger"
)
const (
pathMigration = `migrations/sqlite`
)
func init() {
storage.RegisterStorage("sqlite", NewStorage())
}
var _ storage.Storage = (*Sqlite)(nil)
type Sqlite struct {
db *sql.DB
fs embed.FS
db *sqlx.DB
}
func NewStorage() func(*sql.DB, embed.FS) interface{} {
return func(db *sql.DB, fs embed.FS) interface{} {
return &Sqlite{db: db, fs: fs}
func NewStorage() func(*sqlx.DB) interface{} {
return func(db *sqlx.DB) interface{} {
return &Sqlite{db: db}
}
}
func (s *Sqlite) MigrateUp() error {
driver, err := sqlite.WithInstance(s.db, &sqlite.Config{
MigrationsTable: sqlite.DefaultMigrationsTable,
DatabaseName: "pkgdash",
})
if err != nil {
return err
}
source, err := iofs.New(s.fs, pathMigration)
if err != nil {
return err
}
// TODO: pass own logger
m, err := migrate.NewWithInstance("fs", source, "pkgdash", driver)
if err != nil {
return err
}
if err = m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
return err
}
return nil
func (s *Sqlite) PackagesDelete(ctx context.Context, req *pb.PackagesDeleteReq) error {
return fmt.Errorf("need implement")
}
func (s *Sqlite) MigrateDown() error {
driver, err := sqlite.WithInstance(s.db, &sqlite.Config{
MigrationsTable: sqlite.DefaultMigrationsTable,
DatabaseName: "pkgdash",
})
if err != nil {
return err
}
source, err := iofs.New(s.fs, pathMigration)
if err != nil {
return err
}
// TODO: pass own logger
m, err := migrate.NewWithInstance("fs", source, "pkgdash", driver)
if err != nil {
return err
}
if err = m.Down(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
return err
}
return nil
func (s *Sqlite) PackagesUpdate(ctx context.Context, req *pb.PackagesUpdateReq) (*models.Package, error) {
return nil, fmt.Errorf("need implement")
}
func (s *Sqlite) PackagesUpdate(ctx context.Context, req *pb.PackagesUpdateReq) error {
panic("need implement")
func (s *Sqlite) PackagesLookup(ctx context.Context, req *pb.PackagesLookupReq) (*models.Package, error) {
pkg := &models.Package{}
err := s.db.GetContext(ctx, pkg, queryPackagesLookup, req.Id)
if err != nil {
return nil, err
}
return pkg, err
}
func (s *Sqlite) PackagesList(ctx context.Context, req *pb.PackagesListReq) ([]*models.Package, error) {
var packages []*models.Package
rows, err := s.db.QueryContext(ctx, queryPackagesList)
err := s.db.SelectContext(ctx, &packages, queryPackagesList)
if err != nil {
return nil, err
}
for ; rows.Err() == nil; rows.Next() {
pkg := &models.Package{}
if err = rows.Scan(
&pkg.ID,
&pkg.Name,
&pkg.URL,
&pkg.Comments,
); err != nil {
_ = rows.Close()
return nil, err
}
packages = append(packages, pkg)
}
if err = rows.Err(); err != nil {
return nil, err
}
if err = rows.Close(); err != nil {
return nil, err
}
return packages, err
return packages, nil
}
func (s *Sqlite) CommentsCreate(ctx context.Context, req *pb.CommentsCreateReq) (id uint64, err error) {
func (s *Sqlite) CommentsDelete(ctx context.Context, req *pb.CommentsDeleteReq) error {
return nil
}
func (s *Sqlite) CommentsCreate(ctx context.Context, req *pb.CommentsCreateReq) (*models.Comment, error) {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
return nil, err
}
defer func() {
@@ -138,85 +79,61 @@ func (s *Sqlite) CommentsCreate(ctx context.Context, req *pb.CommentsCreateReq)
}
}()
if err = tx.QueryRowContext(ctx, queryCommentsCreate, req.Text, req.PackageId).Scan(&id); err != nil {
return id, err
if _, err = tx.ExecContext(ctx, queryCommentsCreate, req.Comment, req.PackageId); err != nil {
return nil, err
}
return id, err
return nil, nil
}
func (s *Sqlite) PackagesCreate(ctx context.Context, req *pb.PackagesCreateReq) error {
tx, err := s.db.BeginTx(ctx, nil)
func (s *Sqlite) PackagesProcess(ctx context.Context, td time.Duration) ([]*models.Package, error) {
var packages []*models.Package
err := s.db.SelectContext(ctx, &packages, queryPackagesProcess, td.Seconds())
if err != nil {
return nil, err
}
return packages, nil
}
func (s *Sqlite) PackagesCreate(ctx context.Context, req *pb.PackagesCreateReq) (*models.Package, error) {
pkg := &models.Package{}
err := s.db.GetContext(ctx, pkg, queryPackagesCreate, req.Name, req.Url)
if err != nil {
return nil, err
}
return pkg, nil
}
func (s *Sqlite) PackagesModulesCreate(ctx context.Context, pkg *models.Package, modules []*models.Module) error {
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer func() {
for _, mod := range modules {
err = tx.GetContext(ctx, mod, queryModulesCreate, mod.Name, mod.Version, mod.LastVersion, mod.Package)
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
logger.Errorf(ctx, "AddPackage: unable to rollback: %v", rollbackErr)
}
} else {
err = tx.Commit()
_ = tx.Rollback()
return err
}
}()
}
res, err := tx.ExecContext(ctx, queryPackagesCreate, req.Name, req.Url, pq.Array(req.Modules))
if err != nil {
if _, err = tx.ExecContext(ctx, queryPackagesModulesCount, pkg.ID, len(modules)); err != nil {
_ = tx.Rollback()
return err
}
if aff, affErr := res.RowsAffected(); err != nil {
err = affErr
} else if aff == 0 {
err = errors.New("rows affected is 0")
if err = tx.Commit(); err != nil {
_ = tx.Rollback()
return err
}
return err
return nil
}
func (s *Sqlite) InsertButchModules(ctx context.Context, req []models.Module) ([]uint64, error) {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
logger.Errorf(ctx, "AddPackage: unable to rollback: %v", rollbackErr)
}
} else {
err = tx.Commit()
}
}()
query := generateQuery(req)
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer func() {
if err = rows.Close(); err != nil {
return
}
err = rows.Err()
}()
result := make([]uint64, 0)
for rows.Next() {
tmp := uint64(0)
if err = rows.Scan(&tmp); err != nil {
return nil, err
}
result = append(result, tmp)
}
return result, err
}
func (s *Sqlite) GetModule(ctx context.Context, req *pb.ModulesListReq) ([]*models.Module, error) {
func (s *Sqlite) ModulesList(ctx context.Context, req *pb.ModulesListReq) ([]*models.Module, error) {
var err error
var modules []*models.Module
@@ -259,50 +176,14 @@ func (s *Sqlite) GetModule(ctx context.Context, req *pb.ModulesListReq) ([]*mode
func (s *Sqlite) CommentsList(ctx context.Context, req *pb.CommentsListReq) ([]*models.Comment, error) {
var comments []*models.Comment
rows, err := s.db.QueryContext(ctx, queryCommentsList, req.PackageId)
err := s.db.SelectContext(ctx, &comments, queryCommentsList, req.PackageId)
if err != nil {
return nil, err
}
defer func() {
if err = rows.Close(); err != nil {
return
}
err = rows.Err()
}()
for ; rows.Err() == nil; rows.Next() {
com := &models.Comment{}
if err = rows.Scan(
&com.ID,
&com.Text,
&com.Created,
&com.Updated,
); err != nil {
_ = rows.Close()
return nil, err
}
comments = append(comments, com)
}
if err = rows.Err(); err != nil {
return nil, err
}
if err = rows.Close(); err != nil {
return nil, err
}
return comments, nil
}
func convertSliceUInt(arg ...uint64) []interface{} {
result := make([]interface{}, 0, len(arg))
for i := range arg {
result = append(result, arg[i])
}
return result
}
func generateQuery(rsp []models.Module) string {
const pattern = `%c('%s', '%s', '%s')`
build := strings.Builder{}
@@ -315,7 +196,3 @@ func generateQuery(rsp []models.Module) string {
return fmt.Sprintf(queryInsMsgGetIDs, build.String())
}
func generateArrayIneq(count int) string {
return "(?" + strings.Repeat(",?", count-1) + ")"
}

View File

@@ -2,68 +2,43 @@ package storage
import (
"context"
"database/sql"
"embed"
"errors"
"time"
"git.unistack.org/unistack-org/pkgdash/internal/models"
// "git.unistack.org/unistack-org/pkgdash/internal/storage/postgres"
"git.unistack.org/unistack-org/pkgdash/internal/storage/sqlite"
pb "git.unistack.org/unistack-org/pkgdash/proto"
"github.com/jmoiron/sqlx"
)
//go:embed migrations
var fs embed.FS
var storages = map[string]func(*sql.DB, embed.FS) interface{}{
//"postgres": postgres.NewStorage(),
"sqlite": sqlite.NewStorage(),
func RegisterStorage(name string, fn func(*sqlx.DB) interface{}) {
storages[name] = fn
}
type contextKey string
var storeIdent = contextKey("store")
type Migrate interface {
MigrateUp() error
MigrateDown() error
}
var storages = map[string]func(*sqlx.DB) interface{}{}
type Storage interface {
Migrate
PackagesCreate(ctx context.Context, req *pb.PackagesCreateReq) error
PackagesProcess(ctx context.Context, td time.Duration) ([]*models.Package, error)
PackagesCreate(ctx context.Context, req *pb.PackagesCreateReq) (*models.Package, error)
PackagesList(ctx context.Context, req *pb.PackagesListReq) ([]*models.Package, error)
PackagesUpdate(ctx context.Context, req *pb.PackagesUpdateReq) error
PackagesLookup(ctx context.Context, req *pb.PackagesLookupReq) (*models.Package, error)
PackagesUpdate(ctx context.Context, req *pb.PackagesUpdateReq) (*models.Package, error)
PackagesDelete(ctx context.Context, req *pb.PackagesDeleteReq) error
CommentsCreate(ctx context.Context, req *pb.CommentsCreateReq) (*models.Comment, error)
CommentsDelete(ctx context.Context, req *pb.CommentsDeleteReq) error
CommentsList(ctx context.Context, req *pb.CommentsListReq) ([]*models.Comment, error)
InsertButchModules(ctx context.Context, req []models.Module) ([]uint64, error)
ModulesList(ctx context.Context, req *pb.ModulesListReq) ([]*models.Module, error)
PackagesModulesCreate(ctx context.Context, pkg *models.Package, modules []*models.Module) error
}
func NewStorage(name string, db *sql.DB) (Storage, error) {
func NewStorage(name string, db *sqlx.DB) (Storage, error) {
function, ok := storages[name]
if !ok {
return nil, errors.New("incorrect name store")
}
store := function(db, fs)
store := function(db)
database, ok := store.(Storage)
if !ok {
return nil, errors.New("dont implements interface Storage")
}
return database, nil
}
func InContext(ctx context.Context, val Storage) context.Context {
return context.WithValue(ctx, storeIdent, val)
}
func FromContext(ctx context.Context) (Storage, error) {
if store, ok := ctx.Value(storeIdent).(Storage); !ok {
return nil, errors.New("empty store")
} else {
return store, nil
}
}