rearrange some things for better structure

pull/3/head
Adam Veldhousen 2023-05-10 21:02:43 -05:00
parent 8759a2a61c
commit a424af4fbf
Signed by: adam
GPG Key ID: 6DB29003C6DD1E4B
12 changed files with 89 additions and 196 deletions

View File

@ -37,6 +37,10 @@ helm_resource(
def bh_service(service="", port_forwards=[], devMode=True, labels=['2-services'], deps=['postgres']):
# local_resource(
# "{}-go-compile".format(service),
# 'CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o .bin/{}-debug'.format(service)
# )
docker_build(
ref="barretthousen/service-{}".format(service),
dockerfile="./src/Dockerfile.service",
@ -44,7 +48,11 @@ def bh_service(service="", port_forwards=[], devMode=True, labels=['2-services']
target="development" if devMode else "production",
build_args={
"service": service
}
},
ignore=[
"{}/internal/data/postgres/migrations".format(service),
"{}/internal/data/postgres/queries.sql".format(service)
]
)
k8s_resource(

43
src/catalog/api/client.go Normal file
View File

@ -0,0 +1,43 @@
package api
import (
"context"
capi "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api/grpc"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
func NewCatalogServiceClient(conn grpc.ClientConnInterface) *CatalogServiceClient {
return &CatalogServiceClient{
CatalogClient: capi.NewCatalogClient(conn),
}
}
type CatalogServiceClient struct {
capi.CatalogClient
}
func (css *CatalogServiceClient) UpdateUpcomingAuction(ctx context.Context, a catalog.Auction) error {
ac, err := css.ImportAuction(ctx, &capi.ImportAuctionMessage{
Title: a.Title,
Description: a.Description,
SourceSiteURL: a.SourceSiteURL,
SourceSiteName: a.SourceSiteName,
SourceURL: a.SourceURL,
Country: a.Country,
Province: a.Province,
Items: int32(a.ItemCount),
Start: timestamppb.New(a.Start),
End: timestamppb.New(a.End),
})
if err != nil {
return err
}
kernel.TraceLog.Printf("Import Result: IsDupe: %q", ac.GetDuplicate())
return nil
}

View File

@ -5,7 +5,7 @@ package main;
import "google/protobuf/timestamp.proto";
import "google/api/annotations.proto";
option go_package = "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api";
option go_package = "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api/grpc";
service Catalog {
rpc GetUpcoming(AuctionSearchCriteria) returns (GetUpcomingResult) {

View File

@ -1,112 +0,0 @@
-- +goose Up
START TRANSACTION;
CREATE SCHEMA IF NOT EXISTS catalog;
CREATE TABLE IF NOT EXISTS catalog.upcoming_auctions (
id SERIAL PRIMARY KEY,
fingerprint VARCHAR(512) NOT NULL UNIQUE,
title VARCHAR(1024) NOT NULL,
description TEXT DEFAULT ''::TEXT NOT NULL,
startts TIMESTAMP NOT NULL,
endts TIMESTAMP,
itemcount INTEGER DEFAULT 0 NOT NULL,
sourcesiteurl VARCHAR(1024) DEFAULT ''::CHARACTER VARYING NOT NULL,
sourcesitename VARCHAR(256) NOT NULL,
sourceurl VARCHAR(1024) NOT NULL,
country VARCHAR(64) NOT NULL,
province VARCHAR(128) NOT NULL
);
CREATE TABLE IF NOT EXISTS catalog.upcoming_auctions_fts (
id SERIAL PRIMARY KEY,
auctionid INTEGER REFERENCES CATALOG.UPCOMING_AUCTIONS,
title VARCHAR(1024) NOT NULL,
description TEXT NOT NULL,
ts tsvector GENERATED ALWAYS AS ((
setweight(to_tsvector('english', (title)::TEXT), 'A') ||
setweight(to_tsvector('english', (description)::TEXT), 'B')
)) STORED
);
CREATE INDEX ts_idx ON catalog.upcoming_auctions_fts USING GIN(ts);
-- +goose StatementBegin
CREATE OR REPLACE FUNCTION catalog.bh_import_auction(
p_fingerprint VARCHAR(512),
p_title VARCHAR(1024),
p_description TEXT DEFAULT '',
p_startts TIMESTAMP,
p_endts TIMESTAMP,
p_itemcount INTEGER DEFAULT 0,
p_sourcesiteurl VARCHAR(1024),
p_sourcesitename VARCHAR(256),
p_sourceurl VARCHAR(1024),
p_country VARCHAR(64),
p_province VARCHAR(128))
RETURNS integer
LANGUAGE plpgsql AS $BODY$
DECLARE
auction_id integer;
BEGIN
INSERT INTO catalog.upcoming_auctions (
fingerprint,
title,
description,
startts,
endts,
itemcount,
sourcesiteurl,
sourcesitename,
sourceurl,
country,
province
) VALUES (
p_fingerprint,
p_title,
p_description,
p_startts,
p_endts,
p_itemcount,
p_sourcesiteurl,
p_sourcesitename,
p_sourceurl,
p_country,
p_province
) RETURN id INTO auction_id;
INSERT INTO catalog.upcoming_auctions_fts (
auctionid,
title,
description
) VALUES ( auction_id, p_title, p_description);
return auction_id;
END;
$BODY$;
-- +goose StatementEnd
-- +goose StatementBegin
DO
$do$
BEGIN
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles -- SELECT list can be empty for this
WHERE rolname = 'catalog-service') THEN
CREATE USER "catalog-service" WITH PASSWORD 'catalog-service';
END IF;
END
$do$;
-- +goose StatementEnd
GRANT CONNECT ON DATABASE bh to "catalog-service";
GRANT USAGE ON SCHEMA catalog TO "catalog-service";
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA catalog TO "catalog-service";
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA catalog TO "catalog-service";
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA catalog TO "catalog-service";
COMMIT;
-- +goose Down

View File

@ -2,7 +2,6 @@ package data
import (
"context"
"database/sql"
"fmt"
"time"
@ -61,10 +60,8 @@ func (ps *PGCatalogStorage) CreateUpcoming(ctx context.Context, a catalog.Auctio
Country: a.Country,
Province: a.Province,
Startts: a.Start,
Endts: sql.NullTime{
Time: a.End,
},
Itemcount: int32(a.ItemCount),
Endts: a.End,
Itemcount: int32(a.ItemCount),
}); err != nil {
return fmt.Errorf("could not save auction to DB: %w", err)
}

View File

@ -5,7 +5,7 @@ import (
"fmt"
"time"
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/api"
api "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api/grpc"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"

View File

@ -43,7 +43,7 @@ func main() {
func (app *catalogApp) Start(ctx context.Context) error {
if *migrate {
kernel.InfoLog.Printf("running db migrations on %v", app.DB_Migrate)
if err := kernel.MigrateDB(ctx, app.DB_Migrate, dbMigrateScript, "internal/data/postgres/migrations"); err != nil {
if err := kernel.MigrateDB(ctx, app.DB_Migrate, dbMigrateScript, "catalog"); err != nil {
return fmt.Errorf("could not execute db migration: %w", err)
}

View File

@ -4,11 +4,9 @@ import (
"context"
"fmt"
capi "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
)
var targetsImpls = map[string]UpcomingAuctionFinder{}
@ -49,30 +47,8 @@ type (
}
)
type CatalogServiceClient struct {
capi.CatalogClient
}
func (css *CatalogServiceClient) UpdateUpcomingAuction(ctx context.Context, a catalog.Auction) error {
ac, err := css.ImportAuction(ctx, &capi.ImportAuctionMessage{
Title: a.Title,
Description: a.Description,
SourceSiteURL: a.SourceSiteURL,
SourceSiteName: a.SourceSiteName,
SourceURL: a.SourceURL,
Country: a.Country,
Province: a.Province,
Items: int32(a.ItemCount),
Start: timestamppb.New(a.Start),
End: timestamppb.New(a.End),
})
if err != nil {
return err
}
kernel.TraceLog.Printf("Import Result: IsDupe: %q", ac.GetDuplicate())
return nil
type CatalogServiceClient interface {
UpdateUpcomingAuction(context.Context, catalog.Auction) error
}
type (
@ -114,7 +90,10 @@ func (domain Domain) FindNewUpcoming(ctx context.Context, in FindNewUpcomingInpu
count++
a := auction
errGroup.Go(func() error {
domain.CatalogService.UpdateUpcomingAuction(ctx, a)
if err := domain.CatalogService.UpdateUpcomingAuction(ctx, a); err != nil {
kernel.TraceLog.Printf("could not import upcoming auction: %s", err.Error())
return err
}
return nil
})
}

View File

@ -70,7 +70,7 @@ func Migrate(ctx context.Context, pg PostgresConnection, sql string) error {
return nil
}
func MigrateDB(ctx context.Context, pg PostgresConnection, migrationFS embed.FS, path string) error {
func MigrateDB(ctx context.Context, pg PostgresConnection, migrationFS embed.FS, migrationKey string) error {
db, err := sql.Open("pgx", pg.String())
if err != nil {
return fmt.Errorf("could not open DB connection: %w", err)
@ -83,6 +83,13 @@ func MigrateDB(ctx context.Context, pg PostgresConnection, migrationFS embed.FS,
goose.SetBaseFS(migrationFS)
if migrationKey == "" {
migrationKey = "public"
}
goose.SetTableName(fmt.Sprintf("bh_%s_migrations", migrationKey))
path := "internal/data/postgres/migrations"
if err := goose.Up(db, path); err != nil {
return err
}

View File

@ -1,31 +0,0 @@
START TRANSACTION;
CREATE SCHEMA IF NOT EXISTS runner;
CREATE TABLE IF NOT EXISTS runner.scrapejob (
id SERIAL PRIMARY KEY,
startedTs TIMESTAMP NOT NULL DEFAULT NOW(),
completedTs TIMESTAMP,
targetSiteName VARCHAR(512) NOT NULL,
auctionsFound INT NOT NULL DEFAULT 0,
errors VARCHAR(5000) NOT NULL DEFAULT ''
);
DO
$do$
BEGIN
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles -- SELECT list can be empty for this
WHERE rolname = 'runner-service') THEN
CREATE USER "runner-service" WITH PASSWORD 'runner-service';
END IF;
END
$do$;
GRANT CONNECT ON DATABASE bh to "runner-service";
GRANT USAGE ON SCHEMA runner TO "runner-service";
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA runner TO "runner-service";
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA runner TO "runner-service";
COMMIT;

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"embed"
"flag"
"fmt"
@ -23,33 +24,35 @@ import (
type (
RunnerApp struct {
LogLevel kernel.LogLevel `yaml:"log_level" env:"BH_LOG_LEVEL" env-default:"0" yaml-default:"0"`
Port int `yaml:"port" env:"RUNNER_PORT"`
DB_Service kernel.PostgresConnection `yaml:"db_service" env:"RUNNER_DB_SERVICE"`
DB_Migrate kernel.PostgresConnection `yaml:"db_migrate" env:"RUNNER_DB_MIGRATE"`
LogLevel kernel.LogLevel `yaml:"log_level" env:"BH_LOG_LEVEL" env-default:"0" yaml-default:"0"`
Port int `yaml:"port" env:"RUNNER_PORT"`
CatalogEndpoint string `yaml:"catalog_endpoint" env:"RUNNER_CATALOG_ENDPOINT"`
DB_Service kernel.PostgresConnection `yaml:"db_service" env:"RUNNER_DB_SERVICE"`
DB_Migrate kernel.PostgresConnection `yaml:"db_migrate" env:"RUNNER_DB_MIGRATE"`
}
)
var migrate = flag.Bool("migrate", false, "migrates postgres db")
var (
migrate = flag.Bool("migrate", false, "migrates postgres db")
//go:embed internal/data/postgres/schema.sql
var dbMigrateScript string
//go:embed internal/data/postgres/migrations/*.sql
dbMigrateScript embed.FS
)
func main() {
flag.Parse()
kernel.Run(context.Background(), &RunnerApp{
LogLevel: kernel.LevelTrace,
Port: 5001,
CatalogEndpoint: "local-catalog:5001",
LogLevel: kernel.LevelTrace,
Port: 5001,
})
}
func (app *RunnerApp) Start(ctx context.Context) error {
if *migrate {
kernel.InfoLog.Printf("running db migrations on %v", app.DB_Migrate)
kernel.InfoLog.Printf("MIGRATION SCRIPT:\n%q", dbMigrateScript)
if err := kernel.Migrate(ctx, app.DB_Migrate, dbMigrateScript); err != nil {
if err := kernel.MigrateDB(ctx, app.DB_Migrate, dbMigrateScript, "internal/data/postgres/migrations"); err != nil {
return fmt.Errorf("could not execute db migration: %w", err)
}
@ -82,7 +85,7 @@ func (app *RunnerApp) Start(ctx context.Context) error {
return err
}
conn, err := grpc.Dial("local-catalog:5001", grpc.WithTransportCredentials(
conn, err := grpc.Dial(app.CatalogEndpoint, grpc.WithTransportCredentials(
insecure.NewCredentials(),
))
if err != nil {
@ -91,8 +94,7 @@ func (app *RunnerApp) Start(ctx context.Context) error {
defer conn.Close()
if err = ioc.Provide(func() runner.CatalogService {
client := capi.NewCatalogClient(conn)
return &runner.CatalogServiceClient{CatalogClient: client}
return capi.NewCatalogServiceClient(conn)
}); err != nil {
return err
}

View File

@ -1,7 +1,7 @@
version: "2"
sql:
- queries: "runner/internal/data/postgres/queries.sql"
schema: "runner/internal/data/postgres/schema.sql"
schema: "runner/internal/data/postgres/migrations"
engine: "postgresql"
gen:
go: