consume from kafka

feat/kafka
Adam Veldhousen 2023-06-11 22:51:34 -05:00
parent ebe90899f0
commit bf6e10a856
Signed by: adam
GPG Key ID: 6DB29003C6DD1E4B
9 changed files with 236 additions and 62 deletions

View File

@ -19,20 +19,6 @@ type CatalogServiceClient struct {
capi.CatalogClient
}
type Auction struct {
Fingerprint string `json:"fingerprint,omitempty"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
SourceSiteURL string `json:"source_site_url,omitempty"`
SourceSiteName string `json:"source_site_name,omitempty"`
SourceURL string `json:"source_url,omitempty"`
Country string `json:"country,omitempty"`
Province string `json:"province,omitempty"`
ItemCount int `json:"itemCount,omitempty"`
Start time.Time `json:"start,omitempty"`
End time.Time `json:"end,omitempty"`
}
type AuctionCreatedEvent struct {
Fingerprint string
ID int
@ -46,11 +32,11 @@ func (css *CatalogServiceClient) UpdateUpcomingAuction(ctx context.Context, a Au
End: timestamppb.New(a.End),
Title: a.Title,
Description: a.Description,
SourceSiteURL: a.SourceSiteURL,
SourceSiteName: a.SourceSiteName,
SourceURL: a.SourceURL,
Country: a.Country,
Province: a.Province,
SourceSiteURL: a.Source.SiteURL,
SourceSiteName: a.Source.Name,
SourceURL: a.Source.AuctionURL,
Country: a.Address.CountryCode,
Province: a.Address.State,
})
if err != nil {
return AuctionCreatedEvent{}, err
@ -62,3 +48,36 @@ func (css *CatalogServiceClient) UpdateUpcomingAuction(ctx context.Context, a Au
Duplicate: ac.Duplicate,
}, nil
}
type Auction struct {
ID int `json:"id,omitempty"`
Fingerprint string `json:"fingerprint,omitempty"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
ItemCount int `json:"itemCount,omitempty"`
Start time.Time `json:"start,omitempty"`
End time.Time `json:"end,omitempty"`
Source SourceDetail `json:"source"`
Seller SellerDetail `json:"seller"`
Address AddressDetail `json:"address"`
}
type SourceDetail struct {
Name string `json:"name"`
AuctionURL string `json:"auctionUrl"`
SiteURL string `json:"siteURL"`
}
type SellerDetail struct {
ID int `json:"id"`
Name string `json:"name"`
SiteURL string `json:"siteUrl"`
}
type AddressDetail struct {
CountryCode string `json:"country"`
Lat float64 `json:"lat"`
Long float64 `json:"lng"`
State string `json:"state"`
City string `json:"city"`
}

View File

