package postgres import ( "context" "database/sql" "embed" "errors" pb "go.unistack.org/unistack-org/pkgdash/proto/go_generate" "github.com/golang-migrate/migrate/v4" mpgx "github.com/golang-migrate/migrate/v4/database/pgx" "github.com/golang-migrate/migrate/v4/source/iofs" "github.com/lib/pq" "go.unistack.org/micro/v3/logger" "go.unistack.org/unistack-org/pkgdash/config" "go.unistack.org/unistack-org/pkgdash/models" ) const ( pathMigration = `migrations/postgres` ) type Postgres struct { db *sql.DB fs embed.FS } func NewStorage(db *sql.DB) (interface{}, error) { return &Postgres{db: db}, nil } func NewStorageFS(fs embed.FS) func(*sql.DB) (interface{}, error) { return func(db *sql.DB) (interface{}, error) { return &Postgres{db: db, fs: fs}, nil } } func (s *Postgres) MigrateUp() error { driver, err := mpgx.WithInstance(s.db, &mpgx.Config{ MigrationsTable: mpgx.DefaultMigrationsTable, DatabaseName: config.ServiceName, }) if err != nil { return err } source, err := iofs.New(s.fs, pathMigration) if err != nil { return err } // TODO: pass own logger m, err := migrate.NewWithInstance("fs", source, config.ServiceName, driver) if err != nil { return err } if err = m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) { return err } return nil } func (s *Postgres) MigrateDown() error { driver, err := mpgx.WithInstance(s.db, &mpgx.Config{ MigrationsTable: mpgx.DefaultMigrationsTable, DatabaseName: config.ServiceName, }) if err != nil { return err } source, err := iofs.New(s.fs, pathMigration) if err != nil { return err } // TODO: pass own logger m, err := migrate.NewWithInstance("fs", source, config.ServiceName, driver) if err != nil { return err } if err = m.Down(); err != nil && !errors.Is(err, migrate.ErrNoChange) { return err } return nil } func (s *Postgres) ListPackage(ctx context.Context) (models.ListPackage, error) { rows, err := s.db.QueryContext(ctx, queryListPackage) if err != nil { return nil, err } defer func() { if err = rows.Close(); err != nil { return } err = rows.Err() }() result := make([]*models.Package, 0) for rows.Next() { tmp := &models.Package{} if err = rows.Scan( &tmp.ID, &tmp.Name, &tmp.URL, pq.Array(&tmp.Comments), ); err != nil { return nil, err } } return result, err } func (s *Postgres) AddComment(ctx context.Context, rsp *pb.AddCommentRsp) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err } defer func() { if err != nil { if rollbackErr := tx.Rollback(); rollbackErr != nil { logger.Errorf(ctx, "AddComment: unable to rollback: %v", rollbackErr) } } else { err = tx.Commit() } }() res, err := tx.ExecContext(ctx, queryAddComment, rsp.Text, rsp.IdPackage) if err != nil { return err } if aff, affErr := res.RowsAffected(); err != nil { err = affErr } else if aff == 0 { err = errors.New("rows affected is 0") } return err } func (s *Postgres) AddPackage(ctx context.Context, rsp *pb.AddPackageRsp) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err } defer func() { if err != nil { if rollbackErr := tx.Rollback(); rollbackErr != nil { logger.Errorf(ctx, "AddPackage: unable to rollback: %v", rollbackErr) } } else { err = tx.Commit() } }() res, err := tx.ExecContext(ctx, queryAddPackage, rsp.Name, rsp.Url, pq.Array(rsp.Modules)) if err != nil { return err } if aff, affErr := res.RowsAffected(); err != nil { err = affErr } else if aff == 0 { err = errors.New("rows affected is 0") } return err }