Compare commits

..

2 Commits

Author SHA1 Message Date
Adam Veldhousen 8d564e441a
got dashboard api call to work 2021-06-09 00:20:42 -05:00
Adam Veldhousen 29b500668e
tstorage sucks 2021-06-08 21:57:38 -05:00
9 changed files with 2431 additions and 188 deletions

2238
client/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -62,11 +62,20 @@ export enum StatSearchKey {
Protocol = "protocol"
}
export interface Stat {
export interface StatDataset {
label: string
data: DataPoint[]
}
export interface DataPoint {
Header: string
AverageTotalTime: Number
Value: Number
Count: number,
Time: string
}
export interface Stat {
labels: string[]
datasets: StatDataset[]
};
export const getStats = async ({
@ -74,7 +83,7 @@ export const getStats = async ({
end = new Date(),
key = StatSearchKey.Domain,
interval = 30,
}: StatsSearchOptions) => await apiCall<Stat[]>('metrics/stats', 'GET', {
}: StatsSearchOptions) => await apiCall<Stat>('metrics/stats', 'GET', {
start: getUnixTime(start),
end: getUnixTime(end),
key,

View File

@ -2,45 +2,23 @@
import { onMount } from "svelte";
import type { Stat } from "../api";
import randomColor from "randomcolor";
import { Chart, registerables } from "chart.js";
Chart.register(...registerables);
export let stats: Stat[] = [];
export let stats: Stat = null;
export let column: string = null;
const transformStats = (ostats) => {
const chartData = ostats.reduce((agg, x) => {
let root = agg[x.Header] || {
labels: [],
dataset: {
label: x.Header,
borderColor: randomColor({
luminosity: "dark",
}), //"rgb(75,192,192)",
data: [],
},
};
root.dataset.data = root.dataset.data.concat(x.Count);
root.labels = root.labels.concat(x.Time);
agg[x.Header] = root;
return agg;
}, {});
const finalChartData = Object.keys(chartData).map((x) => chartData[x]);
const finalChartLabels =
finalChartData.length > 0 ? finalChartData[0].labels : [];
return {
labels: finalChartLabels,
datasets: finalChartData.map((x) => x.dataset),
};
};
const generateChartOptions = (s: [], empty: Boolean = false) => {
const generateChartOptions = (s: Stat, empty: Boolean = false) => {
let labels = [];
let datasets = [];
if (s && s.length > 0) {
({ labels, datasets } = transformStats(s));
if (s) {
labels = s.labels;
datasets = s.datasets.map(({ label, data }) => ({
label,
data: data.map((x) => x.Count),
borderColor: randomColor(),
}));
}
var delayed;
@ -54,7 +32,11 @@
responsive: true,
maintainAspectRatio: false,
scales: {
// x: {
x: {
title: {
label: "time",
display: true,
},
// type: "time",
// ticks: {
// source: "auto",
@ -62,9 +44,13 @@
// maxRotation: 0,
// autoSkip: true,
// },
// },
},
y: {
stacked: true,
min: 0,
ticks: {
stepSize: 5,
},
},
},
hoverRadius: 5,
@ -79,6 +65,10 @@
algorithm: "lttb",
samples: 60,
},
title: {
display: true,
text: `Count by ${column}`,
},
},
animations: {
radius: {
@ -117,7 +107,7 @@
chartInstance = new Chart(ctx, generateChartOptions(stats, true));
});
const update = (s) => {
const update = (s: Stat) => {
if (chartInstance) {
const { options, data } = generateChartOptions(s, false);
chartInstance.options = options;

View File

@ -32,7 +32,7 @@
let chartDataLoading: Boolean = false;
let logDataLoading: Boolean = false;
let chartData: Stat[] = [];
let chartData: Stat = null;
let logs: Log[] = [];
let pageSize: number = 50;
let pageCount: number = 0;
@ -62,7 +62,7 @@
return payload;
};
const fetchStats = async () => {
const fetchStats = async (): Promise<Stat> => {
if (chartDataLoading) {
console.warn("tried loading stats while already loading");
return;
@ -80,7 +80,7 @@
if (error) {
chartErrorMsg = error;
return [];
return null;
}
return payload;
@ -137,7 +137,7 @@
{:else if chartErrorMsg}
<p>{chartErrorMsg}</p>
{:else}
<TimeChart stats={chartData} />
<TimeChart stats={chartData} column={chartKey} />
{/if}
</section>
<section class="my-5">

View File

@ -1,13 +1,11 @@
package internal
import (
"fmt"
"log"
"strings"
"time"
"github.com/miekg/dns"
"github.com/nakabonne/tstorage"
)
type DomainManager struct {
@ -57,12 +55,13 @@ func (dm *DomainManager) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
responseMessage.Compress = true
ql.TotalTimeMs = int(time.Since(start).Milliseconds())
log.Printf("%+v", ql)
go func() {
if err := dm.Storage.Log(ql); err != nil {
go func(q QueryLog) {
if err := dm.Storage.Log(q); err != nil {
log.Printf("ERROR WRITING LOG: %v", err)
}
}()
}(ql)
if err := w.WriteMsg(responseMessage); err != nil {
log.Println(err)
@ -91,36 +90,15 @@ type QueryLog struct {
Status ResponseStatus
}
func QueryLogLabels(q QueryLog) []tstorage.Label {
return []tstorage.Label{
{
Name: string(ClientIP),
Value: q.ClientIP,
},
{
Name: string(Domain),
Value: q.Domain,
},
{
Name: string(Protocol),
Value: q.Protocol,
},
{
Name: string(Status),
Value: string(q.Status),
},
{
Name: string(RecursedUpstream),
Value: string(q.RecurseUpstreamIP),
},
{
Name: string(LookupError),
Value: fmt.Sprintf("%s", q.Error != ""),
},
func GetAggregateColumnHeader(ql QueryLog, h LogAggregateColumn) string {
switch h {
case ClientIP:
return ql.ClientIP
case Status:
return string(ql.Status)
case Protocol:
return ql.Protocol
}
}
func GetLabelsFromQuery(la LogAggregateInput) []tstorage.Label {
return nil
return ql.Domain
}

View File

@ -2,20 +2,20 @@ package internal
import (
"database/sql"
"errors"
"fmt"
"log"
"net"
"strconv"
"strings"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/nakabonne/tstorage"
)
type Sqlite struct {
Path string
*sql.DB
TS tstorage.Storage
}
func (ss *Sqlite) Open() error {
@ -26,14 +26,7 @@ func (ss *Sqlite) Open() error {
db.SetMaxOpenConns(1)
ts, _ := tstorage.NewStorage(
tstorage.WithTimestampPrecision(tstorage.Nanoseconds),
tstorage.WithPartitionDuration(time.Minute*30),
tstorage.WithDataPath(ss.Path),
)
ss.DB = db
ss.TS = ts
if err := initTable(db); err != nil {
return err
@ -42,6 +35,129 @@ func (ss *Sqlite) Open() error {
return nil
}
func (ss *Sqlite) Close() error {
ss.DB.Close()
return nil
}
const defaultSamples = 64
const maxSamples = 128
func (ss *Sqlite) GetLogAggregate(la LogAggregateInput) (LogAggregate, error) {
if la.End.IsZero() || la.End.After(time.Now()) {
la.End = time.Now().UTC()
}
if la.Start.After(la.End) {
return LogAggregate{}, errors.New("Start time cannot be before end time")
}
if la.Start.IsZero() {
la.Start = time.Now().UTC().Add(time.Hour * -12)
}
timespanSecs := int(la.End.Sub(la.Start) / time.Second)
// how many data points to show on the line plot
sampleCount := defaultSamples
if la.IntervalSeconds <= 0 {
la.IntervalSeconds = timespanSecs / sampleCount
}
sampleCount = timespanSecs / la.IntervalSeconds
// cap to prevent performance issues
if sampleCount > maxSamples {
sampleCount = maxSamples
la.IntervalSeconds = timespanSecs / sampleCount
log.Printf("got %v samples, capping to 256 for perf", sampleCount)
}
log.Printf("%+v - samples: %v - timespan (seconds): %v", la, sampleCount, timespanSecs)
switch la.Column {
case string(Domain):
case string(Status):
case string(ClientIP):
case string(Protocol):
break
default:
la.Column = string(Domain)
}
logs, err := ss.GetLog(GetLogInput{
Start: la.Start,
End: la.End,
Limit: 10000,
Page: 0,
})
if err != nil {
return LogAggregate{}, err
}
if logs.PageCount > 1 {
return LogAggregate{}, fmt.Errorf("more than one page available: %v", logs.PageCount)
}
lut := [][]StatsDataPoint{}
buckets := map[string][]StatsDataPoint{}
for _, l := range logs.Logs {
k := GetAggregateColumnHeader(l, LogAggregateColumn(la.Column))
if _, ok := buckets[k]; !ok {
buckets[k] = make([]StatsDataPoint, sampleCount)
lut = append(lut, buckets[k])
}
dataset := buckets[k]
timeIndex := int(l.Started.Sub(la.Start)/time.Second) / la.IntervalSeconds
ladp := dataset[timeIndex]
ladp.Header = k
offsetSecs := (timeIndex * la.IntervalSeconds)
ladp.Time = la.Start.Add(time.Duration(offsetSecs) * time.Second)
ladp.Count += 1
ladp.Value += float64(l.TotalTimeMs)
buckets[k][timeIndex] = ladp
}
laResult := LogAggregate{
Labels: make([]string, sampleCount),
Datasets: make([]LogAggregateDataset, len(buckets)),
}
for idx := 0; idx < sampleCount; idx++ {
offsetSecs := (idx * la.IntervalSeconds)
ts := la.Start.Add(time.Duration(offsetSecs) * time.Second)
laResult.Labels[idx] = ts.Format("01-02 15:04:05")
idx := 0
for k, v := range buckets {
ladp := v[idx]
if ladp.Time.IsZero() {
v[idx].Time = ts
}
laResult.Datasets[idx].Dataset = v
laResult.Datasets[idx].Label = k
idx++
}
}
return laResult, nil
}
type LogAggregate struct {
Labels []string `json:"labels"`
Datasets []LogAggregateDataset `json:"datasets"`
}
type LogAggregateDataset struct {
Label string `json:"label"`
Dataset []StatsDataPoint `json:"data"`
}
func (ss *Sqlite) Log(ql QueryLog) error {
sql := `
INSERT INTO log
@ -63,49 +179,6 @@ func (ss *Sqlite) Log(ql QueryLog) error {
return err
}
labels := QueryLogLabels(ql)
isError := 0
if ql.Error != "" {
isError = 1
}
if err := ss.TS.InsertRows([]tstorage.Row{
{
Metric: "Count",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: 1,
},
},
{
Metric: "TotalTimeMs",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: float64(ql.TotalTimeMs),
},
},
{
Metric: "Errors",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: float64(isError),
},
},
{
Metric: "RecurseTimeMs",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: float64(ql.RecurseRoundTripTimeMs),
},
},
}); err != nil {
return err
}
return nil
}
@ -380,50 +453,6 @@ func (ss *Sqlite) GetPagingInfo(in GetLogInput) (totalItems, pageCount int, err
return
}
func (ss *Sqlite) GetLogAggregate(la LogAggregateInput) ([]LogAggregateDataPoint, error) {
timeWindow := int64(5 * 60)
column := "domain"
if lac, ok := AggregateKeys[la.Column]; ok {
column = string(lac)
}
if la.IntervalSeconds > 0 {
timeWindow = int64(la.IntervalSeconds)
}
points, err := ss.TS.Select(column, GetLabelsFromQuery(la), la.Start.UnixNano(), la.End.UnixNano())
var
for _, p := range points {
ladp := LogAggregateDataPoint{
Header: column,
Count: int(p.Value),
Time: time.Unix(0, p.Timestamp),
}
results = append(results, ladp)
}
// for rows.Next() {
// var ladp LogAggregateDataPoint
// var timeInterval int64
// if err := rows.Scan(
// &ladp.Header,
// &ladp.AverageTotalTime,
// &ladp.Count,
// &timeInterval,
// ); err != nil {
// return nil, err
// }
// ladp.Time = time.Unix(timeInterval*timeWindow, 0)
// results = append(results, ladp)
// }
return results, nil
}
func initTable(db *sql.DB) error {
sql := `
CREATE TABLE IF NOT EXISTS log (

View File

@ -37,9 +37,9 @@ type LogAggregateInput struct {
Column string
}
type LogAggregateDataPoint struct {
type StatsDataPoint struct {
Header string
AverageTotalTime float64
Value float64
Count int
Time time.Time
}
@ -60,7 +60,7 @@ type Storage interface {
Log(QueryLog) error
GetLog(GetLogInput) (GetLogResult, error)
GetLogAggregate(LogAggregateInput) ([]LogAggregateDataPoint, error)
GetLogAggregate(LogAggregateInput) (LogAggregate, error)
}
type RecursorRow struct {

View File

@ -37,6 +37,8 @@ func main() {
Path: conf.DatabaseURL,
}
defer store.Close()
if err := store.Open(); err != nil {
log.Fatalf("COULD NOT OPEN SQLITE DB: %v", err)
}

View File

@ -25,7 +25,6 @@ test:
mkdir -p .bin
.bin/gopherhole: .bin
# @go build --tags "sqlite_foreign_keys fts5" -v -o .bin/gopherhole .
@go build --tags "fts5" -v -o .bin/gopherhole .
.bin/config.json: