Compare commits

..

2 Commits

Author SHA1 Message Date
Adam Veldhousen 8d564e441a
got dashboard api call to work 2021-06-09 00:20:42 -05:00
Adam Veldhousen 29b500668e
tstorage sucks 2021-06-08 21:57:38 -05:00
9 changed files with 2431 additions and 188 deletions

2238
client/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -62,11 +62,20 @@ export enum StatSearchKey {
Protocol = "protocol" Protocol = "protocol"
} }
export interface Stat { export interface StatDataset {
label: string
data: DataPoint[]
}
export interface DataPoint {
Header: string Header: string
AverageTotalTime: Number Value: Number
Count: number, Count: number,
Time: string Time: string
}
export interface Stat {
labels: string[]
datasets: StatDataset[]
}; };
export const getStats = async ({ export const getStats = async ({
@ -74,7 +83,7 @@ export const getStats = async ({
end = new Date(), end = new Date(),
key = StatSearchKey.Domain, key = StatSearchKey.Domain,
interval = 30, interval = 30,
}: StatsSearchOptions) => await apiCall<Stat[]>('metrics/stats', 'GET', { }: StatsSearchOptions) => await apiCall<Stat>('metrics/stats', 'GET', {
start: getUnixTime(start), start: getUnixTime(start),
end: getUnixTime(end), end: getUnixTime(end),
key, key,

View File

@ -2,45 +2,23 @@
import { onMount } from "svelte"; import { onMount } from "svelte";
import type { Stat } from "../api"; import type { Stat } from "../api";
import randomColor from "randomcolor"; import randomColor from "randomcolor";
import { Chart, registerables } from "chart.js"; import { Chart, registerables } from "chart.js";
Chart.register(...registerables); Chart.register(...registerables);
export let stats: Stat[] = []; export let stats: Stat = null;
export let column: string = null;
const transformStats = (ostats) => { const generateChartOptions = (s: Stat, empty: Boolean = false) => {
const chartData = ostats.reduce((agg, x) => {
let root = agg[x.Header] || {
labels: [],
dataset: {
label: x.Header,
borderColor: randomColor({
luminosity: "dark",
}), //"rgb(75,192,192)",
data: [],
},
};
root.dataset.data = root.dataset.data.concat(x.Count);
root.labels = root.labels.concat(x.Time);
agg[x.Header] = root;
return agg;
}, {});
const finalChartData = Object.keys(chartData).map((x) => chartData[x]);
const finalChartLabels =
finalChartData.length > 0 ? finalChartData[0].labels : [];
return {
labels: finalChartLabels,
datasets: finalChartData.map((x) => x.dataset),
};
};
const generateChartOptions = (s: [], empty: Boolean = false) => {
let labels = []; let labels = [];
let datasets = []; let datasets = [];
if (s && s.length > 0) { if (s) {
({ labels, datasets } = transformStats(s)); labels = s.labels;
datasets = s.datasets.map(({ label, data }) => ({
label,
data: data.map((x) => x.Count),
borderColor: randomColor(),
}));
} }
var delayed; var delayed;
@ -54,7 +32,11 @@
responsive: true, responsive: true,
maintainAspectRatio: false, maintainAspectRatio: false,
scales: { scales: {
// x: { x: {
title: {
label: "time",
display: true,
},
// type: "time", // type: "time",
// ticks: { // ticks: {
// source: "auto", // source: "auto",
@ -62,9 +44,13 @@
// maxRotation: 0, // maxRotation: 0,
// autoSkip: true, // autoSkip: true,
// }, // },
// }, },
y: { y: {
stacked: true, stacked: true,
min: 0,
ticks: {
stepSize: 5,
},
}, },
}, },
hoverRadius: 5, hoverRadius: 5,
@ -79,6 +65,10 @@
algorithm: "lttb", algorithm: "lttb",
samples: 60, samples: 60,
}, },
title: {
display: true,
text: `Count by ${column}`,
},
}, },
animations: { animations: {
radius: { radius: {
@ -117,7 +107,7 @@
chartInstance = new Chart(ctx, generateChartOptions(stats, true)); chartInstance = new Chart(ctx, generateChartOptions(stats, true));
}); });
const update = (s) => { const update = (s: Stat) => {
if (chartInstance) { if (chartInstance) {
const { options, data } = generateChartOptions(s, false); const { options, data } = generateChartOptions(s, false);
chartInstance.options = options; chartInstance.options = options;

View File

@ -32,7 +32,7 @@
let chartDataLoading: Boolean = false; let chartDataLoading: Boolean = false;
let logDataLoading: Boolean = false; let logDataLoading: Boolean = false;
let chartData: Stat[] = []; let chartData: Stat = null;
let logs: Log[] = []; let logs: Log[] = [];
let pageSize: number = 50; let pageSize: number = 50;
let pageCount: number = 0; let pageCount: number = 0;
@ -62,7 +62,7 @@
return payload; return payload;
}; };
const fetchStats = async () => { const fetchStats = async (): Promise<Stat> => {
if (chartDataLoading) { if (chartDataLoading) {
console.warn("tried loading stats while already loading"); console.warn("tried loading stats while already loading");
return; return;
@ -80,7 +80,7 @@
if (error) { if (error) {
chartErrorMsg = error; chartErrorMsg = error;
return []; return null;
} }
return payload; return payload;
@ -137,7 +137,7 @@
{:else if chartErrorMsg} {:else if chartErrorMsg}
<p>{chartErrorMsg}</p> <p>{chartErrorMsg}</p>
{:else} {:else}
<TimeChart stats={chartData} /> <TimeChart stats={chartData} column={chartKey} />
{/if} {/if}
</section> </section>
<section class="my-5"> <section class="my-5">

View File

@ -1,13 +1,11 @@
package internal package internal
import ( import (
"fmt"
"log" "log"
"strings" "strings"
"time" "time"
"github.com/miekg/dns" "github.com/miekg/dns"
"github.com/nakabonne/tstorage"
) )
type DomainManager struct { type DomainManager struct {
@ -57,12 +55,13 @@ func (dm *DomainManager) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
responseMessage.Compress = true responseMessage.Compress = true
ql.TotalTimeMs = int(time.Since(start).Milliseconds()) ql.TotalTimeMs = int(time.Since(start).Milliseconds())
log.Printf("%+v", ql) log.Printf("%+v", ql)
go func() { go func(q QueryLog) {
if err := dm.Storage.Log(ql); err != nil { if err := dm.Storage.Log(q); err != nil {
log.Printf("ERROR WRITING LOG: %v", err) log.Printf("ERROR WRITING LOG: %v", err)
} }
}() }(ql)
if err := w.WriteMsg(responseMessage); err != nil { if err := w.WriteMsg(responseMessage); err != nil {
log.Println(err) log.Println(err)
@ -91,36 +90,15 @@ type QueryLog struct {
Status ResponseStatus Status ResponseStatus
} }
func QueryLogLabels(q QueryLog) []tstorage.Label { func GetAggregateColumnHeader(ql QueryLog, h LogAggregateColumn) string {
return []tstorage.Label{ switch h {
{ case ClientIP:
Name: string(ClientIP), return ql.ClientIP
Value: q.ClientIP, case Status:
}, return string(ql.Status)
{ case Protocol:
Name: string(Domain), return ql.Protocol
Value: q.Domain,
},
{
Name: string(Protocol),
Value: q.Protocol,
},
{
Name: string(Status),
Value: string(q.Status),
},
{
Name: string(RecursedUpstream),
Value: string(q.RecurseUpstreamIP),
},
{
Name: string(LookupError),
Value: fmt.Sprintf("%s", q.Error != ""),
},
} }
}
func GetLabelsFromQuery(la LogAggregateInput) []tstorage.Label {
return nil
return ql.Domain
} }

View File

@ -2,20 +2,20 @@ package internal
import ( import (
"database/sql" "database/sql"
"errors"
"fmt" "fmt"
"log"
"net" "net"
"strconv" "strconv"
"strings" "strings"
"time" "time"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"github.com/nakabonne/tstorage"
) )
type Sqlite struct { type Sqlite struct {
Path string Path string
*sql.DB *sql.DB
TS tstorage.Storage
} }
func (ss *Sqlite) Open() error { func (ss *Sqlite) Open() error {
@ -26,14 +26,7 @@ func (ss *Sqlite) Open() error {
db.SetMaxOpenConns(1) db.SetMaxOpenConns(1)
ts, _ := tstorage.NewStorage(
tstorage.WithTimestampPrecision(tstorage.Nanoseconds),
tstorage.WithPartitionDuration(time.Minute*30),
tstorage.WithDataPath(ss.Path),
)
ss.DB = db ss.DB = db
ss.TS = ts
if err := initTable(db); err != nil { if err := initTable(db); err != nil {
return err return err
@ -42,6 +35,129 @@ func (ss *Sqlite) Open() error {
return nil return nil
} }
func (ss *Sqlite) Close() error {
ss.DB.Close()
return nil
}
const defaultSamples = 64
const maxSamples = 128
func (ss *Sqlite) GetLogAggregate(la LogAggregateInput) (LogAggregate, error) {
if la.End.IsZero() || la.End.After(time.Now()) {
la.End = time.Now().UTC()
}
if la.Start.After(la.End) {
return LogAggregate{}, errors.New("Start time cannot be before end time")
}
if la.Start.IsZero() {
la.Start = time.Now().UTC().Add(time.Hour * -12)
}
timespanSecs := int(la.End.Sub(la.Start) / time.Second)
// how many data points to show on the line plot
sampleCount := defaultSamples
if la.IntervalSeconds <= 0 {
la.IntervalSeconds = timespanSecs / sampleCount
}
sampleCount = timespanSecs / la.IntervalSeconds
// cap to prevent performance issues
if sampleCount > maxSamples {
sampleCount = maxSamples
la.IntervalSeconds = timespanSecs / sampleCount
log.Printf("got %v samples, capping to 256 for perf", sampleCount)
}
log.Printf("%+v - samples: %v - timespan (seconds): %v", la, sampleCount, timespanSecs)
switch la.Column {
case string(Domain):
case string(Status):
case string(ClientIP):
case string(Protocol):
break
default:
la.Column = string(Domain)
}
logs, err := ss.GetLog(GetLogInput{
Start: la.Start,
End: la.End,
Limit: 10000,
Page: 0,
})
if err != nil {
return LogAggregate{}, err
}
if logs.PageCount > 1 {
return LogAggregate{}, fmt.Errorf("more than one page available: %v", logs.PageCount)
}
lut := [][]StatsDataPoint{}
buckets := map[string][]StatsDataPoint{}
for _, l := range logs.Logs {
k := GetAggregateColumnHeader(l, LogAggregateColumn(la.Column))
if _, ok := buckets[k]; !ok {
buckets[k] = make([]StatsDataPoint, sampleCount)
lut = append(lut, buckets[k])
}
dataset := buckets[k]
timeIndex := int(l.Started.Sub(la.Start)/time.Second) / la.IntervalSeconds
ladp := dataset[timeIndex]
ladp.Header = k
offsetSecs := (timeIndex * la.IntervalSeconds)
ladp.Time = la.Start.Add(time.Duration(offsetSecs) * time.Second)
ladp.Count += 1
ladp.Value += float64(l.TotalTimeMs)
buckets[k][timeIndex] = ladp
}
laResult := LogAggregate{
Labels: make([]string, sampleCount),
Datasets: make([]LogAggregateDataset, len(buckets)),
}
for idx := 0; idx < sampleCount; idx++ {
offsetSecs := (idx * la.IntervalSeconds)
ts := la.Start.Add(time.Duration(offsetSecs) * time.Second)
laResult.Labels[idx] = ts.Format("01-02 15:04:05")
idx := 0
for k, v := range buckets {
ladp := v[idx]
if ladp.Time.IsZero() {
v[idx].Time = ts
}
laResult.Datasets[idx].Dataset = v
laResult.Datasets[idx].Label = k
idx++
}
}
return laResult, nil
}
type LogAggregate struct {
Labels []string `json:"labels"`
Datasets []LogAggregateDataset `json:"datasets"`
}
type LogAggregateDataset struct {
Label string `json:"label"`
Dataset []StatsDataPoint `json:"data"`
}
func (ss *Sqlite) Log(ql QueryLog) error { func (ss *Sqlite) Log(ql QueryLog) error {
sql := ` sql := `
INSERT INTO log INSERT INTO log
@ -63,49 +179,6 @@ func (ss *Sqlite) Log(ql QueryLog) error {
return err return err
} }
labels := QueryLogLabels(ql)
isError := 0
if ql.Error != "" {
isError = 1
}
if err := ss.TS.InsertRows([]tstorage.Row{
{
Metric: "Count",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: 1,
},
},
{
Metric: "TotalTimeMs",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: float64(ql.TotalTimeMs),
},
},
{
Metric: "Errors",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: float64(isError),
},
},
{
Metric: "RecurseTimeMs",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: float64(ql.RecurseRoundTripTimeMs),
},
},
}); err != nil {
return err
}
return nil return nil
} }
@ -380,50 +453,6 @@ func (ss *Sqlite) GetPagingInfo(in GetLogInput) (totalItems, pageCount int, err
return return
} }
func (ss *Sqlite) GetLogAggregate(la LogAggregateInput) ([]LogAggregateDataPoint, error) {
timeWindow := int64(5 * 60)
column := "domain"
if lac, ok := AggregateKeys[la.Column]; ok {
column = string(lac)
}
if la.IntervalSeconds > 0 {
timeWindow = int64(la.IntervalSeconds)
}
points, err := ss.TS.Select(column, GetLabelsFromQuery(la), la.Start.UnixNano(), la.End.UnixNano())
var
for _, p := range points {
ladp := LogAggregateDataPoint{
Header: column,
Count: int(p.Value),
Time: time.Unix(0, p.Timestamp),
}
results = append(results, ladp)
}
// for rows.Next() {
// var ladp LogAggregateDataPoint
// var timeInterval int64
// if err := rows.Scan(
// &ladp.Header,
// &ladp.AverageTotalTime,
// &ladp.Count,
// &timeInterval,
// ); err != nil {
// return nil, err
// }
// ladp.Time = time.Unix(timeInterval*timeWindow, 0)
// results = append(results, ladp)
// }
return results, nil
}
func initTable(db *sql.DB) error { func initTable(db *sql.DB) error {
sql := ` sql := `
CREATE TABLE IF NOT EXISTS log ( CREATE TABLE IF NOT EXISTS log (

View File

@ -37,9 +37,9 @@ type LogAggregateInput struct {
Column string Column string
} }
type LogAggregateDataPoint struct { type StatsDataPoint struct {
Header string Header string
AverageTotalTime float64 Value float64
Count int Count int
Time time.Time Time time.Time
} }
@ -60,7 +60,7 @@ type Storage interface {
Log(QueryLog) error Log(QueryLog) error
GetLog(GetLogInput) (GetLogResult, error) GetLog(GetLogInput) (GetLogResult, error)
GetLogAggregate(LogAggregateInput) ([]LogAggregateDataPoint, error) GetLogAggregate(LogAggregateInput) (LogAggregate, error)
} }
type RecursorRow struct { type RecursorRow struct {

View File

@ -37,6 +37,8 @@ func main() {
Path: conf.DatabaseURL, Path: conf.DatabaseURL,
} }
defer store.Close()
if err := store.Open(); err != nil { if err := store.Open(); err != nil {
log.Fatalf("COULD NOT OPEN SQLITE DB: %v", err) log.Fatalf("COULD NOT OPEN SQLITE DB: %v", err)
} }

View File

@ -25,7 +25,6 @@ test:
mkdir -p .bin mkdir -p .bin
.bin/gopherhole: .bin .bin/gopherhole: .bin
# @go build --tags "sqlite_foreign_keys fts5" -v -o .bin/gopherhole .
@go build --tags "fts5" -v -o .bin/gopherhole . @go build --tags "fts5" -v -o .bin/gopherhole .
.bin/config.json: .bin/config.json: