Compare commits
2 Commits
abc1017453
...
8d564e441a
| Author | SHA1 | Date |
|---|---|---|
|
|
8d564e441a | |
|
|
29b500668e |
File diff suppressed because it is too large
Load Diff
|
|
@ -62,11 +62,20 @@ export enum StatSearchKey {
|
|||
Protocol = "protocol"
|
||||
}
|
||||
|
||||
export interface Stat {
|
||||
export interface StatDataset {
|
||||
label: string
|
||||
data: DataPoint[]
|
||||
}
|
||||
|
||||
export interface DataPoint {
|
||||
Header: string
|
||||
AverageTotalTime: Number
|
||||
Value: Number
|
||||
Count: number,
|
||||
Time: string
|
||||
}
|
||||
export interface Stat {
|
||||
labels: string[]
|
||||
datasets: StatDataset[]
|
||||
};
|
||||
|
||||
export const getStats = async ({
|
||||
|
|
@ -74,7 +83,7 @@ export const getStats = async ({
|
|||
end = new Date(),
|
||||
key = StatSearchKey.Domain,
|
||||
interval = 30,
|
||||
}: StatsSearchOptions) => await apiCall<Stat[]>('metrics/stats', 'GET', {
|
||||
}: StatsSearchOptions) => await apiCall<Stat>('metrics/stats', 'GET', {
|
||||
start: getUnixTime(start),
|
||||
end: getUnixTime(end),
|
||||
key,
|
||||
|
|
|
|||
|
|
@ -2,45 +2,23 @@
|
|||
import { onMount } from "svelte";
|
||||
import type { Stat } from "../api";
|
||||
import randomColor from "randomcolor";
|
||||
|
||||
import { Chart, registerables } from "chart.js";
|
||||
Chart.register(...registerables);
|
||||
|
||||
export let stats: Stat[] = [];
|
||||
export let stats: Stat = null;
|
||||
export let column: string = null;
|
||||
|
||||
const transformStats = (ostats) => {
|
||||
const chartData = ostats.reduce((agg, x) => {
|
||||
let root = agg[x.Header] || {
|
||||
labels: [],
|
||||
dataset: {
|
||||
label: x.Header,
|
||||
borderColor: randomColor({
|
||||
luminosity: "dark",
|
||||
}), //"rgb(75,192,192)",
|
||||
data: [],
|
||||
},
|
||||
};
|
||||
root.dataset.data = root.dataset.data.concat(x.Count);
|
||||
root.labels = root.labels.concat(x.Time);
|
||||
agg[x.Header] = root;
|
||||
return agg;
|
||||
}, {});
|
||||
|
||||
const finalChartData = Object.keys(chartData).map((x) => chartData[x]);
|
||||
const finalChartLabels =
|
||||
finalChartData.length > 0 ? finalChartData[0].labels : [];
|
||||
return {
|
||||
labels: finalChartLabels,
|
||||
datasets: finalChartData.map((x) => x.dataset),
|
||||
};
|
||||
};
|
||||
|
||||
const generateChartOptions = (s: [], empty: Boolean = false) => {
|
||||
const generateChartOptions = (s: Stat, empty: Boolean = false) => {
|
||||
let labels = [];
|
||||
let datasets = [];
|
||||
|
||||
if (s && s.length > 0) {
|
||||
({ labels, datasets } = transformStats(s));
|
||||
if (s) {
|
||||
labels = s.labels;
|
||||
datasets = s.datasets.map(({ label, data }) => ({
|
||||
label,
|
||||
data: data.map((x) => x.Count),
|
||||
borderColor: randomColor(),
|
||||
}));
|
||||
}
|
||||
|
||||
var delayed;
|
||||
|
|
@ -54,7 +32,11 @@
|
|||
responsive: true,
|
||||
maintainAspectRatio: false,
|
||||
scales: {
|
||||
// x: {
|
||||
x: {
|
||||
title: {
|
||||
label: "time",
|
||||
display: true,
|
||||
},
|
||||
// type: "time",
|
||||
// ticks: {
|
||||
// source: "auto",
|
||||
|
|
@ -62,9 +44,13 @@
|
|||
// maxRotation: 0,
|
||||
// autoSkip: true,
|
||||
// },
|
||||
// },
|
||||
},
|
||||
y: {
|
||||
stacked: true,
|
||||
min: 0,
|
||||
ticks: {
|
||||
stepSize: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
hoverRadius: 5,
|
||||
|
|
@ -79,6 +65,10 @@
|
|||
algorithm: "lttb",
|
||||
samples: 60,
|
||||
},
|
||||
title: {
|
||||
display: true,
|
||||
text: `Count by ${column}`,
|
||||
},
|
||||
},
|
||||
animations: {
|
||||
radius: {
|
||||
|
|
@ -117,7 +107,7 @@
|
|||
chartInstance = new Chart(ctx, generateChartOptions(stats, true));
|
||||
});
|
||||
|
||||
const update = (s) => {
|
||||
const update = (s: Stat) => {
|
||||
if (chartInstance) {
|
||||
const { options, data } = generateChartOptions(s, false);
|
||||
chartInstance.options = options;
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@
|
|||
let chartDataLoading: Boolean = false;
|
||||
let logDataLoading: Boolean = false;
|
||||
|
||||
let chartData: Stat[] = [];
|
||||
let chartData: Stat = null;
|
||||
let logs: Log[] = [];
|
||||
let pageSize: number = 50;
|
||||
let pageCount: number = 0;
|
||||
|
|
@ -62,7 +62,7 @@
|
|||
return payload;
|
||||
};
|
||||
|
||||
const fetchStats = async () => {
|
||||
const fetchStats = async (): Promise<Stat> => {
|
||||
if (chartDataLoading) {
|
||||
console.warn("tried loading stats while already loading");
|
||||
return;
|
||||
|
|
@ -80,7 +80,7 @@
|
|||
|
||||
if (error) {
|
||||
chartErrorMsg = error;
|
||||
return [];
|
||||
return null;
|
||||
}
|
||||
|
||||
return payload;
|
||||
|
|
@ -137,7 +137,7 @@
|
|||
{:else if chartErrorMsg}
|
||||
<p>{chartErrorMsg}</p>
|
||||
{:else}
|
||||
<TimeChart stats={chartData} />
|
||||
<TimeChart stats={chartData} column={chartKey} />
|
||||
{/if}
|
||||
</section>
|
||||
<section class="my-5">
|
||||
|
|
|
|||
|
|
@ -1,13 +1,11 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
"github.com/nakabonne/tstorage"
|
||||
)
|
||||
|
||||
type DomainManager struct {
|
||||
|
|
@ -57,12 +55,13 @@ func (dm *DomainManager) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
|
|||
responseMessage.Compress = true
|
||||
|
||||
ql.TotalTimeMs = int(time.Since(start).Milliseconds())
|
||||
|
||||
log.Printf("%+v", ql)
|
||||
go func() {
|
||||
if err := dm.Storage.Log(ql); err != nil {
|
||||
go func(q QueryLog) {
|
||||
if err := dm.Storage.Log(q); err != nil {
|
||||
log.Printf("ERROR WRITING LOG: %v", err)
|
||||
}
|
||||
}()
|
||||
}(ql)
|
||||
|
||||
if err := w.WriteMsg(responseMessage); err != nil {
|
||||
log.Println(err)
|
||||
|
|
@ -91,36 +90,15 @@ type QueryLog struct {
|
|||
Status ResponseStatus
|
||||
}
|
||||
|
||||
func QueryLogLabels(q QueryLog) []tstorage.Label {
|
||||
return []tstorage.Label{
|
||||
{
|
||||
Name: string(ClientIP),
|
||||
Value: q.ClientIP,
|
||||
},
|
||||
{
|
||||
Name: string(Domain),
|
||||
Value: q.Domain,
|
||||
},
|
||||
{
|
||||
Name: string(Protocol),
|
||||
Value: q.Protocol,
|
||||
},
|
||||
{
|
||||
Name: string(Status),
|
||||
Value: string(q.Status),
|
||||
},
|
||||
{
|
||||
Name: string(RecursedUpstream),
|
||||
Value: string(q.RecurseUpstreamIP),
|
||||
},
|
||||
{
|
||||
Name: string(LookupError),
|
||||
Value: fmt.Sprintf("%s", q.Error != ""),
|
||||
},
|
||||
func GetAggregateColumnHeader(ql QueryLog, h LogAggregateColumn) string {
|
||||
switch h {
|
||||
case ClientIP:
|
||||
return ql.ClientIP
|
||||
case Status:
|
||||
return string(ql.Status)
|
||||
case Protocol:
|
||||
return ql.Protocol
|
||||
}
|
||||
}
|
||||
|
||||
func GetLabelsFromQuery(la LogAggregateInput) []tstorage.Label {
|
||||
return nil
|
||||
|
||||
return ql.Domain
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,20 +2,20 @@ package internal
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"github.com/nakabonne/tstorage"
|
||||
)
|
||||
|
||||
type Sqlite struct {
|
||||
Path string
|
||||
*sql.DB
|
||||
TS tstorage.Storage
|
||||
}
|
||||
|
||||
func (ss *Sqlite) Open() error {
|
||||
|
|
@ -26,14 +26,7 @@ func (ss *Sqlite) Open() error {
|
|||
|
||||
db.SetMaxOpenConns(1)
|
||||
|
||||
ts, _ := tstorage.NewStorage(
|
||||
tstorage.WithTimestampPrecision(tstorage.Nanoseconds),
|
||||
tstorage.WithPartitionDuration(time.Minute*30),
|
||||
tstorage.WithDataPath(ss.Path),
|
||||
)
|
||||
|
||||
ss.DB = db
|
||||
ss.TS = ts
|
||||
|
||||
if err := initTable(db); err != nil {
|
||||
return err
|
||||
|
|
@ -42,6 +35,129 @@ func (ss *Sqlite) Open() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ss *Sqlite) Close() error {
|
||||
ss.DB.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
const defaultSamples = 64
|
||||
const maxSamples = 128
|
||||
|
||||
func (ss *Sqlite) GetLogAggregate(la LogAggregateInput) (LogAggregate, error) {
|
||||
if la.End.IsZero() || la.End.After(time.Now()) {
|
||||
la.End = time.Now().UTC()
|
||||
}
|
||||
|
||||
if la.Start.After(la.End) {
|
||||
return LogAggregate{}, errors.New("Start time cannot be before end time")
|
||||
}
|
||||
|
||||
if la.Start.IsZero() {
|
||||
la.Start = time.Now().UTC().Add(time.Hour * -12)
|
||||
}
|
||||
|
||||
timespanSecs := int(la.End.Sub(la.Start) / time.Second)
|
||||
|
||||
// how many data points to show on the line plot
|
||||
sampleCount := defaultSamples
|
||||
|
||||
if la.IntervalSeconds <= 0 {
|
||||
la.IntervalSeconds = timespanSecs / sampleCount
|
||||
}
|
||||
|
||||
sampleCount = timespanSecs / la.IntervalSeconds
|
||||
|
||||
// cap to prevent performance issues
|
||||
if sampleCount > maxSamples {
|
||||
sampleCount = maxSamples
|
||||
la.IntervalSeconds = timespanSecs / sampleCount
|
||||
log.Printf("got %v samples, capping to 256 for perf", sampleCount)
|
||||
}
|
||||
|
||||
log.Printf("%+v - samples: %v - timespan (seconds): %v", la, sampleCount, timespanSecs)
|
||||
|
||||
switch la.Column {
|
||||
case string(Domain):
|
||||
case string(Status):
|
||||
case string(ClientIP):
|
||||
case string(Protocol):
|
||||
break
|
||||
default:
|
||||
la.Column = string(Domain)
|
||||
}
|
||||
|
||||
logs, err := ss.GetLog(GetLogInput{
|
||||
Start: la.Start,
|
||||
End: la.End,
|
||||
Limit: 10000,
|
||||
Page: 0,
|
||||
})
|
||||
if err != nil {
|
||||
return LogAggregate{}, err
|
||||
}
|
||||
|
||||
if logs.PageCount > 1 {
|
||||
return LogAggregate{}, fmt.Errorf("more than one page available: %v", logs.PageCount)
|
||||
}
|
||||
|
||||
lut := [][]StatsDataPoint{}
|
||||
buckets := map[string][]StatsDataPoint{}
|
||||
for _, l := range logs.Logs {
|
||||
k := GetAggregateColumnHeader(l, LogAggregateColumn(la.Column))
|
||||
if _, ok := buckets[k]; !ok {
|
||||
buckets[k] = make([]StatsDataPoint, sampleCount)
|
||||
lut = append(lut, buckets[k])
|
||||
}
|
||||
dataset := buckets[k]
|
||||
|
||||
timeIndex := int(l.Started.Sub(la.Start)/time.Second) / la.IntervalSeconds
|
||||
|
||||
ladp := dataset[timeIndex]
|
||||
ladp.Header = k
|
||||
offsetSecs := (timeIndex * la.IntervalSeconds)
|
||||
ladp.Time = la.Start.Add(time.Duration(offsetSecs) * time.Second)
|
||||
ladp.Count += 1
|
||||
ladp.Value += float64(l.TotalTimeMs)
|
||||
|
||||
buckets[k][timeIndex] = ladp
|
||||
}
|
||||
|
||||
laResult := LogAggregate{
|
||||
Labels: make([]string, sampleCount),
|
||||
Datasets: make([]LogAggregateDataset, len(buckets)),
|
||||
}
|
||||
|
||||
for idx := 0; idx < sampleCount; idx++ {
|
||||
offsetSecs := (idx * la.IntervalSeconds)
|
||||
ts := la.Start.Add(time.Duration(offsetSecs) * time.Second)
|
||||
laResult.Labels[idx] = ts.Format("01-02 15:04:05")
|
||||
|
||||
idx := 0
|
||||
for k, v := range buckets {
|
||||
ladp := v[idx]
|
||||
if ladp.Time.IsZero() {
|
||||
v[idx].Time = ts
|
||||
}
|
||||
laResult.Datasets[idx].Dataset = v
|
||||
laResult.Datasets[idx].Label = k
|
||||
idx++
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return laResult, nil
|
||||
}
|
||||
|
||||
type LogAggregate struct {
|
||||
Labels []string `json:"labels"`
|
||||
Datasets []LogAggregateDataset `json:"datasets"`
|
||||
}
|
||||
|
||||
type LogAggregateDataset struct {
|
||||
Label string `json:"label"`
|
||||
Dataset []StatsDataPoint `json:"data"`
|
||||
}
|
||||
|
||||
func (ss *Sqlite) Log(ql QueryLog) error {
|
||||
sql := `
|
||||
INSERT INTO log
|
||||
|
|
@ -63,49 +179,6 @@ func (ss *Sqlite) Log(ql QueryLog) error {
|
|||
return err
|
||||
}
|
||||
|
||||
labels := QueryLogLabels(ql)
|
||||
isError := 0
|
||||
if ql.Error != "" {
|
||||
isError = 1
|
||||
}
|
||||
|
||||
if err := ss.TS.InsertRows([]tstorage.Row{
|
||||
{
|
||||
Metric: "Count",
|
||||
Labels: labels,
|
||||
DataPoint: tstorage.DataPoint{
|
||||
Timestamp: ql.Started.UnixNano(),
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Metric: "TotalTimeMs",
|
||||
Labels: labels,
|
||||
DataPoint: tstorage.DataPoint{
|
||||
Timestamp: ql.Started.UnixNano(),
|
||||
Value: float64(ql.TotalTimeMs),
|
||||
},
|
||||
},
|
||||
{
|
||||
Metric: "Errors",
|
||||
Labels: labels,
|
||||
DataPoint: tstorage.DataPoint{
|
||||
Timestamp: ql.Started.UnixNano(),
|
||||
Value: float64(isError),
|
||||
},
|
||||
},
|
||||
{
|
||||
Metric: "RecurseTimeMs",
|
||||
Labels: labels,
|
||||
DataPoint: tstorage.DataPoint{
|
||||
Timestamp: ql.Started.UnixNano(),
|
||||
Value: float64(ql.RecurseRoundTripTimeMs),
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -380,50 +453,6 @@ func (ss *Sqlite) GetPagingInfo(in GetLogInput) (totalItems, pageCount int, err
|
|||
return
|
||||
}
|
||||
|
||||
func (ss *Sqlite) GetLogAggregate(la LogAggregateInput) ([]LogAggregateDataPoint, error) {
|
||||
timeWindow := int64(5 * 60)
|
||||
column := "domain"
|
||||
|
||||
if lac, ok := AggregateKeys[la.Column]; ok {
|
||||
column = string(lac)
|
||||
}
|
||||
|
||||
if la.IntervalSeconds > 0 {
|
||||
timeWindow = int64(la.IntervalSeconds)
|
||||
}
|
||||
|
||||
points, err := ss.TS.Select(column, GetLabelsFromQuery(la), la.Start.UnixNano(), la.End.UnixNano())
|
||||
|
||||
var
|
||||
for _, p := range points {
|
||||
ladp := LogAggregateDataPoint{
|
||||
Header: column,
|
||||
Count: int(p.Value),
|
||||
Time: time.Unix(0, p.Timestamp),
|
||||
}
|
||||
results = append(results, ladp)
|
||||
}
|
||||
|
||||
// for rows.Next() {
|
||||
// var ladp LogAggregateDataPoint
|
||||
// var timeInterval int64
|
||||
// if err := rows.Scan(
|
||||
// &ladp.Header,
|
||||
// &ladp.AverageTotalTime,
|
||||
// &ladp.Count,
|
||||
// &timeInterval,
|
||||
// ); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// ladp.Time = time.Unix(timeInterval*timeWindow, 0)
|
||||
|
||||
// results = append(results, ladp)
|
||||
// }
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func initTable(db *sql.DB) error {
|
||||
sql := `
|
||||
CREATE TABLE IF NOT EXISTS log (
|
||||
|
|
|
|||
|
|
@ -37,9 +37,9 @@ type LogAggregateInput struct {
|
|||
Column string
|
||||
}
|
||||
|
||||
type LogAggregateDataPoint struct {
|
||||
type StatsDataPoint struct {
|
||||
Header string
|
||||
AverageTotalTime float64
|
||||
Value float64
|
||||
Count int
|
||||
Time time.Time
|
||||
}
|
||||
|
|
@ -60,7 +60,7 @@ type Storage interface {
|
|||
|
||||
Log(QueryLog) error
|
||||
GetLog(GetLogInput) (GetLogResult, error)
|
||||
GetLogAggregate(LogAggregateInput) ([]LogAggregateDataPoint, error)
|
||||
GetLogAggregate(LogAggregateInput) (LogAggregate, error)
|
||||
}
|
||||
|
||||
type RecursorRow struct {
|
||||
|
|
|
|||
2
main.go
2
main.go
|
|
@ -37,6 +37,8 @@ func main() {
|
|||
Path: conf.DatabaseURL,
|
||||
}
|
||||
|
||||
defer store.Close()
|
||||
|
||||
if err := store.Open(); err != nil {
|
||||
log.Fatalf("COULD NOT OPEN SQLITE DB: %v", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue