import works

pull/3/head
Adam Veldhousen 2023-05-12 22:57:46 -05:00
parent 6608130298
commit 5b9931068d
Signed by: adam
GPG Key ID: 6DB29003C6DD1E4B
16 changed files with 109 additions and 102 deletions

View File

@ -64,7 +64,6 @@ def bh_service(service="", port_forwards=[], migrateDB=False, devMode=True, labe
if migrateDB:
entry_cmd += ['--', '-migrate']
docker_build_with_restart(
ref="barretthousen/service-{}".format(service),
dockerfile="./src/Dockerfile.dev",

View File

@ -9,11 +9,12 @@ namePrefix: local-
patchesStrategicMerge:
- debug-catalog.yaml
- debug-runner.yaml
# patches:
# - target:
# kind: Ingress
# name: proxy-admin
# patch: |-
# - op: replace
# path: /spec/rules/0/host
# value: beta.bh.localhost
patches:
- target:
kind: Ingress
name: proxy-admin
patch: |-
- op: replace
path: /spec/rules/0/host
value: admin.localhost

View File

@ -5,7 +5,6 @@ import (
"time"
capi "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api/grpc"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
@ -34,7 +33,7 @@ type Auction struct {
End time.Time `json:"end,omitempty"`
}
func (css *CatalogServiceClient) UpdateUpcomingAuction(ctx context.Context, a Auction) error {
func (css *CatalogServiceClient) UpdateUpcomingAuction(ctx context.Context, a Auction) (string, error) {
ac, err := css.ImportAuction(ctx, &capi.ImportAuctionMessage{
Items: int32(a.ItemCount),
Start: timestamppb.New(a.Start),
@ -48,10 +47,8 @@ func (css *CatalogServiceClient) UpdateUpcomingAuction(ctx context.Context, a Au
Province: a.Province,
})
if err != nil {
return err
return "", err
}
kernel.TraceLog.Printf("Import Result: IsDupe: %q", ac.GetDuplicate())
return nil
return ac.Auction.GetFingerprint(), nil
}

View File

@ -33,16 +33,17 @@ message GetUpcomingResult {
message Auction {
int32 id = 1;
int32 items = 2;
google.protobuf.Timestamp start = 3;
google.protobuf.Timestamp end = 4;
string title = 5;
string description = 6;
string sourceSiteURL = 7;
string sourceSiteName = 8;
string sourceURL = 9;
string country = 10;
string province = 11;
string fingerprint = 2;
int32 items = 3;
google.protobuf.Timestamp start = 4;
google.protobuf.Timestamp end = 5;
string title = 6;
string description = 7;
string sourceSiteURL = 8;
string sourceSiteName = 9;
string sourceURL = 10;
string country = 11;
string province = 12;
}
message ImportAuctionMessage {

View File

@ -2,6 +2,7 @@ package data
import (
"context"
"crypto/sha512"
"fmt"
"time"
@ -49,11 +50,12 @@ func (ps *PGCatalogStorage) GetUpcoming(ctx context.Context, q domain.UpcomingQu
return
}
func (ps *PGCatalogStorage) CreateUpcoming(ctx context.Context, a domain.Auction) (err error) {
func (ps *PGCatalogStorage) CreateUpcoming(ctx context.Context, a domain.Auction) (fingerprint string, err error) {
var auctionID int32
fingerprint = fmt.Sprintf("%x", sha512.Sum512(append([]byte(a.Title), []byte(a.Description)...)))
if auctionID, err = ps.Queries.ImportAuction(ctx, postgres.ImportAuctionParams{
Fingerprint: fmt.Sprintf("%s-%d", a.Title, time.Now().UTC().UnixMilli()),
Fingerprint: fingerprint,
Title: a.Title,
Description: a.Description,
Sourcesiteurl: a.SourceSiteURL,

View File

@ -18,7 +18,7 @@ type (
Storage interface {
GetUpcoming(context.Context, UpcomingQuery) ([]Auction, int, error)
CreateUpcoming(context.Context, Auction) error
CreateUpcoming(context.Context, Auction) (string, error)
}
UpcomingQuery struct {
@ -52,16 +52,23 @@ func (d *Usecase) GetUpcoming(ctx context.Context, q UpcomingQuery) (results Upc
}
func (d *Usecase) ImportAuction(ctx context.Context, in ImportAuctionMessage) (event AuctionCreated, err error) {
if err = d.Storage.CreateUpcoming(ctx, in.Auction); err != nil && !errors.Is(err, ErrDuplicateAuctionImported) {
if in.Fingerprint, err = d.Storage.CreateUpcoming(ctx, in.Auction); err != nil && !errors.Is(err, ErrDuplicateAuctionImported) {
err = fmt.Errorf("could not import auction: %w", err)
return
}
kernel.TraceLog.Printf("Imported: %+v", in.Auction)
err = nil
event = AuctionCreated{
Auction: in.Auction,
Duplicate: errors.Is(err, ErrDuplicateAuctionImported),
}
status := "✅"
if event.Duplicate {
status = "🚫"
}
kernel.TraceLog.Printf("%s Import Auction [%s]", status, in.Fingerprint)
return
}

View File

@ -84,6 +84,7 @@ func (rh *catalogHandler) ImportAuction(ctx context.Context, cmd *api.ImportAuct
Auction: &api.Auction{
Id: int32(evt.Auction.ID),
Items: int32(evt.Auction.ItemCount),
Fingerprint: evt.Fingerprint,
Start: timestamppb.New(evt.Auction.Start),
End: timestamppb.New(evt.Auction.End),
Title: evt.Auction.Title,

View File

@ -5,13 +5,13 @@ go 1.19
require (
github.com/jackc/pgx/v4 v4.18.1
go.uber.org/automaxprocs v1.5.2
golang.org/x/sync v0.2.0
google.golang.org/grpc v1.55.0
)
require (
github.com/BurntSushi/toml v1.2.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.8.0 // indirect

View File

@ -74,6 +74,7 @@ github.com/jackc/pgx/v4 v4.18.1/go.mod h1:FydWkUyadDmdNH/mHnGob881GawxeEm7TcMCzk
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0=
github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
@ -175,8 +176,6 @@ golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -6,28 +6,28 @@ import (
"fmt"
"time"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data/postgres"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/domain"
)
type PGRunnerStorage struct {
*postgres.Queries
}
func (db *PGRunnerStorage) CreateScrapeJob(ctx context.Context, target string) (sj runner.ScrapeJob, err error) {
func (db *PGRunnerStorage) CreateScrapeJob(ctx context.Context, target string) (sj domain.ScrapeJob, err error) {
rsj, err := db.Queries.CreateScrapeJob(ctx, target)
if err != nil {
return runner.ScrapeJob{}, err
return domain.ScrapeJob{}, err
}
return runner.ScrapeJob{
return domain.ScrapeJob{
ID: int(rsj.ID),
Started: rsj.Startedts,
TargetSite: rsj.Targetsitename,
}, nil
}
func (db *PGRunnerStorage) CompleteScrapeJob(ctx context.Context, ID int, status runner.CompleteScrapeJobStatus) (sj runner.ScrapeJob, err error) {
func (db *PGRunnerStorage) CompleteScrapeJob(ctx context.Context, ID int, status domain.CompleteScrapeJobStatus) (sj domain.ScrapeJob, err error) {
completedTime := time.Now().UTC()
if err = db.Queries.CompleteScrapeJob(ctx, postgres.CompleteScrapeJobParams{
ID: int32(ID),
@ -48,7 +48,7 @@ func (db *PGRunnerStorage) CompleteScrapeJob(ctx context.Context, ID int, status
return
}
sj = runner.ScrapeJob{
sj = domain.ScrapeJob{
ID: ID,
Started: rsj.Startedts,
Completed: completedTime,
@ -60,7 +60,7 @@ func (db *PGRunnerStorage) CompleteScrapeJob(ctx context.Context, ID int, status
return
}
func (db *PGRunnerStorage) GetJobs(ctx context.Context) (results []runner.ScrapeJob, err error) {
func (db *PGRunnerStorage) GetJobs(ctx context.Context) (results []domain.ScrapeJob, err error) {
var jobs []postgres.RunnerScrapejob
if jobs, err = db.Queries.GetJobs(ctx); err != nil {
err = fmt.Errorf("Couldn't get jobs from DB: %w", err)
@ -68,7 +68,7 @@ func (db *PGRunnerStorage) GetJobs(ctx context.Context) (results []runner.Scrape
}
for _, j := range jobs {
results = append(results, runner.ScrapeJob{
results = append(results, domain.ScrapeJob{
ID: int(j.ID),
Started: j.Startedts,
Completed: j.Completedts.Time,

View File

@ -1,4 +1,4 @@
package domain
package liveauctioneers
import (
"context"
@ -9,13 +9,13 @@ import (
"time"
catalog "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/domain"
)
func init() {
kernel.TraceLog.Println("Registering AuctionFinder liveauctioneers")
runner.RegisterAuctionFinder(
domain.RegisterAuctionFinder(
LAAuctionFinder("liveauctioneers"),
)
}

View File

@ -1,4 +1,4 @@
package domain
package liveauctioneers
import "testing"

View File

@ -1,8 +1,10 @@
package runner
package domain
import (
"context"
"fmt"
"strings"
"time"
catalog "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
@ -43,15 +45,9 @@ type (
}
CatalogService interface {
UpdateUpcomingAuction(context.Context, catalog.Auction) error
UpdateUpcomingAuction(context.Context, catalog.Auction) (string, error)
}
)
type CatalogServiceClient interface {
UpdateUpcomingAuction(context.Context, catalog.Auction) error
}
type (
FindNewUpcomingInput struct {
TargetSite string
}
@ -78,42 +74,8 @@ func (domain Domain) FindNewUpcoming(ctx context.Context, in FindNewUpcomingInpu
kernel.InfoLog.Printf("Scrape Job %d starting", out.Job.ID)
found := make(chan catalog.Auction)
errGroup, innerCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return finder.Find(innerCtx, 0, found)
})
count := 0
for auction := range found {
count++
a := auction
errGroup.Go(func() error {
if err := domain.CatalogService.UpdateUpcomingAuction(ctx, a); err != nil {
kernel.TraceLog.Printf("could not import upcoming auction: %s", err.Error())
return err
}
return nil
})
}
errMsg := ""
if err = errGroup.Wait(); err != nil {
err = fmt.Errorf("an issue occurred while finding upcoming items iteration: %w", err)
errMsg = err.Error()
}
if out.Job, err = domain.Storage.CompleteScrapeJob(ctx, out.Job.ID, CompleteScrapeJobStatus{
AuctionCount: count,
Errors: errMsg,
}); err != nil {
err = fmt.Errorf("Could not complete scrape job, failing: %w", err)
return
}
kernel.InfoLog.Printf("Scrape Job %d completed in %v.", out.Job.ID, out.Job.Completed.Sub(out.Job.Started))
// TODO: make everything after this line async and run after return
go domain.executeScrapeJob(finder, out.Job.ID)
return
}
@ -135,3 +97,41 @@ func (domain Domain) GetJobs(ctx context.Context, in GetJobsInput) (out GetJobsO
return
}
func (domain *Domain) executeScrapeJob(finder UpcomingAuctionFinder, jobID int) {
ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(time.Minute))
defer cancel()
found := make(chan catalog.Auction)
errGroup, innerCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return finder.Find(innerCtx, 0, found)
})
count := 0
errs := &strings.Builder{}
for auction := range found {
count++
if fingerprint, err := domain.CatalogService.UpdateUpcomingAuction(ctx, auction); err != nil {
kernel.TraceLog.Printf("could not import upcoming auction: %s", err.Error())
fmt.Fprintf(errs, "{ \"AuctionFingerprint\": \"%s\", \"error\": \"%s\" }\n", fingerprint, err.Error())
}
}
if err := errGroup.Wait(); err != nil {
err = fmt.Errorf("an issue occurred while finding upcoming items iteration: %w", err)
fmt.Fprintf(errs, "{ \"error\": \"%s\" }", err.Error())
}
var completedJob ScrapeJob
var err error
if completedJob, err = domain.Storage.CompleteScrapeJob(ctx, jobID, CompleteScrapeJobStatus{
AuctionCount: count,
Errors: errs.String(),
}); err != nil {
kernel.ErrorLog.Printf("Could not complete scrape job, failing: %w", err)
}
kernel.InfoLog.Printf("Scrape Job %d completed in %v.", jobID, completedJob.Completed.Sub(completedJob.Started))
return
}

View File

@ -1,4 +1,4 @@
package runner
package domain
import "time"

View File

@ -3,15 +3,15 @@ package internal
import (
"context"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
api "git.vdhsn.com/barretthousen/barretthousen/src/runner/api/grpc"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/domain"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
func NewRunnerServer(d *runner.Domain) func(grpcServer grpc.ServiceRegistrar, endpoint string) {
func NewRunnerServer(d *domain.Domain) func(grpcServer grpc.ServiceRegistrar, endpoint string) {
return func(grpcServer grpc.ServiceRegistrar, endpoint string) {
api.RegisterRunnerServer(grpcServer, &runnerHandler{domain: d})
}
@ -19,11 +19,11 @@ func NewRunnerServer(d *runner.Domain) func(grpcServer grpc.ServiceRegistrar, en
type runnerHandler struct {
api.UnimplementedRunnerServer
domain *runner.Domain
domain *domain.Domain
}
func (rh *runnerHandler) FindNewUpcoming(ctx context.Context, cmd *api.FindNewUpcomingCommand) (*api.JobResult, error) {
out, err := rh.domain.FindNewUpcoming(ctx, runner.FindNewUpcomingInput{
out, err := rh.domain.FindNewUpcoming(ctx, domain.FindNewUpcomingInput{
TargetSite: cmd.TargetSite,
})
if err != nil {
@ -41,7 +41,7 @@ func (rh *runnerHandler) FindNewUpcoming(ctx context.Context, cmd *api.FindNewUp
}
func (rh *runnerHandler) GetJobs(ctx context.Context, cmd *api.GetJobsCommand) (*api.JobsResult, error) {
out, err := rh.domain.GetJobs(ctx, runner.GetJobsInput{})
out, err := rh.domain.GetJobs(ctx, domain.GetJobsInput{})
if err != nil {
return nil, status.Errorf(codes.Internal, "method GetJobs failed: %q", err.Error())
}

View File

@ -7,11 +7,11 @@ import (
"fmt"
capi "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data/postgres"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/domain"
"github.com/jackc/pgx/v4/pgxpool"
"go.uber.org/dig"
"google.golang.org/grpc"
@ -75,7 +75,7 @@ func (app *runnerApp) Start(ctx context.Context) error {
return err
}
if err = ioc.Provide(func(queries *postgres.Queries) runner.Storage {
if err = ioc.Provide(func(queries *postgres.Queries) domain.Storage {
return &data.PGRunnerStorage{Queries: queries}
}); err != nil {
return err
@ -89,14 +89,14 @@ func (app *runnerApp) Start(ctx context.Context) error {
}
defer conn.Close()
if err = ioc.Provide(func() runner.CatalogService {
if err = ioc.Provide(func() domain.CatalogService {
return capi.NewCatalogServiceClient(conn)
}); err != nil {
return err
}
if err = ioc.Provide(func(css runner.CatalogService, rs runner.Storage) *runner.Domain {
return &runner.Domain{
if err = ioc.Provide(func(css domain.CatalogService, rs domain.Storage) *domain.Domain {
return &domain.Domain{
Storage: rs,
CatalogService: css,
}
@ -104,7 +104,7 @@ func (app *runnerApp) Start(ctx context.Context) error {
return err
}
return ioc.Invoke(func(d *runner.Domain) error {
return ioc.Invoke(func(d *domain.Domain) error {
runnerService := internal.NewRunnerServer(d)
if _, err := kernel.StartGRPCServer(ctx, app.Port, runnerService); err != nil {