// File: barretthousen/src/runner/main.go
// (Repository viewer header: 126 lines, 3.4 KiB, Go.)

package main
import (
"context"
"embed"
"flag"
"fmt"
capi "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/data/postgres"
"git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/domain"
"github.com/jackc/pgx/v4/pgxpool"
"go.uber.org/dig"
"google.golang.org/grpc"
_ "git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/domain/liveauctioneers"
)
// runnerApp holds the runner service's settings. The yaml struct tags name
// the configuration key associated with each field.
type runnerApp struct {
	// LogLevel is the level reported by GetLogLevel.
	LogLevel kernel.LogLevel `yaml:"log_level" yaml-default:"0"`
	// Port is the port handed to kernel.StartGRPCServer in Start.
	Port int `yaml:"port" `
	// CatalogEndpoint is the address passed to kernel.DialGRPC to reach the
	// catalog service.
	CatalogEndpoint string `yaml:"catalog_endpoint"`
	// DB_Service is the Postgres connection configuration used to build the
	// service's connection pool.
	DB_Service kernel.PostgresConnection `yaml:"db_service"`
	// DB_Migrate is the Postgres connection configuration used when running
	// database migrations.
	DB_Migrate kernel.PostgresConnection `yaml:"db_migrate"`
}
var (
migrate = flag.Bool("migrate", false, "migrates postgres db")
client = flag.String("client", "", "Runs this service in client mode, to invoke sync job. Takes GRPC endpoint")
target = flag.String("target", "", "To be used with client mode. The target to sync")
//go:embed internal/data/postgres/migrations/*.sql
dbMigrateScript embed.FS
)
func main() {
flag.Parse()
kernel.Run(context.Background(), &runnerApp{
CatalogEndpoint: "catalog-local:5001",
LogLevel: kernel.LevelTrace,
Port: 5001,
})
}
// Start wires the runner's dependencies together and starts its gRPC server.
//
// Behaviour depends on the command-line flags:
//   - When -client is non-empty, Start only logs the requested endpoint and
//     target and returns nil; nothing else is started.
//   - When -migrate is set, the embedded SQL migrations are applied using
//     app.DB_Migrate before the service is wired up.
//
// Dependencies are registered in a dig container and resolved when the gRPC
// server is started via kernel.StartGRPCServer on app.Port.
//
// Every returned error is wrapped with the step that failed, so the
// underlying cause stays reachable through errors.Is / errors.As.
func (app *runnerApp) Start(ctx context.Context) error {
	if *client != "" {
		// Client mode currently only reports what was requested.
		kernel.InfoLog.Printf("Client mode: %s - target: %s", *client, *target)
		return nil
	}

	if *migrate {
		// NOTE(review): %v prints the whole connection config; confirm that
		// kernel.PostgresConnection does not expose credentials when formatted.
		kernel.InfoLog.Printf("running db migrations on %v", app.DB_Migrate)
		if err := kernel.MigrateDB(ctx, app.DB_Migrate, dbMigrateScript, "runner"); err != nil {
			return fmt.Errorf("could not execute db migration: %w", err)
		}
	}

	ioc := dig.New()

	// Constructors are registered in dependency order; the name is used only
	// to give registration errors some context.
	providers := []struct {
		name string
		ctor interface{}
	}{
		{"postgres config", func() kernel.PostgresConnection {
			return app.DB_Service
		}},
		{"postgres pool", func(pgCfg kernel.PostgresConnection) (*pgxpool.Pool, error) {
			return kernel.NewDBConnection(ctx, pgCfg)
		}},
		{"postgres queries", func(pgConn *pgxpool.Pool) *postgres.Queries {
			return postgres.New(pgConn)
		}},
		{"runner storage", func(queries *postgres.Queries) domain.Storage {
			return &data.PGRunnerStorage{Queries: queries}
		}},
		{"catalog grpc connection", func() (grpc.ClientConnInterface, error) {
			return kernel.DialGRPC(app.CatalogEndpoint)
		}},
		{"catalog service client", func(conn grpc.ClientConnInterface) domain.CatalogService {
			return capi.NewCatalogServiceClient(conn)
		}},
		{"domain", func(css domain.CatalogService, rs domain.Storage) *domain.Domain {
			return &domain.Domain{
				Storage:        rs,
				CatalogService: css,
			}
		}},
	}
	for _, p := range providers {
		if err := ioc.Provide(p.ctor); err != nil {
			return fmt.Errorf("registering %s provider: %w", p.name, err)
		}
	}

	// Invoke resolves the dependency graph, so a failure here may come from
	// any constructor above as well as from starting the server itself.
	err := ioc.Invoke(func(d *domain.Domain) error {
		_, startErr := kernel.StartGRPCServer(ctx, app.Port, internal.NewRunnerServer(d))
		return startErr
	})
	if err != nil {
		return fmt.Errorf("starting runner service: %w", err)
	}
	return nil
}
// OnStop is the shutdown hook of runnerApp. It currently performs no work.
func (app *runnerApp) OnStop(ctx context.Context) {}
// GetLogLevel reports the log level stored in the app's configuration.
func (app *runnerApp) GetLogLevel() kernel.LogLevel {
	return app.LogLevel
}