commitpull/5/head30d2938bd6
Author: Adam Veldhousen <adamveld12@gmail.com> Date: Sat May 6 22:11:09 2023 -0500 runner service finished commit8e33be359a
Author: Adam Veldhousen <adamveld12@gmail.com> Date: Sat Apr 29 17:25:05 2023 -0500 add error handling for runner main commit118d4ffcc6
Author: Adam Veldhousen <adamveld12@gmail.com> Date: Sat Apr 29 17:24:28 2023 -0500 refactor GRPC connection management commit852d0f4131
Author: Adam Veldhousen <adamveld12@gmail.com> Date: Sat Apr 29 17:20:41 2023 -0500 refactor tiltfile and kustomize for better integration commit73fe1eb1d7
Author: Adam Veldhousen <adamveld12@gmail.com> Date: Thu Apr 27 20:31:18 2023 -0500 use full gobin path for buf in make task commit96393815ee
Author: Adam Veldhousen <adamveld12@gmail.com> Date: Thu Apr 20 23:14:17 2023 -0500 runner runs commit18b9523c8b
Author: Adam Veldhousen <adamveld12@gmail.com> Date: Wed Apr 19 21:04:50 2023 -0500 working on scraping from LA commit2699f0c14d
Author: Adam Veldhousen <adamveld12@gmail.com> Date: Wed Apr 19 19:47:51 2023 -0500 runner starts up and runs migrations
parent
6cd624ced1
commit
a20b132280
@ -1,9 +1,10 @@
|
||||
.kubeconfig
|
||||
.vscode
|
||||
|
||||
|
||||
# auto generated files
|
||||
postgres
|
||||
|
||||
*.pb.go
|
||||
*.pb.gw.go
|
||||
*.swagger.json
|
||||
|
||||
src/**/internal/data/postgres/*.go
|
||||
|
@ -1,2 +1,3 @@
|
||||
resources:
|
||||
- ./runner-deployment.yaml
|
||||
- ./proxy-admin-deployment.yaml
|
||||
|
@ -0,0 +1,54 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: proxy-admin
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
service: proxy-admin
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
service: proxy-admin
|
||||
spec:
|
||||
containers:
|
||||
- name: proxy-admin
|
||||
image: barretthousen/service-proxy-admin:latest
|
||||
ports:
|
||||
- containerPort: 80
|
||||
name: http
|
||||
command:
|
||||
- /opt/proxy-admin
|
||||
resources:
|
||||
limits:
|
||||
memory: "64Mi"
|
||||
cpu: "250m"
|
||||
volumeMounts:
|
||||
- name: proxy-admin-config
|
||||
mountPath: /config/
|
||||
volumes:
|
||||
- name: proxy-admin-config
|
||||
configMap:
|
||||
name: proxy-admin-config
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: proxy-admin
|
||||
spec:
|
||||
selector:
|
||||
service: proxy-admin
|
||||
ports:
|
||||
- port: 80
|
||||
targetPort: 80
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: proxy-admin-config
|
||||
data:
|
||||
config.yaml: |
|
||||
log_level: 2
|
||||
port: 80
|
||||
endpoints:
|
||||
runner: local-runner:5001
|
@ -0,0 +1,23 @@
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: runner
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: runner
|
||||
command:
|
||||
- /go/bin/dlv
|
||||
args:
|
||||
- --headless
|
||||
- --listen=0.0.0.0:2345
|
||||
- --api-version=2
|
||||
- --log
|
||||
- --accept-multiclient
|
||||
#- --log-output=rpc,dap
|
||||
- exec
|
||||
- /opt/runner-debug
|
||||
- --continue
|
||||
ports:
|
||||
- containerPort: 2345
|
@ -0,0 +1,6 @@
|
||||
go.uber.org/dig v1.0.0/go.mod h1:z+dSd2TP9Usi48jL8M3v63iSBVkiwtVyMKxMZYYauPg=
|
||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
|
@ -0,0 +1,6 @@
|
||||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
@ -0,0 +1,12 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="DataSourceManagerImpl" format="xml" multifile-model="true">
|
||||
<data-source source="LOCAL" name="bh-db@localhost" uuid="05aba6c0-168c-47e3-b807-951316a3a483">
|
||||
<driver-ref>postgresql</driver-ref>
|
||||
<synchronize>true</synchronize>
|
||||
<jdbc-driver>org.postgresql.Driver</jdbc-driver>
|
||||
<jdbc-url>jdbc:postgresql://localhost:5432/postgres</jdbc-url>
|
||||
<working-dir>$ProjectFileDir$</working-dir>
|
||||
</data-source>
|
||||
</component>
|
||||
</project>
|
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectModuleManager">
|
||||
<modules>
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/src.iml" filepath="$PROJECT_DIR$/.idea/src.iml" />
|
||||
</modules>
|
||||
</component>
|
||||
</project>
|
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="DBE_MODULE" version="4">
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$" />
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
@ -0,0 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
|
||||
</component>
|
||||
</project>
|
@ -1,20 +1,31 @@
|
||||
FROM golang:1.19-alpine as builder
|
||||
FROM golang:1.19-alpine as development
|
||||
|
||||
ARG SERVICE
|
||||
|
||||
ENV SERVICE=${SERVICE}
|
||||
|
||||
RUN go install github.com/go-delve/delve/cmd/dlv@latest
|
||||
|
||||
COPY . /go/src
|
||||
|
||||
WORKDIR /go/src/${SERVICE}
|
||||
|
||||
RUN go mod tidy && go build -v -o /opt/${SERVICE} /go/src/${SERVICE}
|
||||
RUN go mod tidy \
|
||||
&& CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o /opt/${SERVICE} /go/src/${SERVICE} \
|
||||
&& CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -gcflags="-N -l" -v -o /opt/${SERVICE}-debug /go/src/${SERVICE}
|
||||
# && go build -v -gcflags="all=-N -l" -o /opt/${SERVICE}-debug /go/src/${SERVICE}
|
||||
|
||||
ENTRYPOINT ['/go/bin/dlv']
|
||||
|
||||
CMD ['debug', '/go/src/${SERVICE}']
|
||||
|
||||
|
||||
FROM alpine
|
||||
FROM alpine as production
|
||||
|
||||
ARG SERVICE
|
||||
|
||||
ENV SERVICE=${SERVICE}
|
||||
|
||||
COPY --from=builder /opt/${SERVICE} /opt/${SERVICE}
|
||||
COPY --from=development /opt/${SERVICE} /opt/${SERVICE}
|
||||
|
||||
CMD /opt/${SERVICE}
|
||||
ENTRYPOINT ['/opt/${SERVICE}']
|
||||
|
@ -0,0 +1,9 @@
|
||||
FROM golang:1.19-alpine as builder
|
||||
|
||||
ARG SERVICE
|
||||
|
||||
COPY . /go/src
|
||||
|
||||
WORKDIR /go/src/${SERVICE}
|
||||
|
||||
RUN go mod tidy && go build -v -o /opt/${SERVICE} /go/src/${SERVICE}
|
@ -0,0 +1,16 @@
|
||||
package catalog
|
||||
|
||||
import "time"
|
||||
|
||||
type Auction struct {
|
||||
Title string `json:"title,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
SourceSiteURL string `json:"source_site_url,omitempty"`
|
||||
SourceSiteName string `json:"source_site_name,omitempty"`
|
||||
SourceURL string `json:"source_url,omitempty"`
|
||||
Country string `json:"country,omitempty"`
|
||||
Province string `json:"province,omitempty"`
|
||||
ItemCount int `json:"itemCount,omitempty"`
|
||||
Start time.Time `json:"start,omitempty"`
|
||||
End time.Time `json:"end,omitempty"`
|
||||
}
|
@ -0,0 +1,136 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var targetsImpls = map[string]UpcomingAuctionFinder{}
|
||||
|
||||
type UpcomingAuctionFinder interface {
|
||||
fmt.Stringer
|
||||
Find(ctx context.Context, limit int, results chan<- catalog.Auction) error
|
||||
}
|
||||
|
||||
func RegisterAuctionFinder(finder UpcomingAuctionFinder) {
|
||||
targetName := finder.String()
|
||||
if _, ok := targetsImpls[targetName]; ok {
|
||||
kernel.FatalErr(fmt.Errorf("target %q has already been registered", targetName))
|
||||
}
|
||||
|
||||
targetsImpls[targetName] = finder
|
||||
}
|
||||
|
||||
type (
|
||||
Domain struct {
|
||||
Storage
|
||||
CatalogService
|
||||
}
|
||||
|
||||
CompleteScrapeJobStatus struct {
|
||||
AuctionCount int
|
||||
Errors string
|
||||
}
|
||||
|
||||
Storage interface {
|
||||
CreateScrapeJob(context.Context, string) (ScrapeJob, error)
|
||||
CompleteScrapeJob(context.Context, int, CompleteScrapeJobStatus) (ScrapeJob, error)
|
||||
GetJobs(context.Context) ([]ScrapeJob, error)
|
||||
}
|
||||
|
||||
CatalogService interface {
|
||||
UpdateUpcomingAuction(catalog.Auction) error
|
||||
}
|
||||
)
|
||||
|
||||
type CatalogServiceStub struct{}
|
||||
|
||||
func (css *CatalogServiceStub) UpdateUpcomingAuction(a catalog.Auction) error {
|
||||
kernel.TraceLog.Printf("Invoke CatalogService[UpdateUpcomingAuction](%+v)", a)
|
||||
return nil
|
||||
}
|
||||
|
||||
type (
|
||||
FindNewUpcomingInput struct {
|
||||
TargetSite string
|
||||
}
|
||||
|
||||
FindNewUpcomingOutput struct {
|
||||
Job ScrapeJob
|
||||
}
|
||||
)
|
||||
|
||||
func (domain Domain) FindNewUpcoming(ctx context.Context, in FindNewUpcomingInput) (out FindNewUpcomingOutput, err error) {
|
||||
for k := range targetsImpls {
|
||||
kernel.TraceLog.Printf("Find Target: %q", k)
|
||||
}
|
||||
finder, ok := targetsImpls[in.TargetSite]
|
||||
if !ok {
|
||||
err = fmt.Errorf("could not find target matching name")
|
||||
return
|
||||
}
|
||||
|
||||
if out.Job, err = domain.Storage.CreateScrapeJob(ctx, in.TargetSite); err != nil {
|
||||
err = fmt.Errorf("could not create new scrape job record: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
kernel.InfoLog.Printf("Scrape Job %d starting", out.Job.ID)
|
||||
|
||||
found := make(chan catalog.Auction)
|
||||
errGroup, innerCtx := errgroup.WithContext(ctx)
|
||||
|
||||
errGroup.Go(func() error {
|
||||
return finder.Find(innerCtx, 0, found)
|
||||
})
|
||||
|
||||
count := 0
|
||||
for auction := range found {
|
||||
count++
|
||||
a := auction
|
||||
errGroup.Go(func() error {
|
||||
return domain.CatalogService.UpdateUpcomingAuction(a)
|
||||
})
|
||||
}
|
||||
|
||||
errMsg := ""
|
||||
if err = errGroup.Wait(); err != nil {
|
||||
err = fmt.Errorf("an issue occurred while finding upcoming items iteration: %w", err)
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
if out.Job, err = domain.Storage.CompleteScrapeJob(ctx, out.Job.ID, CompleteScrapeJobStatus{
|
||||
AuctionCount: count,
|
||||
Errors: errMsg,
|
||||
}); err != nil {
|
||||
err = fmt.Errorf("Could not complete scrape job, failing: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
kernel.InfoLog.Printf("Scrape Job %d completed in %v.", out.Job.ID, out.Job.Completed.Sub(out.Job.Started))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
type (
|
||||
GetJobsInput struct{}
|
||||
GetJobsOutput struct {
|
||||
Jobs []ScrapeJob
|
||||
}
|
||||
)
|
||||
|
||||
func (domain Domain) GetJobs(ctx context.Context, in GetJobsInput) (out GetJobsOutput, err error) {
|
||||
scrapeJobs, err := domain.Storage.GetJobs(ctx)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("could not fetch jobs from storage: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
out = GetJobsOutput{Jobs: scrapeJobs}
|
||||
|
||||
return
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package runner
|
||||
|
||||
import "time"
|
||||
|
||||
type ScrapeJob struct {
|
||||
ID int `json:"id,omitempty"`
|
||||
Started time.Time `json:"startedTs,omitempty"`
|
||||
Completed time.Time `json:"completedTs,omitempty"`
|
||||
TargetSite string `json:"target_site,omitempty"`
|
||||
AuctionsFound int `json:"auctions_found,omitempty"`
|
||||
Errors string `json:"errors,omitempty"`
|
||||
}
|
@ -0,0 +1,86 @@
|
||||
package kernel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type ServerBuilder func(grpc.ServiceRegistrar, string)
|
||||
|
||||
var (
|
||||
grpcServerInstance *grpc.Server
|
||||
httpServerInstance *http.Server
|
||||
listener net.Listener
|
||||
)
|
||||
|
||||
func StartGRPCServer(ctx context.Context, port int, sb ServerBuilder, opts ...grpc.ServerOption) (endpoint string, err error) {
|
||||
if grpcServerInstance != nil {
|
||||
err = errors.New("There can only be one GRPC server running at a time")
|
||||
return
|
||||
}
|
||||
|
||||
endpoint = fmt.Sprintf("0.0.0.0:%d", port)
|
||||
listener, err = net.Listen("tcp", endpoint)
|
||||
if err != nil {
|
||||
endpoint = ""
|
||||
err = fmt.Errorf("could not start tcp listener: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
grpcServerInstance = grpc.NewServer(opts...)
|
||||
|
||||
sb(grpcServerInstance, endpoint)
|
||||
|
||||
if err = grpcServerInstance.Serve(listener); err != nil {
|
||||
endpoint = ""
|
||||
err = fmt.Errorf("could not serve GRPC server over listener: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
InfoLog.Printf("Listening for GRPC requests on %s", endpoint)
|
||||
return
|
||||
}
|
||||
|
||||
func StopGRPCServer() error {
|
||||
grpcServerInstance.GracefulStop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func StartHTTPServer(ctx context.Context, port int, handler http.Handler) (err error) {
|
||||
if httpServerInstance != nil {
|
||||
err = errors.New("There can only be one HTTP server running at a time")
|
||||
return
|
||||
}
|
||||
|
||||
endpoint := fmt.Sprintf("0.0.0.0:%d", port)
|
||||
listener, err = net.Listen("tcp", endpoint)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("could not start tcp listener: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
httpServerInstance = &http.Server{
|
||||
Handler: handler,
|
||||
Addr: endpoint,
|
||||
ReadHeaderTimeout: time.Second * 1,
|
||||
}
|
||||
|
||||
if err = httpServerInstance.Serve(listener); err != nil {
|
||||
err = fmt.Errorf("could not serve http over listener: %w", err)
|
||||
}
|
||||
|
||||
InfoLog.Printf("Listening for HTTP requests on %s", endpoint)
|
||||
return
|
||||
}
|
||||
|
||||
func StopHTTPServer() error {
|
||||
ctx, canceler := context.WithDeadline(context.Background(), time.Now().Add(time.Second*5))
|
||||
defer canceler()
|
||||
return httpServerInstance.Shutdown(ctx)
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
module git.vdhsn.com/barretthousen/barretthousen/src/proxy-admin
|
||||
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
git.vdhsn.com/barretthousen/barretthousen/src/lib v1.0.0
|
||||
git.vdhsn.com/barretthousen/barretthousen/src/runner v1.0.0
|
||||
)
|
||||
|
||||
replace git.vdhsn.com/barretthousen/barretthousen/src/lib v1.0.0 => ../lib
|
||||
replace git.vdhsn.com/barretthousen/barretthousen/src/runner v1.0.0 => ../runner
|
@ -0,0 +1,59 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/runner/api"
|
||||
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
type ProxyAdminApp struct {
|
||||
LogLevel kernel.LogLevel `yaml:"log_level" env:"BH_LOG_LEVEL" env-default:"0" yaml-default:"0"`
|
||||
Port int `yaml:"port" env:"PROXY_ADMIN_PORT"`
|
||||
Endpoints struct {
|
||||
Runner string `yaml:"runner" env:"RUNNER_ENDPOINT"`
|
||||
} `yaml:"endpoints" env:"PROXY_ADMIN_SERVICES"`
|
||||
}
|
||||
|
||||
func (app *ProxyAdminApp) Start(ctx context.Context) error {
|
||||
grpcMux := runtime.NewServeMux()
|
||||
err := api.RegisterRunnerHandlerFromEndpoint(ctx, grpcMux, app.Endpoints.Runner, []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kernel.TraceLog.Printf("%+v", app)
|
||||
|
||||
httpServer := &http.Server{
|
||||
Addr: fmt.Sprintf("0.0.0.0:%d", app.Port),
|
||||
ReadHeaderTimeout: time.Second,
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
kernel.TraceLog.Printf("{ \"Client\": \"%s\", \"Path\":\"%s\"} ", r.RemoteAddr, r.URL.Path)
|
||||
grpcMux.ServeHTTP(w, r)
|
||||
}),
|
||||
}
|
||||
|
||||
kernel.InfoLog.Printf("Starting HTTP proxy @ %q", httpServer.Addr)
|
||||
|
||||
return httpServer.ListenAndServe()
|
||||
}
|
||||
|
||||
func (app *ProxyAdminApp) OnStop(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (app *ProxyAdminApp) GetLogLevel() kernel.LogLevel { return app.LogLevel }
|
||||
|
||||
func main() {
|
||||
kernel.Run(context.Background(), &ProxyAdminApp{
|
||||
LogLevel: kernel.LevelTrace,
|
||||
Port: 80,
|
||||
})
|
||||
}
|
@ -0,0 +1,3 @@
|
||||
log_level: ERROR
|
||||
|
||||
service: {}
|
@ -0,0 +1,33 @@
|
||||
-- name: GetJobs :many
|
||||
SELECT id,
|
||||
startedTs,
|
||||
completedTs,
|
||||
targetSiteName,
|
||||
auctionsFound,
|
||||
errors
|
||||
FROM runner.scrapejob;
|
||||
|
||||
-- name: GetJobByID :one
|
||||
SELECT id,
|
||||
startedTs,
|
||||
completedTs,
|
||||
targetSiteName,
|
||||
auctionsFound,
|
||||
errors
|
||||
FROM runner.scrapejob
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: CreateScrapeJob :one
|
||||
INSERT INTO runner.scrapejob (
|
||||
targetSiteName
|
||||
) VALUES (
|
||||
$1
|
||||
) RETURNING *;
|
||||
|
||||
|
||||
-- name: CompleteScrapeJob :exec
|
||||
UPDATE runner.scrapejob SET
|
||||
completedTs = $2,
|
||||
auctionsFound = $3,
|
||||
errors = $4
|
||||
WHERE id = $1;
|
@ -0,0 +1,31 @@
|
||||
START TRANSACTION;
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS runner;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS runner.scrapejob (
|
||||
id SERIAL PRIMARY KEY,
|
||||
startedTs TIMESTAMP NOT NULL DEFAULT NOW(),
|
||||
completedTs TIMESTAMP,
|
||||
targetSiteName VARCHAR(512) NOT NULL,
|
||||
auctionsFound INT NOT NULL DEFAULT 0,
|
||||
errors VARCHAR(5000) NOT NULL DEFAULT ''
|
||||
);
|
||||
|
||||
DO
|
||||
$do$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT FROM pg_catalog.pg_roles -- SELECT list can be empty for this
|
||||
WHERE rolname = 'runner-service') THEN
|
||||
|
||||
CREATE USER "runner-service" WITH PASSWORD 'runner-service';
|
||||
END IF;
|
||||
END
|
||||
$do$;
|
||||
|
||||
GRANT CONNECT ON DATABASE bh to "runner-service";
|
||||
GRANT USAGE ON SCHEMA runner TO "runner-service";
|
||||
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA runner TO "runner-service";
|
||||
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA runner TO "runner-service";
|
||||
|
||||
COMMIT;
|
@ -0,0 +1,82 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data/postgres"
|
||||
)
|
||||
|
||||
type PGRunnerStorage struct {
|
||||
*postgres.Queries
|
||||
}
|
||||
|
||||
func (db *PGRunnerStorage) CreateScrapeJob(ctx context.Context, target string) (sj runner.ScrapeJob, err error) {
|
||||
rsj, err := db.Queries.CreateScrapeJob(ctx, target)
|
||||
if err != nil {
|
||||
return runner.ScrapeJob{}, err
|
||||
}
|
||||
|
||||
return runner.ScrapeJob{
|
||||
ID: int(rsj.ID),
|
||||
Started: rsj.Startedts,
|
||||
TargetSite: rsj.Targetsitename,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (db *PGRunnerStorage) CompleteScrapeJob(ctx context.Context, ID int, status runner.CompleteScrapeJobStatus) (sj runner.ScrapeJob, err error) {
|
||||
completedTime := time.Now().UTC()
|
||||
if err = db.Queries.CompleteScrapeJob(ctx, postgres.CompleteScrapeJobParams{
|
||||
ID: int32(ID),
|
||||
Completedts: sql.NullTime{
|
||||
Time: completedTime,
|
||||
Valid: true,
|
||||
},
|
||||
Auctionsfound: int32(status.AuctionCount),
|
||||
Errors: status.Errors,
|
||||
}); err != nil {
|
||||
err = fmt.Errorf("could update scrape job in DB: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
var rsj postgres.RunnerScrapejob
|
||||
if rsj, err = db.Queries.GetJobByID(ctx, int32(ID)); err != nil {
|
||||
err = fmt.Errorf("could not get job by ID: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
sj = runner.ScrapeJob{
|
||||
ID: ID,
|
||||
Started: rsj.Startedts,
|
||||
Completed: completedTime,
|
||||
TargetSite: rsj.Targetsitename,
|
||||
AuctionsFound: status.AuctionCount,
|
||||
Errors: status.Errors,
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (db *PGRunnerStorage) GetJobs(ctx context.Context) (results []runner.ScrapeJob, err error) {
|
||||
var jobs []postgres.RunnerScrapejob
|
||||
if jobs, err = db.Queries.GetJobs(ctx); err != nil {
|
||||
err = fmt.Errorf("Couldn't get jobs from DB: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, j := range jobs {
|
||||
results = append(results, runner.ScrapeJob{
|
||||
ID: int(j.ID),
|
||||
Started: j.Startedts,
|
||||
Completed: j.Completedts.Time,
|
||||
TargetSite: j.Targetsitename,
|
||||
AuctionsFound: int(j.Auctionsfound),
|
||||
Errors: j.Errors,
|
||||
})
|
||||
}
|
||||
|
||||
return
|
||||
}
|
@ -0,0 +1,215 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
|
||||
)
|
||||
|
||||
func init() {
|
||||
kernel.TraceLog.Println("Registering AuctionFinder liveauctioneers")
|
||||
runner.RegisterAuctionFinder(
|
||||
LAAuctionFinder("liveauctioneers"),
|
||||
)
|
||||
}
|
||||
|
||||
type LAAuctionFinder string
|
||||
|
||||
func (la LAAuctionFinder) String() string {
|
||||
return string(la)
|
||||
}
|
||||
|
||||
func (la LAAuctionFinder) Find(ctx context.Context, limit int, results chan<- catalog.Auction) (err error) {
|
||||
defer close(results)
|
||||
|
||||
fetched := -1
|
||||
total := 0
|
||||
page := uint(0)
|
||||
|
||||
for fetched < limit {
|
||||
var ids LACatalogIDs
|
||||
if ids, total, err = LAGetUpcomingSaleIDs(ctx, GetUpcomingSaleIDsInput{
|
||||
Page: page,
|
||||
Limit: 128,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if limit <= 0 {
|
||||
limit = total
|
||||
}
|
||||
|
||||
var auctions []catalog.Auction
|
||||
if auctions, err = LAGetSaleInfo(ctx, ids); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fetched += len(auctions)
|
||||
for _, a := range auctions {
|
||||
results <- a
|
||||
}
|
||||
|
||||
if fetched >= limit {
|
||||
break
|
||||
}
|
||||
page++
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
type GetUpcomingSaleIDsInput struct {
|
||||
Page uint
|
||||
Limit uint
|
||||
}
|
||||
|
||||
func LAGetUpcomingSaleIDs(ctx context.Context, in GetUpcomingSaleIDsInput) (ids LACatalogIDs, total int, err error) {
|
||||
if in.Limit == 0 {
|
||||
in.Limit = 128
|
||||
}
|
||||
|
||||
req, _ := http.NewRequestWithContext(
|
||||
ctx,
|
||||
http.MethodGet,
|
||||
fmt.Sprintf(
|
||||
"https://search-party-prod.liveauctioneers.com/search/catalogsearch?c=20170802&client=web&client_version=5.0.0&excludedHouses=[]&max_facet_values=0&offset=%d&sort=saleStart&pageSize=%d",
|
||||
in.Page,
|
||||
in.Limit,
|
||||
),
|
||||
|
||||
nil)
|
||||
|
||||
req.Header.Set("User-Agent", "User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/112.0")
|
||||
req.Header.Set("Accept", "*/*")
|
||||
req.Header.Set("Accept-Encoding", "gzip")
|
||||
req.Header.Set("Cache-Control", "no-cache")
|
||||
|
||||
var res *http.Response
|
||||
if res, err = http.DefaultClient.Do(req); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if res.StatusCode > 299 {
|
||||
err = fmt.Errorf("Got a bad http status: %d", res.StatusCode)
|
||||
return
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
var searchResults struct {
|
||||
Payload struct {
|
||||
Count int `json:"count"`
|
||||
Results []struct {
|
||||
ID int `json:"catid"`
|
||||
} `json:"results"`
|
||||
} `json:"payload"`
|
||||
}
|
||||
|
||||
if err = json.NewDecoder(res.Body).Decode(&searchResults); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if searchResults.Payload.Results == nil {
|
||||
return
|
||||
}
|
||||
|
||||
total = searchResults.Payload.Count
|
||||
|
||||
ids = LACatalogIDs(make([]int, len(searchResults.Payload.Results)))
|
||||
for idx, result := range searchResults.Payload.Results {
|
||||
ids[idx] = result.ID
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
type LACatalogIDs []int
|
||||
|
||||
func (cip LACatalogIDs) String() string {
|
||||
sb := &strings.Builder{}
|
||||
sb.WriteString("{\"ids\":[")
|
||||
for idx, catID := range cip {
|
||||
fmt.Fprintf(sb, "%d", catID)
|
||||
if idx < len(cip)-1 {
|
||||
sb.WriteString(",")
|
||||
}
|
||||
}
|
||||
|
||||
sb.WriteString("]}")
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func LAGetSaleInfo(ctx context.Context, catIDs LACatalogIDs) (results []catalog.Auction, err error) {
|
||||
if catIDs == nil || len(catIDs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
req, _ := http.NewRequestWithContext(ctx, http.MethodPost,
|
||||
"https://item-api-prod.liveauctioneers.com/spa/small/catalogs",
|
||||
strings.NewReader(catIDs.String()))
|
||||
|
||||
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:97.0) Gecko/20100101 Firefox/97.0")
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
var res *http.Response
|
||||
if res, err = http.DefaultClient.Do(req); err != nil {
|
||||
err = fmt.Errorf("could not complete request: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
if res.StatusCode >= 300 {
|
||||
err = fmt.Errorf("got non 200 status: %d", res.StatusCode)
|
||||
return
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
|
||||
var apiResults struct {
|
||||
Data struct {
|
||||
Catalogs []struct {
|
||||
ID int `json:"catid"`
|
||||
Title string `json:"title"`
|
||||
Description string `json:"description"`
|
||||
ItemCount int `json:"lotsListed"`
|
||||
SaleStartTS int64 `json:"saleStartTs"`
|
||||
Address struct {
|
||||
CountryCode string `json:"country"`
|
||||
Lat float64 `json:"lat"`
|
||||
Long float64 `json:"lng"`
|
||||
State string `json:"state"`
|
||||
City string `json:"city"`
|
||||
} `json:"address"`
|
||||
} `json:"catalogs"`
|
||||
} `json:"data"`
|
||||
}
|
||||
|
||||
if err = json.NewDecoder(res.Body).Decode(&apiResults); err != nil {
|
||||
err = fmt.Errorf("could not parse response: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
results = make([]catalog.Auction, len(apiResults.Data.Catalogs))
|
||||
for idx, c := range apiResults.Data.Catalogs {
|
||||
results[idx] = catalog.Auction{
|
||||
Title: c.Title,
|
||||
Description: c.Description,
|
||||
SourceSiteURL: "https://www.liveauctioneers.com",
|
||||
SourceSiteName: "Live Auctioneers",
|
||||
SourceURL: fmt.Sprintf("https://www.liveauctioneers.com/catalog/%d", c.ID),
|
||||
Start: time.Unix(c.SaleStartTS, 0),
|
||||
End: time.Unix(c.SaleStartTS, 0).Add(time.Hour * 8),
|
||||
ItemCount: c.ItemCount,
|
||||
Country: c.Address.CountryCode,
|
||||
Province: c.Address.City,
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package domain
|
||||
|
||||
import "testing"
|
||||
|
||||
func Test_catIDPayload_String(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
cip LACatalogIDs
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "happy path",
|
||||
cip: LACatalogIDs{283257, 283882, 284163},
|
||||
want: "{\"ids\":[283257,283882,284163]}",
|
||||
},
|
||||
{
|
||||
name: "no items",
|
||||
cip: LACatalogIDs{},
|
||||
want: "{\"ids\":[]}",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := tt.cip.String(); got != tt.want {
|
||||
t.Errorf("catIDPayload.String() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/runner/api"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func NewRunnerServer(d *runner.Domain) func(grpcServer grpc.ServiceRegistrar, endpoint string) {
|
||||
return func(grpcServer grpc.ServiceRegistrar, endpoint string) {
|
||||
api.RegisterRunnerServer(grpcServer, &runnerHandler{domain: d})
|
||||
}
|
||||
}
|
||||
|
||||
type runnerHandler struct {
|
||||
api.UnimplementedRunnerServer
|
||||
domain *runner.Domain
|
||||
}
|
||||
|
||||
func (rh *runnerHandler) FindNewUpcoming(ctx context.Context, cmd *api.FindNewUpcomingCommand) (*api.JobResult, error) {
|
||||
out, err := rh.domain.FindNewUpcoming(ctx, runner.FindNewUpcomingInput{
|
||||
TargetSite: cmd.TargetSite,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "method FindNewUpcoming failed: %q", err.Error())
|
||||
}
|
||||
|
||||
return &api.JobResult{
|
||||
Id: int32(out.Job.ID),
|
||||
AuctionsFound: int32(out.Job.AuctionsFound),
|
||||
CreatedTs: timestamppb.New(out.Job.Started),
|
||||
CompletedTs: timestamppb.New(out.Job.Completed),
|
||||
TargetSiteName: out.Job.TargetSite,
|
||||
Errors: out.Job.Errors,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (rh *runnerHandler) GetJobs(ctx context.Context, cmd *api.GetJobsCommand) (*api.JobsResult, error) {
|
||||
out, err := rh.domain.GetJobs(ctx, runner.GetJobsInput{})
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "method GetJobs failed: %q", err.Error())
|
||||
}
|
||||
|
||||
result := &api.JobsResult{
|
||||
Jobs: []*api.JobResult{},
|
||||
}
|
||||
|
||||
for _, j := range out.Jobs {
|
||||
result.Jobs = append(result.Jobs, &api.JobResult{
|
||||
Id: int32(j.ID),
|
||||
AuctionsFound: int32(j.AuctionsFound),
|
||||
CreatedTs: timestamppb.New(j.Started),
|
||||
CompletedTs: timestamppb.New(j.Completed),
|
||||
TargetSiteName: j.TargetSite,
|
||||
Errors: j.Errors,
|
||||
})
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
Loading…
Reference in new issue