@ -14,8 +14,8 @@ import (
)
// Sampling bounds for the log-aggregate line plot.
//
// NOTE(review): the original span was diff residue declaring each constant
// twice (pre-patch 64/128, post-patch 32/64), which is invalid Go; this is
// the post-patch version. defaultSamples is the number of data points
// plotted when the caller does not pick a resolution; maxSamples caps it.
const (
	defaultSamples = 32
	maxSamples     = 64
)
type Sqlite struct {
@ -46,19 +46,20 @@ func (ss *Sqlite) Close() error {
}
// GetLogAggregate buckets query-log rows between la.Start and la.End into
// fixed-width time slices (sampleCount of them), grouped by la.Column, for
// rendering a line plot.
//
// NOTE(review): this span is unified-diff residue with the +/- markers
// stripped — for several statements both the pre-patch (time.Time based)
// and the post-patch (unix-seconds int64 based) version appear back to
// back. Comments below mark which is which. The hunk header further down
// means some original lines (presumably the sampleCount clamping and
// la.IntervalSeconds computation — TODO confirm) are not visible here.
func ( ss * Sqlite ) GetLogAggregate ( la LogAggregateInput ) ( LogAggregate , error ) {
// pre-patch: End is a time.Time; unset or future End clamps to now.
if la . End . IsZero ( ) || la . End . After ( time . Now ( ) ) {
la . End = time . Now ( ) . UTC ( )
// post-patch: End is unix seconds; non-positive or future End clamps to now.
rightNow := time . Now ( ) . UTC ( ) . Unix ( )
if la . End <= 0 || la . End > rightNow {
la . End = rightNow
}
// pre-patch range check (time.Time) followed by post-patch check (int64).
if la . Start . After ( la . End ) {
if la . Start > la . End {
// NOTE(review): the condition rejects Start AFTER End, but the message
// says "before" — the wording looks inverted; confirm intended text.
return LogAggregate { } , errors . New ( "Start time cannot be before end time" )
}
// Default Start to 12 hours before now when unset
// (pre-patch IsZero/time.Time form, then post-patch <=0/unix form).
if la . Start . IsZero ( ) {
la . Start = time . Now ( ) . UTC ( ) . Add ( time . Hour * - 12 )
if la . Start <= 0 {
la . Start = time . Now ( ) . UTC ( ) . Add ( time . Hour * - 12 ) . Unix ( )
}
// Total span in seconds: pre-patch Duration arithmetic, post-patch plain
// int64 subtraction.
timespanSecs := int ( la . End . Sub ( la . Start ) / time . Second )
timespanSecs := int ( la . End - la . Start )
// how many data points to show on the line plot
sampleCount := defaultSamples
// Diff hunk boundary: the lines between here and the next statement are
// omitted from this view (likely where sampleCount is clamped to
// maxSamples and la.IntervalSeconds is derived from timespanSecs — TODO
// confirm against the full file).
@ -87,68 +88,76 @@ func (ss *Sqlite) GetLogAggregate(la LogAggregateInput) (LogAggregate, error) {
// Fall back to grouping by domain when no column was requested.
la . Column = string ( Domain )
}
// Fetch every matching log row in one page (Limit -1 = unlimited).
logs , err := ss . GetLog ( GetLogInput {
Start : la . Start ,
End : la . End ,
Limit : - 1 ,
Page : 0 ,
} )
if err != nil {
return LogAggregate { } , err
}
// With Limit -1 everything should fit on one page; more than one page
// means the aggregate would be computed over partial data, so bail out.
if logs . PageCount > 1 {
return LogAggregate { } , fmt . Errorf ( "more than one page available: %v" , logs . PageCount )
}
// buckets maps the grouping key (value of la.Column for the row) to a
// fixed-size slice of per-time-slice accumulators.
buckets := map [ string ] [ ] StatsDataPoint { }
for _ , l := range logs . Logs {
k := GetAggregateColumnHeader ( l , LogAggregateColumn ( la . Column ) )
if _ , ok := buckets [ k ] ; ! ok {
buckets [ k ] = make ( [ ] StatsDataPoint , sampleCount )
}
dataset := buckets [ k ]
// Index of the time slice this row falls into, relative to la.Start.
timeIndex := int ( l . Started . Sub ( la . Start ) / time . Second ) / la . IntervalSeconds
// Guard against rows landing past the last slice; Assert logs and the
// row is dropped rather than panicking.
if Assert ( timeIndex >= len ( dataset ) , "ERROR TIME INDEX OUT OF RANGE %d/%d with interval %d - time index: %s" , timeIndex , len ( dataset ) , la . IntervalSeconds , l . Started . Sub ( la . Start ) / time . Second ) {
continue
}
// Accumulate count and total time into the slice's data point.
ladp := dataset [ timeIndex ]
ladp . Header = k
offsetSecs := ( timeIndex * la . IntervalSeconds )
ladp . Time = la . Start . Add ( time . Duration ( offsetSecs ) * time . Second )
ladp . Count += 1
ladp . Value += float64 ( l . TotalTimeMs )
buckets [ k ] [ timeIndex ] = ladp
}
// Pre-patch result assembly: one label per time slice plus one dataset
// per bucket key.
laResult := LogAggregate {
Labels : make ( [ ] string , sampleCount ) ,
Datasets : make ( [ ] LogAggregateDataset , len ( buckets ) ) ,
}
for idx := 0 ; idx < sampleCount ; idx ++ {
offsetSecs := ( idx * la . IntervalSeconds )
ts := la . Start . Add ( time . Duration ( offsetSecs ) * time . Second )
laResult . Labels [ idx ] = ts . Format ( "01-02 15:04:05" )
// NOTE(review): idx is shadowed here and reused as the dataset index
// inside the label loop — pre-patch oddity, kept as-is.
idx := 0
for k , v := range buckets {
ladp := v [ idx ]
if ladp . Time . IsZero ( ) {
v [ idx ] . Time = ts
}
laResult . Datasets [ idx ] . Dataset = v
laResult . Datasets [ idx ] . Label = k
idx ++
}
// post-patch: the result is built with NO datasets and returned
// immediately, making everything above dead code in the patched version;
// the commented block below is the post-patch (unix-seconds) rewrite of
// the same aggregation, left disabled. TODO(review): either re-enable it
// or delete it — as committed, the aggregate is always empty.
Datasets : make ( [ ] LogAggregateDataset , 0 ) ,
}
return laResult , nil
// logs, err := ss.GetLog(GetLogInput{
// Start: la.Start,
// End: la.End,
// Limit: -1,
// Page: 0,
// })
// if err != nil {
// return LogAggregate{}, err
// }
// if logs.PageCount > 1 {
// return LogAggregate{}, fmt.Errorf("more than one page available: %v", logs.PageCount)
// }
// buckets := map[string][]StatsDataPoint{}
// for _, l := range logs.Logs {
// k := GetAggregateColumnHeader(l, LogAggregateColumn(la.Column))
// if _, ok := buckets[k]; !ok {
// buckets[k] = make([]StatsDataPoint, sampleCount)
// }
// dataset := buckets[k]
// timeIndex := int(l.Started.Unix()-la.Start) / la.IntervalSeconds
// if Assert(timeIndex >= len(dataset), "ERROR TIME INDEX OUT OF RANGE %d/%d with interval %d - time index: %s", timeIndex, len(dataset), la.IntervalSeconds, l.Started.Sub(time.Unix(la.Start, 0))/time.Second) {
// continue
// }
// ladp := dataset[timeIndex]
// ladp.Header = k
// offsetSecs := int64(timeIndex * la.IntervalSeconds)
// ladp.Time = time.Unix(la.Start+offsetSecs, 0)
// ladp.Count += 1
// ladp.Value += float64(l.TotalTimeMs)
// buckets[k][timeIndex] = ladp
// }
// laResult := LogAggregate{
// Labels: make([]string, sampleCount),
// Datasets: make([]LogAggregateDataset, len(buckets)),
// }
// for idx := 0; idx < sampleCount; idx++ {
// offsetSecs := int64(idx * la.IntervalSeconds)
// ts := time.Unix(la.Start+offsetSecs, 0)
// laResult.Labels[idx] = ts.Format("01-02 15:04:05")
// idx := 0
// for k, v := range buckets {
// if idx < len(v) {
// if v[idx].Time.IsZero() {
// v[idx].Time = ts
// }
// }
// laResult.Datasets[idx].Dataset = v
// laResult.Datasets[idx].Label = k
// idx++
// }
// }
// return laResult, nil
}
type LogAggregate struct {
@ -169,7 +178,7 @@ func (ss *Sqlite) Log(ql QueryLog) error {
( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ;
`
if _ , err := ss . DB . Exec ( sql ,
ql . Started .UTC ( ) . Format ( ISO8601 ) ,
ql . Started ,
ql . ClientIP ,
ql . Protocol ,
ql . Domain ,
@ -366,12 +375,12 @@ func (ss *Sqlite) GetLog(in GetLogInput) (GetLogResult, error) {
in . Limit = 25
}
if in . Start . IsZero ( ) {
in . Start = time . Now ( ) . Add ( time . Hour * - 86400 )
if in . Start == 0 {
in . Start = time . Now ( ) . Add ( time . Hour * - 86400 ) . Unix ( )
}
if in . End . IsZero ( ) {
in . End = time . Now ( )
if in . End == 0 {
in . End = time . Now ( ) . Unix ( )
}
glr := GetLogResult {
@ -387,12 +396,11 @@ func (ss *Sqlite) GetLog(in GetLogInput) (GetLogResult, error) {
glr . TotalResults = lpi . Total
glr . PageCount = lpi . PageCount + 1
limitTxt := "LIMIT ?"
if in . Limit <= - 1 {
limitTxt = ""
in. Limit = 100000
}
sql := fmt . Sprintf ( `
sql := `
SELECT
started , clientIp , protocol , domain , totalTimeMs ,
error , recurseRoundTripTimeMs , recurseUpstreamIp , status
@ -408,12 +416,12 @@ func (ss *Sqlite) GetLog(in GetLogInput) (GetLogResult, error) {
recurseUpstreamIp ,
status
FROM log
WHERE CAST ( strftime ( ' % % s ' , started ) AS INTEGER ) BETWEEN ? AND ?
WHERE CAST ( strftime ( ' % s ' , started ) AS INTEGER ) BETWEEN ? AND ?
ORDER BY started DESC
) WHERE id <= ? ORDER BY id DESC % s ;
` , limitTxt )
) WHERE id <= ? ORDER BY id DESC LIMIT ? ;
`
rows , err := ss . DB . Query ( sql , in . Start .UTC ( ) . Format ( ISO8601 ) , in . End . UTC ( ) . Format ( ISO8601 ) , lpi . FirstItemID , in . Limit )
rows , err := ss . DB . Query ( sql , in . Start , in . End , lpi . FirstItemID , in . Limit )
if err != nil {
return glr , fmt . Errorf ( "issue with GetLog sql query: %w" , err )
}
@ -425,10 +433,9 @@ func (ss *Sqlite) GetLog(in GetLogInput) (GetLogResult, error) {
for rows . Next ( ) {
var q QueryLog
var started string
if err := rows . Scan (
& s tarted,
& q. S tarted,
& q . ClientIP ,
& q . Protocol ,
& q . Domain ,
@ -441,10 +448,6 @@ func (ss *Sqlite) GetLog(in GetLogInput) (GetLogResult, error) {
return glr , fmt . Errorf ( "issues scanning rows: %w" , err )
}
if q . Started , err = time . Parse ( ISO8601 , started ) ; err != nil {
return glr , fmt . Errorf ( "could not parse time '%s': %w" , started , err )
}
glr . Logs = append ( glr . Logs , q )
}
@ -471,15 +474,20 @@ func (ss *Sqlite) GetPagingInfo(in GetLogInput) (lpi LogPageInfo, err error) {
`
pageOffset := in . Limit * in . Page
row := ss . QueryRow ( sql , in . Limit , pageOffset , in . Start .UTC ( ) . Format ( ISO8601 ) , in . End . UTC ( ) . Format ( ISO8601 ) )
row := ss . QueryRow ( sql , in . Limit , pageOffset , in . Start , in . End )
if err = row . Scan ( & lpi . Total , & lpi . PageCount , & lpi . FirstItemID ) ; err != nil {
return
}
if in . Limit < 0 {
lpi . PageCount = 0
}
if pageOffset > lpi . Total {
err = errors . New ( "page number too high" )
}
log . Printf ( "in: %+v, out: %+v" , in , lpi )
return
}
@ -497,7 +505,7 @@ func initTable(db *sql.DB) error {
recurseUpStreamIP TEXT ,
status TEXT NOT NULL
) ;
CREATE INDEX IF NOT EXISTS idx_log_started ON log ( started ) ;
CREATE TABLE IF NOT EXISTS rules (
id INTEGER PRIMARY KEY ,