runner runs

pull/2/head
Adam Veldhousen 1 year ago
parent 18b9523c8b
commit 96393815ee
Signed by: adam
GPG Key ID: 6DB29003C6DD1E4B

@ -4,3 +4,4 @@ kind 0.18.0
kustomize 5.0.1
kubectl 1.26.3
buf 1.17.0
k9s 0.26.3

@ -1,4 +1,6 @@
go.uber.org/dig v1.0.0/go.mod h1:z+dSd2TP9Usi48jL8M3v63iSBVkiwtVyMKxMZYYauPg=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=

@ -73,3 +73,5 @@ bh_service(service="runner")
k8s_yaml(
kustomize("../env/local")
)
k8s_resource(workload='local-runner', port_forwards=5001)

@ -0,0 +1,16 @@
package catalog
import "time"
type Auction struct {
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
SourceSiteURL string `json:"source_site_url,omitempty"`
SourceSiteName string `json:"source_site_name,omitempty"`
SourceURL string `json:"source_url,omitempty"`
Country string `json:"country,omitempty"`
Province string `json:"province,omitempty"`
ItemCount int `json:"itemCount,omitempty"`
Start time.Time `json:"start,omitempty"`
End time.Time `json:"end,omitempty"`
}

@ -0,0 +1,135 @@
package runner
import (
"context"
"fmt"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
"golang.org/x/sync/errgroup"
)
var targetsImpls = map[string]UpcomingAuctionFinder{}
type UpcomingAuctionFinder interface {
fmt.Stringer
Find(ctx context.Context, limit int, results chan<- catalog.Auction) error
}
func RegisterAuctionFinder(finder UpcomingAuctionFinder) {
targetName := finder.String()
if _, ok := targetsImpls[targetName]; ok {
kernel.FatalErr(fmt.Errorf("target %q has already been registered", targetName))
}
targetsImpls[targetName] = finder
}
type (
Domain struct {
Storage
CatalogService
}
CompleteScrapeJobStatus struct {
AuctionCount int
Errors string
}
Storage interface {
CreateScrapeJob(context.Context, string) (ScrapeJob, error)
CompleteScrapeJob(context.Context, int, CompleteScrapeJobStatus) (ScrapeJob, error)
GetJobs(context.Context) ([]ScrapeJob, error)
}
CatalogService interface {
UpdateUpcomingAuction(catalog.Auction) error
}
)
type CatalogServiceStub struct{}
func (css *CatalogServiceStub) UpdateUpcomingAuction(a catalog.Auction) error {
kernel.TraceLog.Printf("Invoke CatalogService[UpdateUpcomingAuction](%+v)", a)
return nil
}
type (
FindNewUpcomingInput struct {
TargetSite string
}
FindNewUpcomingOutput struct {
Job ScrapeJob
}
)
func (domain Domain) FindNewUpcoming(ctx context.Context, in FindNewUpcomingInput) (out FindNewUpcomingOutput, err error) {
finder, ok := targetsImpls[in.TargetSite]
if !ok {
err = fmt.Errorf("could not find target matching name")
return
}
if out.Job, err = domain.Storage.CreateScrapeJob(ctx, in.TargetSite); err != nil {
err = fmt.Errorf("could not create new scrape job record: %w", err)
return
}
kernel.InfoLog.Printf("Scrape Job %q starting", out.Job.ID)
found := make(chan catalog.Auction, 64)
errGroup, innerCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return finder.Find(innerCtx, 0, found)
})
count := 0
for {
auction, ok := <-found
if !ok {
break
}
count++
errGroup.Go(func() error {
return domain.CatalogService.UpdateUpcomingAuction(auction)
})
}
if err = errGroup.Wait(); err != nil {
err = fmt.Errorf("an issue occurred while finding upcoming items iteration: %w", err)
}
if out.Job, err = domain.Storage.CompleteScrapeJob(ctx, out.Job.ID, CompleteScrapeJobStatus{
AuctionCount: count,
Errors: err.Error(),
}); err != nil {
err = fmt.Errorf("Could not complete scrape job, failing: %w", err)
return
}
kernel.InfoLog.Printf("Scrape Job %q completed in %v.", out.Job.ID, out.Job.Completed.Sub(out.Job.Started))
return
}
type (
GetJobsInput struct{}
GetJobsOutput struct {
Jobs []ScrapeJob
}
)
func (domain Domain) GetJobs(ctx context.Context, in GetJobsInput) (out GetJobsOutput, err error) {
scrapeJobs, err := domain.Storage.GetJobs(ctx)
if err != nil {
err = fmt.Errorf("could not fetch jobs from storage: %w", err)
return
}
out = GetJobsOutput{Jobs: scrapeJobs}
return
}

@ -0,0 +1,12 @@
package runner
import "time"
type ScrapeJob struct {
ID int `json:"id,omitempty"`
Started time.Time `json:"startedTs,omitempty"`
Completed time.Time `json:"completedTs,omitempty"`
TargetSite string `json:"target_site,omitempty"`
AuctionsFound int `json:"auctions_found,omitempty"`
Errors string `json:"errors,omitempty"`
}

@ -36,8 +36,10 @@ func Run(parent context.Context, app App) {
defer canceller()
select {
case <-sig:
case signal := <-sig:
TraceLog.Printf("[SHUTDOWN TRIGGERED] got shutdown signal: %v", signal)
case <-ctx.Done():
TraceLog.Println("[SHUTDOWN TRIGGERED] context exited unexpectedly")
}
InfoLog.Println("Shutting down service")

@ -29,10 +29,11 @@ func StartGRPCServer(port int, sb ServerBuilder, opts ...grpc.ServerOption) (err
sb(grpcServerInstance)
InfoLog.Printf("Listening on 0.0.0.0:%d", port)
return grpcServerInstance.Serve(listener)
}
func StopGRPCServer() error {
grpcServerInstance.GracefulStop()
return listener.Close()
return nil
}

@ -5,6 +5,7 @@ go 1.19
require (
git.vdhsn.com/barretthousen/barretthousen/src/lib v1.0.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
google.golang.org/genproto v0.0.0-20230223222841-637eb2293923
google.golang.org/protobuf v1.28.1
)

@ -165,6 +165,7 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

@ -1,5 +1,6 @@
START TRANSACTION;
DROP ROLE IF EXISTS "runner-service";
CREATE USER "runner-service" WITH PASSWORD 'runner-service';
CREATE SCHEMA IF NOT EXISTS runner;

@ -0,0 +1,33 @@
package data
import (
"context"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data/postgres"
)
type PGRunnerStorage struct {
*postgres.Queries
}
func (db *PGRunnerStorage) CreateScrapeJob(ctx context.Context, target string) (sj runner.ScrapeJob, err error) {
rsj, err := db.Queries.CreateScrapeJob(ctx, target)
if err != nil {
return runner.ScrapeJob{}, err
}
return runner.ScrapeJob{
ID: int(rsj.ID),
Started: rsj.Startedts,
TargetSite: rsj.Targetsitename,
}, nil
}
func (db *PGRunnerStorage) CompleteScrapeJob(ctx context.Context, ID int, status runner.CompleteScrapeJobStatus) (sj runner.ScrapeJob, err error) {
return
}
func (db *PGRunnerStorage) GetJobs(ctx context.Context) (results []runner.ScrapeJob, err error) {
return
}

@ -1,54 +0,0 @@
package domain
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
)
const LiveAuctioneers_Target = TargetSite("liveauctioneers")
type LA_UpcomingResults struct {
Payload struct {
Results struct{} `json:"results"`
} `json:"payload"`
}
// TODO: build liveauctioneers api client to put the two calls behind
func (domain RunnerDomain) FindNewUpcoming_LiveAuctioneers(ctx context.Context, in FindNewUpcomingInput) (out FindNewUpcomingOutput, err error) {
req, _ := http.NewRequestWithContext(
ctx,
http.MethodGet,
"https://search-party-prod.liveauctioneers.com/search/catalogsearch?c=20170802&client=web&client_version=5.0.0&excludedHouses=%5B%5D&max_facet_values=3&offset=300&pagesize=24&sort=saleStart&pageSize=24",
nil)
req.Header.Set("User-Agent", "User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/112.0")
req.Header.Set("Accept", "*/*")
req.Header.Set("Accept-Encoding", "gzip")
req.Header.Set("Cache-Control", "no-cache")
var res *http.Response
if res, err = http.DefaultClient.Do(req); err != nil {
return
}
if res.StatusCode > 299 {
err = fmt.Errorf("Got a bad http status: %d", res.StatusCode)
return
}
defer res.Body.Close()
var data []byte
if data, err = ioutil.ReadAll(res.Body); err != nil {
err = fmt.Errorf("could not read response body: %w", err)
return
}
kernel.TraceLog.Printf("%s", data)
// curl 'https://item-api-prod.liveauctioneers.com/spa/small/catalogs?c=20170802' -X POST -H 'User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/112.0' -H 'Accept: */*' -H 'Accept-Language: en-US,en;q=0.5' -H 'Accept-Encoding: gzip, deflate, br' -H 'Content-Type: application/json' -H 'Origin: https://www.liveauctioneers.com' -H 'DNT: 1' -H 'Alt-Used: item-api-prod.liveauctioneers.com' -H 'Connection: keep-alive' -H 'Referer: https://www.liveauctioneers.com/' -H 'Cookie: rbzid=MCMBxY4NXXt2ojSy6pRxVB6kz2RUIkGZu9j9g6FRdpdE6Ztu+RgO3PRCsEvDlJCDKpgIO+cUy/aHKCsLXjjlcUrGVF8GIo0D0Bcg8XniH2tGiuCa1MOrur3D6+lolfzerP1JI/lxbQfoCSnL30uFX4L4uqA0Qhs56Qo9wpkNDCrZA2cTbRPuCXrFyczps4bN4veZ1cOne1q4jMgxiNI75w==; rbzsessionid=a3e7079dfe6cc10e0b89f88d176b53ff; join-modal-last-seen=2023-04-26; pagination-page-size=120' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Pragma: no-cache' -H 'Cache-Control: no-cache' -H 'TE: trailers' --data-raw '{"ids":[285693,284926,285574,285404,281011,285610,285838,284614,285043,285729,285148,285016,283446,280796,285031,284245,285775,284879,285554,284055,284833,284818,285920,284969,285107,285165,285924,285608,285732,284849,284768,286038,285188,285862,285375,284643,285733,285411,285074,285731,284784,281479,284758,283547,284368,284489,284723,285406,284681,285183,284751,283883,284657,283320,283130,284408,285539,285849,285583,284462,285546,285134,285867,282513,285082,284795,284642,285230,285233,285236,284801,285776,279905,284015,283432,285513,285925,284880,284207,284079,285106,285017,284243,286036,282291,285778,284747,285407,285828,284819,284295,282099,285044,285734,281896,285149,285533,284733,285032,285287,284574,285052,285555,284683,284834,284970,285929,284196,285108,285166,285737,284850,286044,284769,286039,285189]}'
return
}

@ -1,43 +0,0 @@
package domain
import (
"context"
"reflect"
"testing"
)
func TestRunnerDomain_FindNewUpcoming_LiveAuctioneers(t *testing.T) {
type fields struct {
storage RunnerStorage
}
type args struct {
ctx context.Context
in FindNewUpcomingInput
}
tests := []struct {
name string
fields fields
args args
wantOut FindNewUpcomingOutput
wantErr bool
}{
{
name: "test run",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
domain := RunnerDomain{
storage: tt.fields.storage,
}
gotOut, err := domain.FindNewUpcoming_LiveAuctioneers(tt.args.ctx, tt.args.in)
if (err != nil) != tt.wantErr {
t.Errorf("RunnerDomain.FindNewUpcoming_LiveAuctioneers() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotOut, tt.wantOut) {
t.Errorf("RunnerDomain.FindNewUpcoming_LiveAuctioneers() = %v, want %v", gotOut, tt.wantOut)
}
})
}
}

@ -0,0 +1,209 @@
package domain
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
)
func init() {
runner.RegisterAuctionFinder(
LAAuctionFinder("liveauctioneers"),
)
}
type LAAuctionFinder string
func (la LAAuctionFinder) String() string {
return string(la)
}
func (la LAAuctionFinder) Find(ctx context.Context, limit int, results chan<- catalog.Auction) (err error) {
defer close(results)
var fetched, total int
if limit == 0 {
limit = -1
}
for fetched < limit {
var ids LACatalogIDs
if ids, total, err = LAGetUpcomingSaleIDs(ctx, GetUpcomingSaleIDsInput{
Page: 0,
Limit: 128,
}); err != nil {
return err
}
if limit <= 0 {
limit = total
}
var auctions []catalog.Auction
if auctions, err = LAGetSaleInfo(ctx, ids); err != nil {
return err
}
fetched += len(auctions)
for _, a := range auctions {
results <- a
}
}
return
}
type GetUpcomingSaleIDsInput struct {
Page uint
Limit uint
}
func LAGetUpcomingSaleIDs(ctx context.Context, in GetUpcomingSaleIDsInput) (ids LACatalogIDs, total int, err error) {
if in.Limit == 0 {
in.Limit = 128
}
req, _ := http.NewRequestWithContext(
ctx,
http.MethodGet,
fmt.Sprintf(
"https://search-party-prod.liveauctioneers.com/search/catalogsearch?c=20170802&client=web&client_version=5.0.0&excludedHouses=[]&max_facet_values=0&offset=%d&sort=saleStart&pageSize=%d",
in.Page,
in.Limit,
),
nil)
req.Header.Set("User-Agent", "User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/112.0")
req.Header.Set("Accept", "*/*")
req.Header.Set("Accept-Encoding", "gzip")
req.Header.Set("Cache-Control", "no-cache")
var res *http.Response
if res, err = http.DefaultClient.Do(req); err != nil {
return
}
if res.StatusCode > 299 {
err = fmt.Errorf("Got a bad http status: %d", res.StatusCode)
return
}
defer res.Body.Close()
var searchResults struct {
Payload struct {
Count int `json:"count"`
Results []struct {
ID int `json:"catid"`
} `json:"results"`
} `json:"payload"`
}
if err = json.NewDecoder(res.Body).Decode(&searchResults); err != nil {
return
}
if searchResults.Payload.Results == nil {
return
}
total = searchResults.Payload.Count
ids = LACatalogIDs(make([]int, len(searchResults.Payload.Results)))
for idx, result := range searchResults.Payload.Results {
ids[idx] = result.ID
}
return
}
type LACatalogIDs []int
func (cip LACatalogIDs) String() string {
sb := &strings.Builder{}
sb.WriteString("{\"ids\":[")
for idx, catID := range cip {
fmt.Fprintf(sb, "%d", catID)
if idx < len(cip)-1 {
sb.WriteString(",")
}
}
sb.WriteString("]}")
return sb.String()
}
func LAGetSaleInfo(ctx context.Context, catIDs LACatalogIDs) (results []catalog.Auction, err error) {
if catIDs == nil || len(catIDs) == 0 {
return
}
req, _ := http.NewRequestWithContext(ctx, http.MethodPost,
"https://item-api-prod.liveauctioneers.com/spa/small/catalogs",
strings.NewReader(catIDs.String()))
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:97.0) Gecko/20100101 Firefox/97.0")
req.Header.Set("Content-Type", "application/json")
var res *http.Response
if res, err = http.DefaultClient.Do(req); err != nil {
err = fmt.Errorf("could not complete request: %w", err)
return
}
if res.StatusCode < 300 {
err = fmt.Errorf("got non 200 status: %d", res.StatusCode)
return
}
defer res.Body.Close()
var apiResults struct {
Data struct {
Catalogs []struct {
ID int `json:"catid"`
Title string `json:"title"`
Description string `json:"description"`
ItemCount int `json:"lotsListed"`
SaleStartTS int64 `json:"saleStartTs"`
Address struct {
CountryCode string `json:"country"`
Lat float64 `json:"lat"`
Long float64 `json:"lng"`
State string `json:"state"`
City string `json:"city"`
} `json:"address"`
} `json:"catalogs"`
} `json:"data"`
}
if err = json.NewDecoder(res.Body).Decode(&apiResults); err != nil {
err = fmt.Errorf("could not parse response: %w", err)
return
}
results = make([]catalog.Auction, len(apiResults.Data.Catalogs))
for idx, c := range apiResults.Data.Catalogs {
results[idx] = catalog.Auction{
Title: c.Title,
Description: c.Description,
SourceSiteURL: "https://www.liveauctioneers.com",
SourceSiteName: "Live Auctioneers",
SourceURL: fmt.Sprintf("https://www.liveauctioneers.com/catalog/%d", c.ID),
Start: time.Unix(c.SaleStartTS, 0),
End: time.Unix(c.SaleStartTS, 0).Add(time.Hour * 8),
ItemCount: c.ItemCount,
Country: c.Address.CountryCode,
Province: c.Address.City,
}
}
return
}

@ -0,0 +1,30 @@
package domain
import "testing"
func Test_catIDPayload_String(t *testing.T) {
tests := []struct {
name string
cip LACatalogIDs
want string
}{
{
name: "happy path",
cip: LACatalogIDs{283257, 283882, 284163},
want: "{\"ids\":[283257,283882,284163]}",
},
{
name: "no items",
cip: LACatalogIDs{},
want: "{\"ids\":[]}",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.cip.String(); got != tt.want {
t.Errorf("catIDPayload.String() = %v, want %v", got, tt.want)
}
})
}
}

@ -1,72 +0,0 @@
package domain
import (
"context"
"fmt"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/api"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data/postgres"
"google.golang.org/protobuf/types/known/timestamppb"
)
type RunnerStorage interface {
CreateScrapeJob(context.Context, string) (postgres.RunnerScrapejob, error)
GetJobs(context.Context) ([]postgres.RunnerScrapejob, error)
}
type RunnerDomain struct {
storage RunnerStorage
}
type (
FindNewUpcomingInput struct {
TargetSite string
}
FindNewUpcomingOutput struct{}
TargetSite string
)
func (domain RunnerDomain) FindNewUpcoming(ctx context.Context, in FindNewUpcomingInput) (out FindNewUpcomingOutput, err error) {
if in.TargetSite == string(LiveAuctioneers_Target) {
out, err = domain.FindNewUpcoming_LiveAuctioneers(ctx, in)
}
return
}
type (
GetJobsInput struct{}
GetJobsOutput struct {
Jobs []*api.JobResult
}
)
func (domain RunnerDomain) GetJobs(ctx context.Context, in GetJobsInput) (out GetJobsOutput, err error) {
scrapeJobs, err := domain.storage.GetJobs(ctx)
if err != nil {
err = fmt.Errorf("could not fetch jobs from storage: %w", err)
return
}
out = GetJobsOutput{
Jobs: []*api.JobResult{},
}
for _, sj := range scrapeJobs {
jr := &api.JobResult{
Id: sj.ID,
AuctionsFound: sj.Auctionsfound,
Errors: sj.Errors,
TargetSiteName: sj.Targetsitename,
CreatedTs: timestamppb.New(sj.Startedts),
}
if sj.Completedts.Valid {
jr.CompletedTs = timestamppb.New(sj.Completedts.Time)
}
out.Jobs = append(out.Jobs, jr)
}
return
}

@ -1,21 +1,26 @@
package internal
import (
"context"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/api"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data/postgres"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func NewRunnerServer(grpcServer grpc.ServiceRegistrar) {
api.RegisterRunnerServer(grpcServer, &runnerHandler{})
func NewRunnerServer(d *runner.Domain) func(grpcServer grpc.ServiceRegistrar) {
return func(grpcServer grpc.ServiceRegistrar) {
api.RegisterRunnerServer(grpcServer, &runnerHandler{domain: d})
}
}
type runnerHandler struct {
api.UnimplementedRunnerServer
q *postgres.Queries
domain *runner.Domain
}
// func (rh *runnerHandler) FindNewUpcoming(ctx context.Context, cmd *runner.FindNewUpcomingCommand) (*runner.JobResult, error) {
// return nil, status.Errorf(codes.Unimplemented, "method FindNewUpcoming not implemented")
// }
func (rh *runnerHandler) FindNewUpcoming(ctx context.Context, cmd *api.FindNewUpcomingCommand) (*api.JobResult, error) {
return nil, status.Errorf(codes.Unimplemented, "method FindNewUpcoming not implemented")
}

@ -7,8 +7,15 @@ import (
_ "embed"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data/postgres"
"github.com/jackc/pgx/v4"
"go.uber.org/dig"
_ "git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/domain/liveauctioneers"
)
type (
@ -45,7 +52,37 @@ func (app *RunnerApp) Start(ctx context.Context) error {
return nil
}
return kernel.StartGRPCServer(app.Port, internal.NewRunnerServer)
ioc := dig.New()
ioc.Provide(func() kernel.PostgresConnection {
return app.Postgres
})
ioc.Provide(func(pgCfg kernel.PostgresConnection) (*pgx.Conn, error) {
return kernel.NewDBConnection(ctx, pgCfg)
})
ioc.Provide(func(pgConn *pgx.Conn) *postgres.Queries {
return postgres.New(pgConn)
})
ioc.Provide(func(queries *postgres.Queries) runner.Storage {
return &data.PGRunnerStorage{queries}
})
ioc.Provide(func() runner.CatalogService {
return &runner.CatalogServiceStub{}
})
ioc.Provide(func(css runner.CatalogService, rs runner.Storage) *runner.Domain {
return &runner.Domain{
Storage: rs,
CatalogService: css,
}
})
return ioc.Invoke(func(d *runner.Domain) error {
return kernel.StartGRPCServer(app.Port, internal.NewRunnerServer(d))
})
}
func (app *RunnerApp) OnStop(ctx context.Context) {

Loading…
Cancel
Save