@ -14,8 +14,8 @@ service Catalog {
};
}
rpc ImportAuction(ImportAuctionMessage) returns (AuctionCreated) {
}
// rpc ImportAuction(ImportAuctionMessage) returns (AuctionCreated) {
// }
}
message AuctionSearchCriteria {

View File

@ -0,0 +1,58 @@
package kafka
import (
"context"
"errors"
"io"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
kafka "github.com/segmentio/kafka-go"
)
type kafkaConsumer struct {
*kafka.Conn
}
func (c *kafkaConsumer) Close() error {
return c.Conn.Close()
}
func (c *kafkaConsumer) ConsumeAsync(ctx context.Context, output chan<- []byte) {
for {
batch := c.ReadBatch(1024, 1024*50)
for {
msg, err := batch.ReadMessage()
if err != nil && !errors.Is(err, io.EOF) || len(msg.Value) == 0 {
break
}
kernel.TraceLog.Printf("header: %+v, value: %+v", msg.Headers, msg.Value)
select {
case output <- msg.Value:
case <-ctx.Done():
batch.Close()
return
}
}
batch.Close()
select {
case <-ctx.Done():
return
}
}
}
func NewConsumer(topic string) (*kafkaConsumer, error) {
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka:9092", topic, 0)
if err != nil {
return nil, err
}
return &kafkaConsumer{
Conn: conn,
}, nil
}

View File

@ -0,0 +1,55 @@
package internal
import (
"context"
"encoding/json"
"io"
api "git.vdhsn.com/barretthousen/barretthousen/src/catalog/api"
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/internal/domain"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
)
type KafkaConsumer interface {
io.Closer
ConsumeAsync(context.Context, chan<- []byte)
}
type AuctionImporter interface {
ImportAuction(context.Context, domain.ImportAuctionMessage) (domain.AuctionCreated, error)
}
func RunIndexer(ctx context.Context, c KafkaConsumer, importer AuctionImporter) {
defer c.Close()
msgs := make(chan []byte, 64)
go c.ConsumeAsync(ctx, msgs)
for msg := range msgs {
var auction api.Auction
if err := json.Unmarshal(msg, &auction); err != nil {
kernel.TraceLog.Printf("could not ingest: %w", err)
continue
}
_, err := importer.ImportAuction(ctx, domain.ImportAuctionMessage{
Auction: domain.Auction{
ItemCount: auction.ItemCount,
Start: auction.Start,
End: auction.End,
Title: auction.Title,
Description: auction.Description,
SourceSiteURL: auction.Source.SiteURL,
SourceSiteName: auction.Source.Name,
SourceURL: auction.Source.AuctionURL,
Country: auction.Address.CountryCode,
Province: auction.Address.State,
},
})
if err != nil {
kernel.ErrorLog.Printf("could not import auction: %w", err)
continue
}
}
}

View File

@ -8,6 +8,7 @@ import (
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/internal"
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/internal/data"
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/internal/data/kafka"
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/internal/data/postgres"
"git.vdhsn.com/barretthousen/barretthousen/src/catalog/internal/domain"
"git.vdhsn.com/barretthousen/barretthousen/src/lib/kernel"
@ -80,7 +81,15 @@ func (app *catalogApp) Start(ctx context.Context) error {
return err
}
return ioc.Invoke(func(d *domain.Usecase) error {
if err = ioc.Provide(func() (internal.KafkaConsumer, error) {
return kafka.NewConsumer("runner.sync_results")
}); err != nil {
return err
}
return ioc.Invoke(func(d *domain.Usecase, consumer internal.KafkaConsumer) error {
go internal.RunIndexer(ctx, consumer, d)
catalogService := internal.NewCatalogServer(d)
if _, err := kernel.StartGRPCServer(ctx, app.Port, catalogService); err != nil {

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"
kafka "github.com/segmentio/kafka-go"
)
@ -16,15 +17,25 @@ func (c *kafkaProducer) Close() error {
return c.Conn.Close()
}
func (c *kafkaProducer) SendMessageJSON(msg interface{}) (err error) {
func (c *kafkaProducer) SendMessageJSON(headers map[string]string, msg interface{}) (err error) {
var data []byte
if data, err = json.Marshal(msg); err != nil {
err = fmt.Errorf("could not marshal message into JSON: %w", err)
return
}
h := []kafka.Header{}
for k, v := range headers {
h = append(h, kafka.Header{
Key: k,
Value: []byte(v),
})
}
if _, err = c.WriteMessages(kafka.Message{
Value: data,
Time: time.Now().UTC(),
Headers: h,
Value: data,
}); err != nil {
return
}

View File

@ -199,16 +199,26 @@ func LAGetSaleInfo(ctx context.Context, catIDs LACatalogIDs) (results []catalog.
results = make([]catalog.Auction, len(apiResults.Data.Catalogs))
for idx, c := range apiResults.Data.Catalogs {
results[idx] = catalog.Auction{
Title: c.Title,
Description: c.Description,
SourceSiteURL: "https://www.liveauctioneers.com",
SourceSiteName: "Live Auctioneers",
SourceURL: fmt.Sprintf("https://www.liveauctioneers.com/catalog/%d", c.ID),
Start: time.Unix(c.SaleStartTS, 0),
End: time.Unix(c.SaleStartTS, 0).Add(time.Hour * 8),
ItemCount: c.ItemCount,
Country: c.Address.CountryCode,
Province: c.Address.City,
Title: c.Title,
Description: c.Description,
Start: time.Unix(c.SaleStartTS, 0),
End: time.Unix(c.SaleStartTS, 0).Add(time.Hour * 8),
ItemCount: c.ItemCount,
Source: catalog.SourceDetail{
SiteURL: "https://www.liveauctioneers.com",
Name: "Live Auctioneers",
AuctionURL: fmt.Sprintf("https://www.liveauctioneers.com/catalog/%d", c.ID),
},
Address: catalog.AddressDetail{
CountryCode: c.Address.CountryCode,
City: c.Address.City,
State: c.Address.State,
Lat: c.Address.Lat,
Long: c.Address.Long,
},
Seller: catalog.SellerDetail{
ID: int(c.SellerID),
},
}
}

View File

@ -32,7 +32,7 @@ func RegisterAuctionFinder(finder UpcomingAuctionFinder) {
type (
Domain struct {
Storage
CatalogService
// CatalogService
}
CompleteScrapeJobStatus struct {
@ -50,6 +50,11 @@ type (
UpdateUpcomingAuction(context.Context, catalog.Auction) (catalog.AuctionCreatedEvent, error)
}
KafkaProducer interface {
io.Closer
SendMessageJSON(map[string]string, interface{}) error
}
FindNewUpcomingInput struct {
TargetSite string
}
@ -62,17 +67,23 @@ type (
func (domain Domain) StartSync(ctx context.Context, in FindNewUpcomingInput) (out FindNewUpcomingOutput, err error) {
kernel.TraceLog.Printf("%+v", in)
finder := targetsImpls["liveauctioneers"]
runAllFinders := in.TargetSite == "" || in.TargetSite == "all"
for targetSite, finder := range targetsImpls {
if !runAllFinders && in.TargetSite != targetSite {
continue
}
if out.Job, err = domain.Storage.CreateScrapeJob(ctx, in.TargetSite); err != nil {
err = fmt.Errorf("could not create new scrape job record: %w", err)
return
if out.Job, err = domain.Storage.CreateScrapeJob(ctx, targetSite); err != nil {
err = fmt.Errorf("could not create new scrape job record: %w", err)
return
}
kernel.InfoLog.Printf("Scrape Job %d starting", out.Job.ID)
// TODO: make everything after this line async and run after return
go domain.executeScrapeJob(finder, out.Job.ID)
}
kernel.InfoLog.Printf("Scrape Job %d starting", out.Job.ID)
// TODO: make everything after this line async and run after return
go domain.executeScrapeJob(finder, out.Job.ID)
return
}
@ -95,18 +106,13 @@ func (domain Domain) Status(ctx context.Context, in GetJobsInput) (out GetJobsOu
return
}
type KafkaProducer interface {
io.Closer
SendMessageJSON(interface{}) error
}
// TODO: tests
func (domain *Domain) executeScrapeJob(finder UpcomingAuctionFinder, jobID int) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
found := make(chan catalog.Auction, 2048)
deadlineCtx, deadlineCancel := context.WithDeadline(ctx, time.Now().Add(time.Minute))
deadlineCtx, deadlineCancel := context.WithTimeout(ctx, time.Minute*5)
defer deadlineCancel()
errGroup, innerCtx := errgroup.WithContext(deadlineCtx)
@ -116,7 +122,7 @@ func (domain *Domain) executeScrapeJob(finder UpcomingAuctionFinder, jobID int)
var p KafkaProducer
var err error
if p, err = kafka.NewProducer("catalog.upcoming"); err != nil {
if p, err = kafka.NewProducer("runner.sync_results"); err != nil {
return
}
@ -129,20 +135,26 @@ func (domain *Domain) executeScrapeJob(finder UpcomingAuctionFinder, jobID int)
continue
}
if err := p.SendMessageJSON(auction); err != nil {
if err := p.SendMessageJSON(map[string]string{
"scrape-job-id": fmt.Sprintf("%d", jobID),
"target-site": finder.String(),
}, auction); err != nil {
kernel.TraceLog.Printf("could not publish auction to kafka: %v", err)
}
ace, err := domain.CatalogService.UpdateUpcomingAuction(ctx, auction)
if err != nil {
kernel.TraceLog.Printf("could not import upcoming auction: %s", err.Error())
fmt.Fprintf(errs, "{ \"AuctionFingerprint\": \"%s\", \"error\": \"%s\" }\n", ace.Fingerprint, err.Error())
continue
}
if !ace.Duplicate {
count++
}
count++
// ace, err := domain.CatalogService.UpdateUpcomingAuction(ctx, auction)
// if err != nil {
// kernel.TraceLog.Printf("could not import upcoming auction: %s", err.Error())
// fmt.Fprintf(errs, "{ \"AuctionFingerprint\": \"%s\", \"error\": \"%s\" }\n", ace.Fingerprint, err.Error())
// continue
// }
// if !ace.Duplicate {
// count++
// }
}
if err := errGroup.Wait(); err != nil {

View File

@ -101,8 +101,8 @@ func (app *runnerApp) Start(ctx context.Context) error {
if err = ioc.Provide(func(css domain.CatalogService, rs domain.Storage) *domain.Domain {
return &domain.Domain{
Storage: rs,
CatalogService: css,
Storage: rs,
// CatalogService: css,
}
}); err != nil {
return err