runner service finished
parent
8e33be359a
commit
30d2938bd6
|
|
@ -5,3 +5,4 @@ kustomize 5.0.1
|
|||
kubectl 1.26.3
|
||||
buf 1.17.0
|
||||
k9s 0.26.3
|
||||
golang 1.19
|
||||
|
|
|
|||
|
|
@ -1,2 +1,3 @@
|
|||
resources:
|
||||
- ./runner-deployment.yaml
|
||||
- ./proxy-admin-deployment.yaml
|
||||
|
|
|
|||
|
|
@ -0,0 +1,54 @@
|
|||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: proxy-admin
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
service: proxy-admin
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
service: proxy-admin
|
||||
spec:
|
||||
containers:
|
||||
- name: proxy-admin
|
||||
image: barretthousen/service-proxy-admin:latest
|
||||
ports:
|
||||
- containerPort: 80
|
||||
name: http
|
||||
command:
|
||||
- /opt/proxy-admin
|
||||
resources:
|
||||
limits:
|
||||
memory: "64Mi"
|
||||
cpu: "250m"
|
||||
volumeMounts:
|
||||
- name: proxy-admin-config
|
||||
mountPath: /config/
|
||||
volumes:
|
||||
- name: proxy-admin-config
|
||||
configMap:
|
||||
name: proxy-admin-config
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: proxy-admin
|
||||
spec:
|
||||
selector:
|
||||
service: proxy-admin
|
||||
ports:
|
||||
- port: 80
|
||||
targetPort: 80
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: proxy-admin-config
|
||||
data:
|
||||
config.yaml: |
|
||||
log_level: 2
|
||||
port: 80
|
||||
endpoints:
|
||||
runner: local-runner:5001
|
||||
|
|
@ -6,11 +6,11 @@ spec:
|
|||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
service: bh-runner
|
||||
service: runner
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
service: bh-runner
|
||||
service: runner
|
||||
spec:
|
||||
initContainers:
|
||||
- name: runner-db-migrate
|
||||
|
|
@ -29,6 +29,9 @@ spec:
|
|||
containers:
|
||||
- name: runner
|
||||
image: barretthousen/service-runner:latest
|
||||
ports:
|
||||
- containerPort: 5001
|
||||
name: grpc
|
||||
command:
|
||||
- /opt/runner
|
||||
resources:
|
||||
|
|
@ -42,6 +45,17 @@ spec:
|
|||
- name: runner-config
|
||||
configMap:
|
||||
name: runner-config
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: runner
|
||||
spec:
|
||||
selector:
|
||||
service: runner
|
||||
ports:
|
||||
- port: 5001
|
||||
targetPort: 5001
|
||||
|
||||
---
|
||||
apiVersion: v1
|
||||
|
|
@ -52,10 +66,17 @@ data:
|
|||
config.yaml: |
|
||||
log_level: 2
|
||||
port: 5001
|
||||
postgres:
|
||||
scheme: postgres
|
||||
port: 5432
|
||||
host: bh-db
|
||||
name: bh
|
||||
user: postgres
|
||||
password: bh-admin
|
||||
db_service:
|
||||
scheme: postgres
|
||||
port: 5432
|
||||
host: bh-db
|
||||
name: bh
|
||||
user: runner-service
|
||||
password: runner-service
|
||||
db_migrate:
|
||||
scheme: postgres
|
||||
port: 5432
|
||||
host: bh-db
|
||||
name: bh
|
||||
user: postgres
|
||||
password: bh-admin
|
||||
|
|
|
|||
|
|
@ -0,0 +1,23 @@
|
|||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: runner
|
||||
spec:
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: runner
|
||||
command:
|
||||
- /go/bin/dlv
|
||||
args:
|
||||
- --headless
|
||||
- --listen=0.0.0.0:2345
|
||||
- --api-version=2
|
||||
- --log
|
||||
- --accept-multiclient
|
||||
#- --log-output=rpc,dap
|
||||
- exec
|
||||
- /opt/runner-debug
|
||||
- --continue
|
||||
ports:
|
||||
- containerPort: 2345
|
||||
|
|
@ -5,3 +5,6 @@ commonLabels:
|
|||
environment: local
|
||||
|
||||
namePrefix: local-
|
||||
|
||||
patchesStrategicMerge:
|
||||
- debug-runner.yaml
|
||||
|
|
|
|||
|
|
@ -0,0 +1,6 @@
|
|||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="DataSourceManagerImpl" format="xml" multifile-model="true">
|
||||
<data-source source="LOCAL" name="bh-db@localhost" uuid="05aba6c0-168c-47e3-b807-951316a3a483">
|
||||
<driver-ref>postgresql</driver-ref>
|
||||
<synchronize>true</synchronize>
|
||||
<jdbc-driver>org.postgresql.Driver</jdbc-driver>
|
||||
<jdbc-url>jdbc:postgresql://localhost:5432/postgres</jdbc-url>
|
||||
<working-dir>$ProjectFileDir$</working-dir>
|
||||
</data-source>
|
||||
</component>
|
||||
</project>
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectModuleManager">
|
||||
<modules>
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/src.iml" filepath="$PROJECT_DIR$/.idea/src.iml" />
|
||||
</modules>
|
||||
</component>
|
||||
</project>
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="DBE_MODULE" version="4">
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$" />
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
|
||||
</component>
|
||||
</project>
|
||||
|
|
@ -1,20 +1,31 @@
|
|||
FROM golang:1.19-alpine as builder
|
||||
|
||||
ARG SERVICE
|
||||
|
||||
COPY . /go/src
|
||||
|
||||
WORKDIR /go/src/${SERVICE}
|
||||
|
||||
RUN go mod tidy && go build -v -o /opt/${SERVICE} /go/src/${SERVICE}
|
||||
|
||||
|
||||
FROM alpine
|
||||
FROM golang:1.19-alpine as development
|
||||
|
||||
ARG SERVICE
|
||||
|
||||
ENV SERVICE=${SERVICE}
|
||||
|
||||
COPY --from=builder /opt/${SERVICE} /opt/${SERVICE}
|
||||
RUN go install github.com/go-delve/delve/cmd/dlv@latest
|
||||
|
||||
COPY . /go/src
|
||||
|
||||
WORKDIR /go/src/${SERVICE}
|
||||
|
||||
RUN go mod tidy \
|
||||
&& CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o /opt/${SERVICE} /go/src/${SERVICE} \
|
||||
&& CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -gcflags="-N -l" -v -o /opt/${SERVICE}-debug /go/src/${SERVICE}
|
||||
# && go build -v -gcflags="all=-N -l" -o /opt/${SERVICE}-debug /go/src/${SERVICE}
|
||||
|
||||
ENTRYPOINT ['/go/bin/dlv']
|
||||
|
||||
CMD ['debug', '/go/src/${SERVICE}']
|
||||
|
||||
|
||||
FROM alpine as production
|
||||
|
||||
ARG SERVICE
|
||||
|
||||
ENV SERVICE=${SERVICE}
|
||||
|
||||
COPY --from=development /opt/${SERVICE} /opt/${SERVICE}
|
||||
|
||||
ENTRYPOINT ['/opt/${SERVICE}']
|
||||
|
|
|
|||
|
|
@ -0,0 +1,9 @@
|
|||
FROM golang:1.19-alpine as builder
|
||||
|
||||
ARG SERVICE
|
||||
|
||||
COPY . /go/src
|
||||
|
||||
WORKDIR /go/src/${SERVICE}
|
||||
|
||||
RUN go mod tidy && go build -v -o /opt/${SERVICE} /go/src/${SERVICE}
|
||||
16
src/Tiltfile
16
src/Tiltfile
|
|
@ -38,19 +38,20 @@ helm_resource(
|
|||
|
||||
|
||||
|
||||
def bh_service(service="", port_forwards=[], labels=['2-services']):
|
||||
def bh_service(service="", port_forwards=[], devMode=True, labels=['2-services']):
|
||||
docker_build(
|
||||
ref="barretthousen/service-{}".format(service),
|
||||
dockerfile="./Dockerfile.service",
|
||||
context=".",
|
||||
target="development" if devMode else "production",
|
||||
build_args={
|
||||
"SERVICE": service
|
||||
},
|
||||
only=[
|
||||
"{}".format(service),
|
||||
"lib",
|
||||
"Dockerfile.service"
|
||||
]
|
||||
# only=[
|
||||
# "./{}".format(service),
|
||||
# "lib",
|
||||
# "Dockerfile.service"
|
||||
# ]
|
||||
)
|
||||
|
||||
k8s_resource(
|
||||
|
|
@ -62,7 +63,8 @@ def bh_service(service="", port_forwards=[], labels=['2-services']):
|
|||
|
||||
|
||||
|
||||
bh_service(service="runner", port_forwards=[5001])
|
||||
bh_service(service="runner", port_forwards=[5001, 2345])
|
||||
bh_service(service="proxy-admin", port_forwards=["8082:80"])
|
||||
|
||||
k8s_yaml(
|
||||
kustomize("../env/local")
|
||||
|
|
|
|||
|
|
@ -4,8 +4,8 @@ deps:
|
|||
- remote: buf.build
|
||||
owner: googleapis
|
||||
repository: googleapis
|
||||
commit: 5ae7f88519b04fe1965da0f8a375a088
|
||||
digest: shake256:27d9fcdc0e3eb957449dc3d17e2d24c7ce59c3c483ecf128818183c336dfd28595ecd13771fb3172247775caf7707c4076dd8e70c5ac2cbcac170df35e4d0028
|
||||
commit: cc916c31859748a68fd229a3c8d7a2e8
|
||||
digest: shake256:469b049d0eb04203d5272062636c078decefc96fec69739159c25d85349c50c34c7706918a8b216c5c27f76939df48452148cff8c5c3ae77fa6ba5c25c1b8bf8
|
||||
- remote: buf.build
|
||||
owner: grpc-ecosystem
|
||||
repository: grpc-gateway
|
||||
|
|
|
|||
|
|
@ -65,6 +65,9 @@ type (
|
|||
)
|
||||
|
||||
func (domain Domain) FindNewUpcoming(ctx context.Context, in FindNewUpcomingInput) (out FindNewUpcomingOutput, err error) {
|
||||
for k := range targetsImpls {
|
||||
kernel.TraceLog.Printf("Find Target: %q", k)
|
||||
}
|
||||
finder, ok := targetsImpls[in.TargetSite]
|
||||
if !ok {
|
||||
err = fmt.Errorf("could not find target matching name")
|
||||
|
|
@ -76,9 +79,9 @@ func (domain Domain) FindNewUpcoming(ctx context.Context, in FindNewUpcomingInpu
|
|||
return
|
||||
}
|
||||
|
||||
kernel.InfoLog.Printf("Scrape Job %q starting", out.Job.ID)
|
||||
kernel.InfoLog.Printf("Scrape Job %d starting", out.Job.ID)
|
||||
|
||||
found := make(chan catalog.Auction, 64)
|
||||
found := make(chan catalog.Auction)
|
||||
errGroup, innerCtx := errgroup.WithContext(ctx)
|
||||
|
||||
errGroup.Go(func() error {
|
||||
|
|
@ -86,31 +89,29 @@ func (domain Domain) FindNewUpcoming(ctx context.Context, in FindNewUpcomingInpu
|
|||
})
|
||||
|
||||
count := 0
|
||||
for {
|
||||
auction, ok := <-found
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
for auction := range found {
|
||||
count++
|
||||
a := auction
|
||||
errGroup.Go(func() error {
|
||||
return domain.CatalogService.UpdateUpcomingAuction(auction)
|
||||
return domain.CatalogService.UpdateUpcomingAuction(a)
|
||||
})
|
||||
}
|
||||
|
||||
errMsg := ""
|
||||
if err = errGroup.Wait(); err != nil {
|
||||
err = fmt.Errorf("an issue occurred while finding upcoming items iteration: %w", err)
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
if out.Job, err = domain.Storage.CompleteScrapeJob(ctx, out.Job.ID, CompleteScrapeJobStatus{
|
||||
AuctionCount: count,
|
||||
Errors: err.Error(),
|
||||
Errors: errMsg,
|
||||
}); err != nil {
|
||||
err = fmt.Errorf("Could not complete scrape job, failing: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
kernel.InfoLog.Printf("Scrape Job %q completed in %v.", out.Job.ID, out.Job.Completed.Sub(out.Job.Started))
|
||||
kernel.InfoLog.Printf("Scrape Job %d completed in %v.", out.Job.ID, out.Job.Completed.Sub(out.Job.Started))
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,13 +42,13 @@ func Run(parent context.Context, app App) {
|
|||
TraceLog.Println("[SHUTDOWN TRIGGERED] context exited unexpectedly")
|
||||
}
|
||||
|
||||
InfoLog.Println("Shutting down service")
|
||||
InfoLog.Println("Shutting down service ⛔⚠️😱")
|
||||
stopCtx, stopCanceller := context.WithTimeout(parent, time.Second*5)
|
||||
defer stopCanceller()
|
||||
app.OnStop(stopCtx)
|
||||
}()
|
||||
|
||||
InfoLog.Println("Starting service")
|
||||
InfoLog.Println("Starting service 🚀")
|
||||
|
||||
if err := loadConfig(app); err != nil {
|
||||
ErrorLog.Println(err)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
|
@ -14,6 +15,7 @@ type ServerBuilder func(grpc.ServiceRegistrar, string)
|
|||
|
||||
var (
|
||||
grpcServerInstance *grpc.Server
|
||||
httpServerInstance *http.Server
|
||||
listener net.Listener
|
||||
)
|
||||
|
||||
|
|
@ -49,3 +51,36 @@ func StopGRPCServer() error {
|
|||
grpcServerInstance.GracefulStop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func StartHTTPServer(ctx context.Context, port int, handler http.Handler) (err error) {
|
||||
if httpServerInstance != nil {
|
||||
err = errors.New("There can only be one HTTP server running at a time")
|
||||
return
|
||||
}
|
||||
|
||||
endpoint := fmt.Sprintf("0.0.0.0:%d", port)
|
||||
listener, err = net.Listen("tcp", endpoint)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("could not start tcp listener: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
httpServerInstance = &http.Server{
|
||||
Handler: handler,
|
||||
Addr: endpoint,
|
||||
ReadHeaderTimeout: time.Second * 1,
|
||||
}
|
||||
|
||||
if err = httpServerInstance.Serve(listener); err != nil {
|
||||
err = fmt.Errorf("could not serve http over listener: %w", err)
|
||||
}
|
||||
|
||||
InfoLog.Printf("Listening for HTTP requests on %s", endpoint)
|
||||
return
|
||||
}
|
||||
|
||||
func StopHTTPServer() error {
|
||||
ctx, canceler := context.WithDeadline(context.Background(), time.Now().Add(time.Second*5))
|
||||
defer canceler()
|
||||
return httpServerInstance.Shutdown(ctx)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,11 +62,13 @@ type Logger interface {
|
|||
|
||||
func SetLogLevel(ll LogLevel) {
|
||||
for i := 0; i < len(loggers); i++ {
|
||||
lg := loggers[i]
|
||||
if i > int(ll) {
|
||||
lg.SetOutput(writer)
|
||||
} else {
|
||||
lg.SetOutput(os.Stdout)
|
||||
target := writer
|
||||
|
||||
if int(ll) >= i {
|
||||
target = os.Stdout
|
||||
}
|
||||
|
||||
loggers[i].SetOutput(target)
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ func (pc PostgresConnection) String() string {
|
|||
}
|
||||
|
||||
func NewDBConnection(ctx context.Context, pg PostgresConnection) (conn *pgx.Conn, err error) {
|
||||
for retries := 0; retries < 3; retries++ {
|
||||
for retries := 0; retries < 5; retries++ {
|
||||
if conn, err = pgx.Connect(ctx, pg.String()); err != nil {
|
||||
sleepTime := time.Second * time.Duration(retries)
|
||||
log.Printf("%d attempt(s) to postgres failed, retrying in %v: %v", retries+1, sleepTime, err)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
module git.vdhsn.com/barretthousen/barretthousen/src/proxy-admin
|
||||
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
git.vdhsn.com/barretthousen/barretthousen/src/lib v1.0.0
|
||||
git.vdhsn.com/barretthousen/barretthousen/src/runner v1.0.0
|
||||
)
|
||||
|
||||
replace git.vdhsn.com/barretthousen/barretthousen/src/lib v1.0.0 => ../lib
|
||||
replace git.vdhsn.com/barretthousen/barretthousen/src/runner v1.0.0 => ../runner
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/runner/api"
|
||||
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
type ProxyAdminApp struct {
|
||||
LogLevel kernel.LogLevel `yaml:"log_level" env:"BH_LOG_LEVEL" env-default:"0" yaml-default:"0"`
|
||||
Port int `yaml:"port" env:"PROXY_ADMIN_PORT"`
|
||||
Endpoints struct {
|
||||
Runner string `yaml:"runner" env:"RUNNER_ENDPOINT"`
|
||||
} `yaml:"endpoints" env:"PROXY_ADMIN_SERVICES"`
|
||||
}
|
||||
|
||||
func (app *ProxyAdminApp) Start(ctx context.Context) error {
|
||||
grpcMux := runtime.NewServeMux()
|
||||
err := api.RegisterRunnerHandlerFromEndpoint(ctx, grpcMux, app.Endpoints.Runner, []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kernel.TraceLog.Printf("%+v", app)
|
||||
|
||||
httpServer := &http.Server{
|
||||
Addr: fmt.Sprintf("0.0.0.0:%d", app.Port),
|
||||
ReadHeaderTimeout: time.Second,
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
kernel.TraceLog.Printf("{ \"Client\": \"%s\", \"Path\":\"%s\"} ", r.RemoteAddr, r.URL.Path)
|
||||
grpcMux.ServeHTTP(w, r)
|
||||
}),
|
||||
}
|
||||
|
||||
kernel.InfoLog.Printf("Starting HTTP proxy @ %q", httpServer.Addr)
|
||||
|
||||
return httpServer.ListenAndServe()
|
||||
}
|
||||
|
||||
func (app *ProxyAdminApp) OnStop(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (app *ProxyAdminApp) GetLogLevel() kernel.LogLevel { return app.LogLevel }
|
||||
|
||||
func main() {
|
||||
kernel.Run(context.Background(), &ProxyAdminApp{
|
||||
LogLevel: kernel.LevelTrace,
|
||||
Port: 80,
|
||||
})
|
||||
}
|
||||
|
|
@ -7,6 +7,16 @@ SELECT id,
|
|||
errors
|
||||
FROM runner.scrapejob;
|
||||
|
||||
-- name: GetJobByID :one
|
||||
SELECT id,
|
||||
startedTs,
|
||||
completedTs,
|
||||
targetSiteName,
|
||||
auctionsFound,
|
||||
errors
|
||||
FROM runner.scrapejob
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: CreateScrapeJob :one
|
||||
INSERT INTO runner.scrapejob (
|
||||
targetSiteName
|
||||
|
|
@ -14,3 +24,10 @@ INSERT INTO runner.scrapejob (
|
|||
$1
|
||||
) RETURNING *;
|
||||
|
||||
|
||||
-- name: CompleteScrapeJob :exec
|
||||
UPDATE runner.scrapejob SET
|
||||
completedTs = $2,
|
||||
auctionsFound = $3,
|
||||
errors = $4
|
||||
WHERE id = $1;
|
||||
|
|
|
|||
|
|
@ -1,8 +1,5 @@
|
|||
START TRANSACTION;
|
||||
|
||||
DROP ROLE IF EXISTS "runner-service";
|
||||
CREATE USER "runner-service" WITH PASSWORD 'runner-service';
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS runner;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS runner.scrapejob (
|
||||
|
|
@ -11,7 +8,24 @@ CREATE TABLE IF NOT EXISTS runner.scrapejob (
|
|||
completedTs TIMESTAMP,
|
||||
targetSiteName VARCHAR(512) NOT NULL,
|
||||
auctionsFound INT NOT NULL DEFAULT 0,
|
||||
errors VARCHAR(5000) NOT NULL
|
||||
errors VARCHAR(5000) NOT NULL DEFAULT ''
|
||||
);
|
||||
|
||||
DO
|
||||
$do$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT FROM pg_catalog.pg_roles -- SELECT list can be empty for this
|
||||
WHERE rolname = 'runner-service') THEN
|
||||
|
||||
CREATE USER "runner-service" WITH PASSWORD 'runner-service';
|
||||
END IF;
|
||||
END
|
||||
$do$;
|
||||
|
||||
GRANT CONNECT ON DATABASE bh to "runner-service";
|
||||
GRANT USAGE ON SCHEMA runner TO "runner-service";
|
||||
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA runner TO "runner-service";
|
||||
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA runner TO "runner-service";
|
||||
|
||||
COMMIT;
|
||||
|
|
|
|||
|
|
@ -2,6 +2,9 @@ package data
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data/postgres"
|
||||
|
|
@ -25,9 +28,55 @@ func (db *PGRunnerStorage) CreateScrapeJob(ctx context.Context, target string) (
|
|||
}
|
||||
|
||||
func (db *PGRunnerStorage) CompleteScrapeJob(ctx context.Context, ID int, status runner.CompleteScrapeJobStatus) (sj runner.ScrapeJob, err error) {
|
||||
completedTime := time.Now().UTC()
|
||||
if err = db.Queries.CompleteScrapeJob(ctx, postgres.CompleteScrapeJobParams{
|
||||
ID: int32(ID),
|
||||
Completedts: sql.NullTime{
|
||||
Time: completedTime,
|
||||
Valid: true,
|
||||
},
|
||||
Auctionsfound: int32(status.AuctionCount),
|
||||
Errors: status.Errors,
|
||||
}); err != nil {
|
||||
err = fmt.Errorf("could update scrape job in DB: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
var rsj postgres.RunnerScrapejob
|
||||
if rsj, err = db.Queries.GetJobByID(ctx, int32(ID)); err != nil {
|
||||
err = fmt.Errorf("could not get job by ID: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
sj = runner.ScrapeJob{
|
||||
ID: ID,
|
||||
Started: rsj.Startedts,
|
||||
Completed: completedTime,
|
||||
TargetSite: rsj.Targetsitename,
|
||||
AuctionsFound: status.AuctionCount,
|
||||
Errors: status.Errors,
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (db *PGRunnerStorage) GetJobs(ctx context.Context) (results []runner.ScrapeJob, err error) {
|
||||
var jobs []postgres.RunnerScrapejob
|
||||
if jobs, err = db.Queries.GetJobs(ctx); err != nil {
|
||||
err = fmt.Errorf("Couldn't get jobs from DB: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, j := range jobs {
|
||||
results = append(results, runner.ScrapeJob{
|
||||
ID: int(j.ID),
|
||||
Started: j.Startedts,
|
||||
Completed: j.Completedts.Time,
|
||||
TargetSite: j.Targetsitename,
|
||||
AuctionsFound: int(j.Auctionsfound),
|
||||
Errors: j.Errors,
|
||||
})
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,9 +10,11 @@ import (
|
|||
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/catalog"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/domain/runner"
|
||||
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
|
||||
)
|
||||
|
||||
func init() {
|
||||
kernel.TraceLog.Println("Registering AuctionFinder liveauctioneers")
|
||||
runner.RegisterAuctionFinder(
|
||||
LAAuctionFinder("liveauctioneers"),
|
||||
)
|
||||
|
|
@ -27,15 +29,14 @@ func (la LAAuctionFinder) String() string {
|
|||
func (la LAAuctionFinder) Find(ctx context.Context, limit int, results chan<- catalog.Auction) (err error) {
|
||||
defer close(results)
|
||||
|
||||
var fetched, total int
|
||||
if limit == 0 {
|
||||
limit = -1
|
||||
}
|
||||
fetched := -1
|
||||
total := 0
|
||||
page := uint(0)
|
||||
|
||||
for fetched < limit {
|
||||
var ids LACatalogIDs
|
||||
if ids, total, err = LAGetUpcomingSaleIDs(ctx, GetUpcomingSaleIDsInput{
|
||||
Page: 0,
|
||||
Page: page,
|
||||
Limit: 128,
|
||||
}); err != nil {
|
||||
return err
|
||||
|
|
@ -54,6 +55,11 @@ func (la LAAuctionFinder) Find(ctx context.Context, limit int, results chan<- ca
|
|||
for _, a := range auctions {
|
||||
results <- a
|
||||
}
|
||||
|
||||
if fetched >= limit {
|
||||
break
|
||||
}
|
||||
page++
|
||||
}
|
||||
|
||||
return
|
||||
|
|
@ -158,7 +164,7 @@ func LAGetSaleInfo(ctx context.Context, catIDs LACatalogIDs) (results []catalog.
|
|||
return
|
||||
}
|
||||
|
||||
if res.StatusCode < 300 {
|
||||
if res.StatusCode >= 300 {
|
||||
err = fmt.Errorf("got non 200 status: %d", res.StatusCode)
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func NewRunnerServer(d *runner.Domain) func(grpcServer grpc.ServiceRegistrar, endpoint string) {
|
||||
|
|
@ -22,5 +23,43 @@ type runnerHandler struct {
|
|||
}
|
||||
|
||||
func (rh *runnerHandler) FindNewUpcoming(ctx context.Context, cmd *api.FindNewUpcomingCommand) (*api.JobResult, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method FindNewUpcoming not implemented")
|
||||
out, err := rh.domain.FindNewUpcoming(ctx, runner.FindNewUpcomingInput{
|
||||
TargetSite: cmd.TargetSite,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "method FindNewUpcoming failed: %q", err.Error())
|
||||
}
|
||||
|
||||
return &api.JobResult{
|
||||
Id: int32(out.Job.ID),
|
||||
AuctionsFound: int32(out.Job.AuctionsFound),
|
||||
CreatedTs: timestamppb.New(out.Job.Started),
|
||||
CompletedTs: timestamppb.New(out.Job.Completed),
|
||||
TargetSiteName: out.Job.TargetSite,
|
||||
Errors: out.Job.Errors,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (rh *runnerHandler) GetJobs(ctx context.Context, cmd *api.GetJobsCommand) (*api.JobsResult, error) {
|
||||
out, err := rh.domain.GetJobs(ctx, runner.GetJobsInput{})
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "method GetJobs failed: %q", err.Error())
|
||||
}
|
||||
|
||||
result := &api.JobsResult{
|
||||
Jobs: []*api.JobResult{},
|
||||
}
|
||||
|
||||
for _, j := range out.Jobs {
|
||||
result.Jobs = append(result.Jobs, &api.JobResult{
|
||||
Id: int32(j.ID),
|
||||
AuctionsFound: int32(j.AuctionsFound),
|
||||
CreatedTs: timestamppb.New(j.Started),
|
||||
CompletedTs: timestamppb.New(j.Completed),
|
||||
TargetSiteName: j.TargetSite,
|
||||
Errors: j.Errors,
|
||||
})
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,9 +20,10 @@ import (
|
|||
|
||||
type (
|
||||
RunnerApp struct {
|
||||
LogLevel kernel.LogLevel `yaml:"log_level" env:"BH_LOG_LEVEL" env-default:"0" yaml-default:"0"`
|
||||
Port int `yaml:"port" env:"RUNNER_PORT"`
|
||||
Postgres kernel.PostgresConnection `yaml:"postgres" env:"RUNNER_DB"`
|
||||
LogLevel kernel.LogLevel `yaml:"log_level" env:"BH_LOG_LEVEL" env-default:"0" yaml-default:"0"`
|
||||
Port int `yaml:"port" env:"RUNNER_PORT"`
|
||||
DB_Service kernel.PostgresConnection `yaml:"db_service" env:"RUNNER_DB_SERVICE"`
|
||||
DB_Migrate kernel.PostgresConnection `yaml:"db_migrate" env:"RUNNER_DB_MIGRATE"`
|
||||
}
|
||||
)
|
||||
|
||||
|
|
@ -42,10 +43,10 @@ func main() {
|
|||
|
||||
func (app *RunnerApp) Start(ctx context.Context) error {
|
||||
if *migrate {
|
||||
kernel.InfoLog.Printf("running db migrations on %v", app.Postgres)
|
||||
kernel.InfoLog.Printf("running db migrations on %v", app.DB_Migrate)
|
||||
|
||||
kernel.InfoLog.Printf("MIGRATION SCRIPT:\n%q", dbMigrateScript)
|
||||
if err := kernel.Migrate(ctx, app.Postgres, dbMigrateScript); err != nil {
|
||||
if err := kernel.Migrate(ctx, app.DB_Migrate, dbMigrateScript); err != nil {
|
||||
return fmt.Errorf("could not execute db migration: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -55,7 +56,7 @@ func (app *RunnerApp) Start(ctx context.Context) error {
|
|||
ioc := dig.New()
|
||||
var err error
|
||||
if err = ioc.Provide(func() kernel.PostgresConnection {
|
||||
return app.Postgres
|
||||
return app.DB_Service
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue