wip: attempt to get some protos and domain logic up

pull/3/head
Adam Veldhousen 2023-05-08 19:56:26 -05:00
parent eac6118b7e
commit 03bbcc461b
Signed by: adam
GPG Key ID: 6DB29003C6DD1E4B
5 changed files with 150 additions and 36 deletions

View File

@ -14,7 +14,7 @@ service Catalog {
}; };
} }
rpc CreateUpcoming(UpcomingAuction) returns (CreateUpcomingResult) { rpc ImportAuction(ImportAuctionMessage) returns (AuctionCreated) {
} }
} }
@ -28,7 +28,7 @@ message AuctionSearchCriteria {
message GetUpcomingResult { message GetUpcomingResult {
int32 page = 1; int32 page = 1;
int32 total = 2; int32 total = 2;
repeated Auction catalogs = 3; repeated Auction results = 3;
} }
message Auction { message Auction {
@ -45,7 +45,7 @@ message Auction {
string province = 11; string province = 11;
} }
message UpcomingAuction { message ImportAuctionMessage {
int32 items = 1; int32 items = 1;
google.protobuf.Timestamp start = 2; google.protobuf.Timestamp start = 2;
google.protobuf.Timestamp end = 3; google.protobuf.Timestamp end = 3;
@ -58,6 +58,7 @@ message UpcomingAuction {
string province = 10; string province = 10;
} }
message CreateUpcomingResult { message AuctionCreated {
bool duplicate = 1; bool duplicate = 1;
Auction auction = 2;
} }

View File

@ -0,0 +1,54 @@
package data
import (
"context"
"fmt"
"time"
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/internal/data/postgres"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
)
type PGCatalogStorage struct {
Queries *postgres.Queries
}
type UpcomingQuery struct {
Term string
StartDateFilter time.Time
}
func (ps *PGCatalogStorage) GetUpcoming(ctx context.Context, q UpcomingQuery) (results []catalog.Auction, total int, err error) {
var pgResults []postgres.CatalogUpcomingAuction
if pgResults, err = ps.Queries.GetUpcoming(ctx, postgres.GetUpcomingParams{
PhrasetoTsquery: q.Term,
Limit: 64,
}); err != nil {
err = fmt.Errorf("could not get upcoming auctions from pg: %w", err)
return
}
results = make([]catalog.Auction, len(pgResults))
for idx, row := range pgResults {
results[idx] = catalog.Auction{
Title: row.Title,
Description: row.Description,
SourceSiteURL: row.Sourcesiteurl,
SourceSiteName: row.Sourcesitename,
SourceURL: row.Sourcesiteurl,
Country: row.Country,
Province: row.Province,
ItemCount: int(row.Itemcount),
Start: row.Starttime,
// TODO: check if null and set if not
// End: row.Endtime.Time,
}
}
return
}
func (ps *PGCatalogStorage) CreateUpcoming(ctx context.Context, a catalog.Auction) error {
return nil
}

View File

@ -6,6 +6,9 @@ import (
"flag" "flag"
"fmt" "fmt"
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/internal/data"
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/internal/data/postgres"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel" "git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
"github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4"
"go.uber.org/dig" "go.uber.org/dig"
@ -60,43 +63,35 @@ func (app *catalogApp) Start(ctx context.Context) error {
return err return err
} }
// if err = ioc.Provide(func(pgConn *pgx.Conn) *postgres.Queries { if err = ioc.Provide(func(pgConn *pgx.Conn) *postgres.Queries {
// return postgres.New(pgConn) return postgres.New(pgConn)
// }); err != nil { }); err != nil {
// return err return err
// } }
// if err = ioc.Provide(func(queries *postgres.Queries) catalog.Storage { if err = ioc.Provide(func(queries *postgres.Queries) catalog.Storage {
// return &data.PGcatalogStorage{Queries: queries} return &data.PGCatalogStorage{Queries: queries}
// }); err != nil { }); err != nil {
// return err return err
// } }
// if err = ioc.Provide(func() catalog.CatalogService { if err = ioc.Provide(func(rs catalog.Storage) *catalog.Domain {
// return &catalog.CatalogServiceStub{} return &catalog.Domain{
// }); err != nil { Storage: rs,
// return err }
// } }); err != nil {
return err
}
// if err = ioc.Provide(func(css catalog.CatalogService, rs runner.Storage) *runner.Domain { return ioc.Invoke(func(d *catalog.Domain) error {
// return &catalog.Domain{ catalogService := internal.NewCatalogServer(d)
// Storage: rs,
// CatalogService: css,
// }
// }); err != nil {
// return err
// }
// return ioc.Invoke(func(d *catalog.Domain) error { if _, err := kernel.StartGRPCServer(ctx, app.Port, catalogService); err != nil {
// catalogService := internal.NewRunnerServer(d) return err
}
// if _, err := kernel.StartGRPCServer(ctx, app.Port, catalogService); err != nil { return nil
// return err })
// }
// return nil
// })
return nil
} }
func (app *catalogApp) OnStop(ctx context.Context) { func (app *catalogApp) OnStop(ctx context.Context) {

View File

@ -0,0 +1,63 @@
package catalog
import (
"context"
"errors"
"fmt"
"time"
)
var ErrDuplicateAuctionImported = errors.New("this auction's fingerprint matches one that has already been imported")
type (
Domain struct {
Storage
}
Storage interface {
GetUpcoming(context.Context, UpcomingQuery) ([]Auction, int, error)
CreateUpcoming(context.Context, Auction) error
}
UpcomingQuery struct {
Term string
StartDateFilter time.Time
}
ImportAuctionMessage struct {
Auction
}
AuctionCreated struct {
Auction
Duplicate bool
}
UpcomingResults struct {
Page int
Total int
Results []Auction
}
)
func (d *Domain) GetUpcoming(ctx context.Context, q UpcomingQuery) (results UpcomingResults, err error) {
if results.Results, results.Total, err = d.Storage.GetUpcoming(ctx, q); err != nil {
err = fmt.Errorf("could not get upcoming from storage: %w", err)
return
}
return
}
func (d *Domain) ImportAuction(ctx context.Context, in ImportAuctionMessage) (event AuctionCreated, err error) {
if err = d.Storage.CreateUpcoming(ctx, in.Auction); err != nil && !errors.Is(err, ErrDuplicateAuctionImported) {
err = fmt.Errorf("could not import auction: %w", err)
return
}
event = AuctionCreated{
Auction: in.Auction,
Duplicate: errors.Is(err, ErrDuplicateAuctionImported),
}
return
}

View File

@ -47,6 +47,7 @@ func NewDBConnection(ctx context.Context, pg PostgresConnection) (conn *pgx.Conn
return conn, nil return conn, nil
} }
// deprecated
func Migrate(ctx context.Context, pg PostgresConnection, sql string) error { func Migrate(ctx context.Context, pg PostgresConnection, sql string) error {
conn, err := NewDBConnection(ctx, pg) conn, err := NewDBConnection(ctx, pg)
if err != nil { if err != nil {