refactor grpc dial into kernel, trying to figure out runner import slowness

pull/1/head
Adam Veldhousen 1 year ago
parent d09e5aaf53
commit c4cebcea6d
Signed by: adam
GPG Key ID: 6DB29003C6DD1E4B

@ -101,10 +101,14 @@ def bh_backend_service(service="", port_forwards=[], migrateDB=False, devMode=Tr
bh_backend_service(service="catalog", migrateDB=True, port_forwards=["5002:5001", "2346:2345"])
bh_backend_service(service="runner", migrateDB=True, port_forwards=[5001, 2345])
bh_backend_service(service="runner", migrateDB=True, port_forwards=[
port_forward(2345, name='Delve port')
])
bh_backend_service(service="catalog", migrateDB=True, port_forwards=[
port_forward(2346, 2345, name='Delve port')
])
# bh_service(service="proxy-web", port_forwards=["8081:80"], deps=['ingress', 'local-catalog'])
bh_backend_service(service="proxy-admin", port_forwards=[
port_forward(8082, 80, name="HTTP API @ localhost:8082")
], deps=['ingress'])

@ -0,0 +1,18 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: admin
labels:
name: admin
spec:
rules:
- host: admin.barretthousen.com
http:
paths:
- pathType: Prefix
path: "/api"
backend:
service:
name: proxy-admin
port:
number: 80

@ -5,3 +5,4 @@ resources:
- ./proxy-client-deployment.yaml
- ./web-client-deployment.yaml
- ./client-ingress.yaml
- ./admin-ingress.yaml

@ -42,25 +42,6 @@ spec:
- port: 80
targetPort: 80
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: proxy-admin
labels:
name: proxy-admin
spec:
rules:
- host: admin.barretthousen.com
http:
paths:
- pathType: Prefix
path: "/api"
backend:
service:
name: proxy-admin
port:
number: 80
---
apiVersion: v1
kind: ConfigMap
metadata:

@ -13,7 +13,7 @@ patchesStrategicMerge:
patches:
- target:
kind: Ingress
name: proxy-admin
name: admin
patch: |-
- op: replace
path: /spec/rules/0/host

@ -10,6 +10,7 @@ import (
"github.com/ilyakaznacheev/cleanenv"
_ "go.uber.org/automaxprocs"
"golang.org/x/sync/errgroup"
// "go.uber.org/dig"
)
@ -45,7 +46,18 @@ func Run(parent context.Context, app App) {
InfoLog.Println("Shutting down service ⛔⚠️😱")
stopCtx, stopCanceller := context.WithTimeout(parent, time.Second*5)
defer stopCanceller()
app.OnStop(stopCtx)
errs := &errgroup.Group{}
errs.Go(CloseGRPCConns)
errs.Go(StopHTTPServer)
errs.Go(StopGRPCServer)
if err := errs.Wait(); err != nil {
ErrorLog.Printf("There was an error shutting down the application gracefully: %v", err)
}
}()
InfoLog.Println("Starting service 🚀")

@ -6,9 +6,12 @@ import (
"fmt"
"net"
"net/http"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
)
type ServerBuilder func(grpc.ServiceRegistrar, string)
@ -84,3 +87,57 @@ func StopHTTPServer() error {
defer canceler()
return httpServerInstance.Shutdown(ctx)
}
type connMan struct {
sync.Mutex
openConns map[string]*grpc.ClientConn
}
var grpcConns = &connMan{
openConns: map[string]*grpc.ClientConn{},
}
type DialConfig struct {
InsecureTransport bool
BackoffMaxDelay time.Duration
BackoffMinTimeout time.Duration
}
func DialGRPC(endpoint string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
grpcConns.Lock()
defer grpcConns.Unlock()
var ok bool
if conn, ok = grpcConns.openConns[endpoint]; ok {
conn.ResetConnectBackoff()
return
}
conn, err = grpc.Dial(endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
MaxDelay: time.Second * 3,
},
MinConnectTimeout: time.Second,
}))
if err != nil {
err = fmt.Errorf("Could not dial %s: %w", endpoint, err)
return
}
grpcConns.openConns[endpoint] = conn
return
}
func CloseGRPCConns() error {
grpcConns.Lock()
defer grpcConns.Unlock()
for k, v := range grpcConns.openConns {
TraceLog.Printf("closing grpc connection: %s", k)
v.Close()
defer delete(grpcConns.openConns, k)
}
return nil
}

@ -95,10 +95,13 @@ func (domain Domain) Status(ctx context.Context, in GetJobsInput) (out GetJobsOu
// TODO: tests
func (domain *Domain) executeScrapeJob(finder UpcomingAuctionFinder, jobID int) {
ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(time.Minute))
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
found := make(chan catalog.Auction, 2048)
errGroup, innerCtx := errgroup.WithContext(ctx)
deadlineCtx, deadlineCancel := context.WithDeadline(ctx, time.Now().Add(time.Minute))
defer deadlineCancel()
errGroup, innerCtx := errgroup.WithContext(deadlineCtx)
errGroup.Go(func() error {
return finder.Find(innerCtx, 0, found)
@ -136,7 +139,7 @@ func (domain *Domain) executeScrapeJob(finder UpcomingAuctionFinder, jobID int)
AuctionCount: count,
Errors: errs.String(),
}); err != nil {
kernel.ErrorLog.Printf("Could not complete scrape job, failing: %w", err)
kernel.ErrorLog.Printf("Could not complete scrape job, failing: %v", err)
}
kernel.InfoLog.Printf("Scrape Job %d completed in %v. Successfully imported %d/%d", jobID, completedJob.Completed.Sub(completedJob.Started), count, total)

@ -15,7 +15,6 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
"go.uber.org/dig"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
_ "git.vdhsn.com/barretthousen/barretthousen/src/runner/internal/domain/liveauctioneers"
)
@ -32,6 +31,8 @@ type (
var (
migrate = flag.Bool("migrate", false, "migrates postgres db")
client = flag.String("client", "", "Runs this service in client mode, to invoke sync job. Takes GRPC endpoint")
target = flag.String("target", "", "To be used with client mode. The target to sync")
//go:embed internal/data/postgres/migrations/*.sql
dbMigrateScript embed.FS
@ -48,6 +49,11 @@ func main() {
}
func (app *runnerApp) Start(ctx context.Context) error {
if *client != "" {
kernel.InfoLog.Printf("Client mode: %s - target: %s", *client, *target)
return nil
}
if *migrate {
kernel.InfoLog.Printf("running db migrations on %v", app.DB_Migrate)
if err := kernel.MigrateDB(ctx, app.DB_Migrate, dbMigrateScript, "runner"); err != nil {
@ -81,15 +87,13 @@ func (app *runnerApp) Start(ctx context.Context) error {
return err
}
conn, err := grpc.Dial(app.CatalogEndpoint, grpc.WithTransportCredentials(
insecure.NewCredentials(),
))
if err != nil {
if err = ioc.Provide(func() (grpc.ClientConnInterface, error) {
return kernel.DialGRPC(app.CatalogEndpoint)
}); err != nil {
return err
}
defer conn.Close()
if err = ioc.Provide(func() domain.CatalogService {
if err = ioc.Provide(func(conn grpc.ClientConnInterface) domain.CatalogService {
return capi.NewCatalogServiceClient(conn)
}); err != nil {
return err
@ -116,9 +120,6 @@ func (app *runnerApp) Start(ctx context.Context) error {
}
func (app *runnerApp) OnStop(ctx context.Context) {
if err := kernel.StopGRPCServer(); err != nil {
kernel.ErrorLog.Printf("could not gracefully stop GRPC server: %v", err)
}
}
func (app *runnerApp) GetLogLevel() kernel.LogLevel { return app.LogLevel }

Loading…
Cancel
Save