pull/1/head
Adam Veldhousen 2021-06-07 21:41:56 -05:00
parent f00be3686d
commit abc1017453
Signed by: adam
GPG Key ID: 6DB29003C6DD1E4B
7 changed files with 264 additions and 179 deletions

View File

@ -1,5 +1,5 @@
{
"database": "./db.sqlite",
"database": ".",
"cache": "in-memory",
"http-addr": "localhost:8000",
"dns-addr": "localhost:5353",

1
go.mod
View File

@ -9,4 +9,5 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/mattn/go-sqlite3 v1.14.7
github.com/miekg/dns v1.1.41
github.com/nakabonne/tstorage v0.1.2 // indirect
)

8
go.sum
View File

@ -1,3 +1,4 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi v1.5.4 h1:QHdzF2szwjqVV4wmByUnTcsbIg7UGaQ0tPF2t5GcAIs=
github.com/go-chi/chi v1.5.4/go.mod h1:uaf8YgoFazUOkPBG7fxPftUylNumIev9awIWOENIuEg=
github.com/go-chi/chi/v5 v5.0.2 h1:4xKeALZdMEsuI5s05PU2Bm89Uc5iM04qFubUCl5LfAQ=
@ -10,6 +11,11 @@ github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEg
github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
github.com/nakabonne/tstorage v0.1.2 h1:kZcXXyO10DDUvDBRHhRo9+VwSTAuvIGt/WpEb+TXAbM=
github.com/nakabonne/tstorage v0.1.2/go.mod h1:n1v68nvIeUguEaYuSqz1ycmiMkF02CJMKSJV0Of1h4w=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -19,3 +25,5 @@ golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,11 +1,13 @@
package internal
import (
"fmt"
"log"
"strings"
"time"
"github.com/miekg/dns"
"github.com/nakabonne/tstorage"
)
type DomainManager struct {
@ -88,3 +90,37 @@ type QueryLog struct {
Error string
Status ResponseStatus
}
func QueryLogLabels(q QueryLog) []tstorage.Label {
return []tstorage.Label{
{
Name: string(ClientIP),
Value: q.ClientIP,
},
{
Name: string(Domain),
Value: q.Domain,
},
{
Name: string(Protocol),
Value: q.Protocol,
},
{
Name: string(Status),
Value: string(q.Status),
},
{
Name: string(RecursedUpstream),
Value: string(q.RecurseUpstreamIP),
},
{
Name: string(LookupError),
Value: fmt.Sprintf("%s", q.Error != ""),
},
}
}
func GetLabelsFromQuery(la LogAggregateInput) []tstorage.Label {
return nil
}

View File

@ -42,24 +42,8 @@ func NewAdminHandler(c Cache, s Storage, re *RuleEngine, content fs.FS) http.Han
handler.Use(middleware.RealIP)
handler.Use(middleware.Logger)
handler.Use(middleware.Recoverer)
handler.Use(middleware.Timeout(time.Second * 5))
// TODO: smarter way https://github.com/go-chi/chi/issues/403
// handler.Handle("/build/", http.StripPrefix("/build/", http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// file, err := content.Open(fmt.Sprintf("client/public/build/%s", r.URL.Path))
// if err != nil {
// rw.WriteHeader(http.StatusNotFound)
// return
// }
// defer file.Close()
// if _, err := io.Copy(rw, file); err != nil {
// rw.WriteHeader(http.StatusInternalServerError)
// return
// }
// })))
handler.Route("/api/v1", func(r chi.Router) {
r.Use(middleware.AllowContentType("application/json; utf-8", "application/json"))
r.Use(cors.Handler(cors.Options{
@ -69,6 +53,10 @@ func NewAdminHandler(c Cache, s Storage, re *RuleEngine, content fs.FS) http.Han
AllowCredentials: false,
MaxAge: 300,
}))
r.Use(middleware.SetHeader("Content-Type", "application/json; utf-8"))
r.Use(middleware.Timeout(time.Second * 5))
r.Get("/metrics/log", RestHandler(a.getLog).ToHF())
r.Get("/metrics/stats", RestHandler(a.getStats).ToHF())
@ -83,6 +71,7 @@ func NewAdminHandler(c Cache, s Storage, re *RuleEngine, content fs.FS) http.Han
r.Get("/recursors/{id:[0-9]+}", RestHandler(a.getRecursor).ToHF())
r.Delete("/recursor/{id:[0-9]+}", RestHandler(a.deleteRecursor).ToHF())
r.HandleFunc("/signal", a.signal)
// r.Put("/rules/lists", a.addRulelist)
// r.Get("/rules/lists", a.getRuleLists)
// r.Post("/rules/lists/reload/{id}", a.reloadRuleLists)
@ -92,7 +81,6 @@ func NewAdminHandler(c Cache, s Storage, re *RuleEngine, content fs.FS) http.Han
// r.Delete("/cache/purgeall", RestHandler(a.purgeAll).ToHF())
// r.Delete("/cache/purge", a.purgeKey)
// r.Get("/cache", a.getCacheContents)
r.HandleFunc("/signal", a.signal)
})
fs := http.FS(content)

View File

@ -3,39 +3,110 @@ package internal
import (
"database/sql"
"fmt"
"io"
"net"
"strconv"
"strings"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/nakabonne/tstorage"
)
const ISO8601 = "2006-01-02 15:04:05.999"
type Storage interface {
io.Closer
Open() error
AddRecursors(net.IP, int, int, int) error
GetRecursors() ([]RecursorRow, error)
UpdateRecursor(int, RecursorRow) error
DeleteRecursors(int) error
AddRule(RuleRow) error
GetRule(int) (RuleRow, error)
GetRules() ([]RuleRow, error)
UpdateRule(int, RuleRow) error
DeleteRule(int) error
Log(QueryLog) error
GetLog(GetLogInput) (GetLogResult, error)
GetLogAggregate(LogAggregateInput) ([]LogAggregateDataPoint, error)
}
type Sqlite struct {
Path string
*sql.DB
TS tstorage.Storage
}
func (ss *Sqlite) Open() error {
db, err := sql.Open("sqlite3", fmt.Sprintf("%s/db.sqlite?cache=shared&_journal=WAL", ss.Path))
if err != nil {
return fmt.Errorf("could not open db: %w", err)
}
db.SetMaxOpenConns(1)
ts, _ := tstorage.NewStorage(
tstorage.WithTimestampPrecision(tstorage.Nanoseconds),
tstorage.WithPartitionDuration(time.Minute*30),
tstorage.WithDataPath(ss.Path),
)
ss.DB = db
ss.TS = ts
if err := initTable(db); err != nil {
return err
}
return nil
}
func (ss *Sqlite) Log(ql QueryLog) error {
sql := `
INSERT INTO log
(started, clientIp, protocol, domain, totalTimeMs, error, recurseRoundTripTimeMs, recurseUpstreamIp, status)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?);
`
if _, err := ss.DB.Exec(sql,
ql.Started.UTC().Format(ISO8601),
ql.ClientIP,
ql.Protocol,
ql.Domain,
ql.TotalTimeMs,
ql.Error,
ql.RecurseRoundTripTimeMs,
ql.RecurseUpstreamIP,
ql.Status,
); err != nil {
return err
}
labels := QueryLogLabels(ql)
isError := 0
if ql.Error != "" {
isError = 1
}
if err := ss.TS.InsertRows([]tstorage.Row{
{
Metric: "Count",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: 1,
},
},
{
Metric: "TotalTimeMs",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: float64(ql.TotalTimeMs),
},
},
{
Metric: "Errors",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: float64(isError),
},
},
{
Metric: "RecurseTimeMs",
Labels: labels,
DataPoint: tstorage.DataPoint{
Timestamp: ql.Started.UnixNano(),
Value: float64(ql.RecurseRoundTripTimeMs),
},
},
}); err != nil {
return err
}
return nil
}
func (ss *Sqlite) GetRecursors() ([]RecursorRow, error) {
@ -73,13 +144,6 @@ func (ss *Sqlite) DeleteRecursors(id int) error {
return nil
}
type RecursorRow struct {
ID int `json:"id"`
IpAddress string `json:"ipAddress"`
TimeoutMs int `json:"timeoutMs"`
Weight int `json:"weight"`
}
func (rr RecursorRow) ValidIp() (net.IP, int, bool) {
ipAddrFrags := strings.Split(rr.IpAddress, ":")
if len(ipAddrFrags) == 0 || len(ipAddrFrags) > 2 {
@ -118,22 +182,6 @@ func (ss *Sqlite) AddRecursors(ip net.IP, port, timeout, weight int) error {
return nil
}
type GetLogInput struct {
Start time.Time `json:"start"`
End time.Time `json:"end"`
DomainFilter string `json:"rawfilter"`
Limit int `json:"pageSize"`
Page int `json:"page"`
}
type RuleRow struct {
ID int `json:"id"`
Weight int `json:"weight"`
Enabled bool `json:"enabled"`
Created time.Time `json:"created"`
Rule
}
func (ss *Sqlite) UpdateRule(id int, in RuleRow) error {
sql := `UPDATE rules SET
name = ?,
@ -234,13 +282,6 @@ func (ss *Sqlite) GetRules() ([]RuleRow, error) {
return results, nil
}
type GetLogResult struct {
GetLogInput
TotalResults int `json:"total"`
PageCount int `json:"pageCount"`
Logs []QueryLog `json:"logs"`
}
func (ss *Sqlite) GetLog(in GetLogInput) (GetLogResult, error) {
if in.Limit <= 0 {
in.Limit = 100
@ -339,38 +380,6 @@ func (ss *Sqlite) GetPagingInfo(in GetLogInput) (totalItems, pageCount int, err
return
}
type LogAggregateColumn string
var (
Domain = LogAggregateColumn("domain")
ClientIP = LogAggregateColumn("clientIp")
RecurseIP = LogAggregateColumn("recurseUpStreamIP")
Protocol = LogAggregateColumn("protocol")
Status = LogAggregateColumn("status")
AggregateKeys = map[string]LogAggregateColumn{
"domain": Domain,
"clientIp": ClientIP,
"recurseUpStreamIP": RecurseIP,
"protocol": Protocol,
"status": Status,
}
)
type LogAggregateInput struct {
IntervalSeconds int
Start time.Time
End time.Time
Column string
}
type LogAggregateDataPoint struct {
Header string
AverageTotalTime float64
Count int
Time time.Time
}
func (ss *Sqlite) GetLogAggregate(la LogAggregateInput) ([]LogAggregateDataPoint, error) {
timeWindow := int64(5 * 60)
column := "domain"
@ -383,91 +392,38 @@ func (ss *Sqlite) GetLogAggregate(la LogAggregateInput) ([]LogAggregateDataPoint
timeWindow = int64(la.IntervalSeconds)
}
sql := `
SELECT
%s,
ROUND(AVG(totalTimeMs), 3) as averageTotalTime,
COUNT(*) as requests,
strftime('%%s', started)/(%d) as "timeWindow"
FROM log
GROUP BY %s, strftime('%%s', started) / (%d)
ORDER BY started ASC;
`
points, err := ss.TS.Select(column, GetLabelsFromQuery(la), la.Start.UnixNano(), la.End.UnixNano())
sql = fmt.Sprintf(sql, column, timeWindow, column, timeWindow)
rows, err := ss.Query(sql)
if err != nil {
return nil, err
}
defer rows.Close()
if err := rows.Err(); err != nil {
return nil, err
}
var results []LogAggregateDataPoint
for rows.Next() {
var ladp LogAggregateDataPoint
var timeInterval int64
if err := rows.Scan(
&ladp.Header,
&ladp.AverageTotalTime,
&ladp.Count,
&timeInterval,
); err != nil {
return nil, err
var
for _, p := range points {
ladp := LogAggregateDataPoint{
Header: column,
Count: int(p.Value),
Time: time.Unix(0, p.Timestamp),
}
ladp.Time = time.Unix(timeInterval*timeWindow, 0)
results = append(results, ladp)
}
// for rows.Next() {
// var ladp LogAggregateDataPoint
// var timeInterval int64
// if err := rows.Scan(
// &ladp.Header,
// &ladp.AverageTotalTime,
// &ladp.Count,
// &timeInterval,
// ); err != nil {
// return nil, err
// }
// ladp.Time = time.Unix(timeInterval*timeWindow, 0)
// results = append(results, ladp)
// }
return results, nil
}
func (ss *Sqlite) Log(ql QueryLog) error {
sql := `
INSERT INTO log
(started, clientIp, protocol, domain, totalTimeMs, error, recurseRoundTripTimeMs, recurseUpstreamIp, status)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?);
`
if _, err := ss.DB.Exec(sql,
ql.Started.UTC().Format(ISO8601),
ql.ClientIP,
ql.Protocol,
ql.Domain,
ql.TotalTimeMs,
ql.Error,
ql.RecurseRoundTripTimeMs,
ql.RecurseUpstreamIP,
ql.Status,
); err != nil {
return err
}
return nil
}
func (ss *Sqlite) Open() error {
db, err := sql.Open("sqlite3", fmt.Sprintf("%s?cache=shared&_journal=WAL", ss.Path))
if err != nil {
return fmt.Errorf("could not open db: %w", err)
}
db.SetMaxOpenConns(1)
ss.DB = db
if err := initTable(db); err != nil {
return err
}
return nil
}
func initTable(db *sql.DB) error {
sql := `
CREATE TABLE IF NOT EXISTS log (

96
internal/storage.go Normal file
View File

@ -0,0 +1,96 @@
package internal
import (
"io"
"net"
"time"
)
const ISO8601 = "2006-01-02 15:04:05.999"
var (
Domain = LogAggregateColumn("domain")
ClientIP = LogAggregateColumn("clientIp")
RecurseIP = LogAggregateColumn("recurseUpStreamIP")
Protocol = LogAggregateColumn("protocol")
Status = LogAggregateColumn("status")
LookupError = LogAggregateColumn("error")
AggregateKeys = map[string]LogAggregateColumn{
"domain": Domain,
"clientIp": ClientIP,
"recurseUpStreamIP": RecurseIP,
"protocol": Protocol,
"status": Status,
}
CountMetric = "Count"
TotalTimeMetric = "TotalTimeMs"
ErrorMetric = "Errors"
RecurseTimeMetric = "RecurseTimeMs"
)
type LogAggregateInput struct {
IntervalSeconds int
Start time.Time
End time.Time
Column string
}
type LogAggregateDataPoint struct {
Header string
AverageTotalTime float64
Count int
Time time.Time
}
type Storage interface {
io.Closer
Open() error
AddRecursors(net.IP, int, int, int) error
GetRecursors() ([]RecursorRow, error)
UpdateRecursor(int, RecursorRow) error
DeleteRecursors(int) error
AddRule(RuleRow) error
GetRule(int) (RuleRow, error)
GetRules() ([]RuleRow, error)
UpdateRule(int, RuleRow) error
DeleteRule(int) error
Log(QueryLog) error
GetLog(GetLogInput) (GetLogResult, error)
GetLogAggregate(LogAggregateInput) ([]LogAggregateDataPoint, error)
}
type RecursorRow struct {
ID int `json:"id"`
IpAddress string `json:"ipAddress"`
TimeoutMs int `json:"timeoutMs"`
Weight int `json:"weight"`
}
type GetLogInput struct {
Start time.Time `json:"start"`
End time.Time `json:"end"`
DomainFilter string `json:"rawfilter"`
Limit int `json:"pageSize"`
Page int `json:"page"`
}
type RuleRow struct {
ID int `json:"id"`
Weight int `json:"weight"`
Enabled bool `json:"enabled"`
Created time.Time `json:"created"`
Rule
}
type GetLogResult struct {
GetLogInput
TotalResults int `json:"total"`
PageCount int `json:"pageCount"`
Logs []QueryLog `json:"logs"`
}
type LogAggregateColumn string