diff --git a/glide.lock b/glide.lock index e17fba9..f1b072a 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 2936591cb6ef55dc5517f96d731851b2135281bd2200919933219089b837e517 -updated: 2017-06-11T21:38:09.459666062+10:00 +hash: 9205cd1cc00a2ac50c6d793dafb1850c244853cabb3cfab85be7b6f524dd2464 +updated: 2018-07-12T16:47:57.80074341+08:00 imports: - name: github.com/beorn7/perks version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 @@ -72,6 +72,18 @@ imports: - leveldb/storage - leveldb/table - leveldb/util +- name: go.uber.org/atomic + version: 1ea20fb1cbb1cc08cbd0d913a96dead89aa18289 +- name: go.uber.org/multierr + version: 3c4937480c32f4c13a875a1829af76c98ca3d40a +- name: go.uber.org/zap + version: eeedf312bc6c57391d84767a4cd413f02a917974 + subpackages: + - buffer + - internal/bufferpool + - internal/color + - internal/exit + - zapcore - name: golang.org/x/net version: 1a68b1313cf4ad7778376e82641197b60c02f65c subpackages: diff --git a/glide.yaml b/glide.yaml index 4d790d8..d10bb39 100644 --- a/glide.yaml +++ b/glide.yaml @@ -18,3 +18,5 @@ import: - package: gopkg.in/tylerb/graceful.v1 version: v1.2.15 - package: github.com/kshvakov/clickhouse +- package: go.uber.org/zap + version: ^1.8.0 diff --git a/main.go b/main.go index 02e6b09..c693e50 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,8 @@ import ( "fmt" "os" "time" + + "go.uber.org/zap" ) // a lot of this borrows directly from: @@ -28,11 +30,10 @@ type config struct { var ( versionFlag bool + debug bool ) func main() { - excode := 0 - conf := parseFlags() if versionFlag { @@ -41,27 +42,32 @@ func main() { if VersionPrerelease != "" { fmt.Println("Version PreRelease:", VersionPrerelease) } - os.Exit(excode) + return + } + + var logger *zap.Logger + if debug { + logger, _ = zap.NewDevelopment() + } else { + logger, _ = zap.NewProduction() } - fmt.Println("Starting up..") + defer logger.Sync() // flushes buffer, if any + sugar := logger.Sugar() - srv, err := NewP2CServer(conf) + sugar.Info("Starting up..") + + srv, err := NewP2CServer(conf, sugar) if err != nil { - fmt.Printf("Error: could not create server: %s\n", err.Error()) - excode = 1 - os.Exit(excode) + sugar.Fatalf("could not create server: %s\n", err.Error()) } err = srv.Start() if err != nil { - fmt.Printf("Error: http server returned error: %s\n", err.Error()) - excode = 1 + sugar.Fatalf("http server returned error: %s\n", err.Error()) } - - fmt.Println("Shutting down..") + sugar.Info("Shutting down..") srv.Shutdown() - fmt.Println("Exiting..") - os.Exit(excode) + sugar.Info("Exiting..") } func parseFlags() *config { @@ -70,6 +76,9 @@ func parseFlags() *config { // print version? flag.BoolVar(&versionFlag, "version", false, "Version") + // turn on debug? + flag.BoolVar(&debug, "debug", false, "turn on debug mode") + // clickhouse dsn ddsn := "tcp://127.0.0.1:9000?username=&password=&database=metrics&" + "read_timeout=10&write_timeout=10&alt_hosts=" diff --git a/reader.go b/reader.go index 2715132..2b0fa14 100644 --- a/reader.go +++ b/reader.go @@ -7,13 +7,18 @@ import ( "fmt" "strings" + "github.com/kshvakov/clickhouse" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage/remote" + "go.uber.org/zap" ) +var readerContent = []interface{}{"component", "reader"} + type p2cReader struct { - conf *config - db *sql.DB + conf *config + db *sql.DB + logger *zap.SugaredLogger } // getTimePeriod return select and where SQL chunks relating to the time period -or- error @@ -152,13 +157,23 @@ func (r *p2cReader) getSQL(query *remote.Query) (string, error) { return sql, nil } -func NewP2CReader(conf *config) (*p2cReader, error) { +func NewP2CReader(conf *config, sugar *zap.SugaredLogger) (*p2cReader, error) { var err error r := new(p2cReader) r.conf = conf + r.logger = sugar r.db, err = sql.Open("clickhouse", r.conf.ChDSN) if err != nil { - fmt.Printf("Error connecting to clickhouse: %s\n", err.Error()) + r.logger.With(readerContent...).Errorf("connecting to clickhouse: %s", err.Error()) + return r, err + } + + if err := r.db.Ping(); err != nil { + if exception, ok := err.(*clickhouse.Exception); ok { + r.logger.With(readerContent...).Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) + } else { + r.logger.With(readerContent...).Error(err.Error()) + } return r, err } @@ -182,27 +197,27 @@ func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) rcount := 0 for _, q := range req.Queries { // remove me.. - fmt.Printf("\nquery: start: %d, end: %d\n\n", q.StartTimestampMs, q.EndTimestampMs) + r.logger.With(readerContent...).Debug("\nquery: start: %d, end: %d", q.StartTimestampMs, q.EndTimestampMs) // get the select sql sqlStr, err = r.getSQL(q) - fmt.Printf("query: running sql: %s\n\n", sqlStr) + r.logger.With(readerContent...).Debug("query: running sql: %s", sqlStr) if err != nil { - fmt.Printf("Error: reader: getSQL: %s\n", err.Error()) + r.logger.With(readerContent...).Errorf("reader: getSQL: %s", err.Error()) return &resp, err } // get the select sql if err != nil { - fmt.Printf("Error: reader: getSQL: %s\n", err.Error()) + r.logger.With(readerContent...).Errorf("reader: getSQL: %s", err.Error()) return &resp, err } // todo: metrics on number of errors, rows, selects, timings, etc rows, err = r.db.Query(sqlStr) if err != nil { - fmt.Printf("Error: query failed: %s", sqlStr) - fmt.Printf("Error: query error: %s\n", err) + r.logger.With(readerContent...).Errorf("query failed: %s", sqlStr) + r.logger.With(readerContent...).Errorf("query error: %s", err) return &resp, err } @@ -218,7 +233,7 @@ func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) value float64 ) if err = rows.Scan(&cnt, &t, &name, &tags, &value); err != nil { - fmt.Printf("Error: scan: %s\n", err.Error()) + r.logger.With(readerContent...).Errorf("scan: %s", err.Error()) } // remove this.. //fmt.Printf(fmt.Sprintf("%d,%d,%s,%s,%f\n", cnt, t, name, strings.Join(tags, ":"), value)) @@ -244,7 +259,7 @@ func (r *p2cReader) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts) } - fmt.Printf("query: returning %d rows for %d queries\n", rcount, len(req.Queries)) + r.logger.With(readerContent...).Debug("query: returning %d rows for %d queries", rcount, len(req.Queries)) return &resp, nil diff --git a/srv.go b/srv.go index 1a9918a..efb6c61 100644 --- a/srv.go +++ b/srv.go @@ -1,17 +1,17 @@ package main import ( + "fmt" "io/ioutil" "net/http" "time" - "fmt" - "github.com/golang/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage/remote" + "go.uber.org/zap" "gopkg.in/tylerb/graceful.v1" ) @@ -29,24 +29,26 @@ type p2cServer struct { writer *p2cWriter reader *p2cReader rx prometheus.Counter + logger *zap.SugaredLogger } -func NewP2CServer(conf *config) (*p2cServer, error) { +func NewP2CServer(conf *config, sugar *zap.SugaredLogger) (*p2cServer, error) { var err error c := new(p2cServer) c.requests = make(chan *p2cRequest, conf.ChanSize) c.mux = http.NewServeMux() c.conf = conf + c.logger = sugar - c.writer, err = NewP2CWriter(conf, c.requests) + c.writer, err = NewP2CWriter(conf, c.requests, sugar) if err != nil { - fmt.Printf("Error creating clickhouse writer: %s\n", err.Error()) + c.logger.Errorf("creating clickhouse writer: %s\n", err.Error()) return c, err } - c.reader, err = NewP2CReader(conf) + c.reader, err = NewP2CReader(conf, sugar) if err != nil { - fmt.Printf("Error creating clickhouse reader: %s\n", err.Error()) + c.logger.Errorf("creating clickhouse reader: %s\n", err.Error()) return c, err } @@ -161,7 +163,7 @@ func (c *p2cServer) process(req remote.WriteRequest) { } func (c *p2cServer) Start() error { - fmt.Println("HTTP server starting...") + c.logger.Info("HTTP server starting...") c.writer.Start() return graceful.RunWithErr(c.conf.HTTPAddr, c.conf.HTTPTimeout, c.mux) } @@ -178,10 +180,10 @@ func (c *p2cServer) Shutdown() { select { case <-wchan: - fmt.Println("Writer shutdown cleanly..") + c.logger.Info("Writer shutdown cleanly..") // All done! case <-time.After(10 * time.Second): - fmt.Println("Writer shutdown timed out, samples will be lost..") + c.logger.Info("Writer shutdown timed out, samples will be lost..") } } diff --git a/writer.go b/writer.go index bba4ea8..b6cd48b 100644 --- a/writer.go +++ b/writer.go @@ -4,18 +4,20 @@ import ( "database/sql" "fmt" "sort" - "time" - "sync" + "time" "github.com/kshvakov/clickhouse" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" ) var insertSQL = `INSERT INTO %s.%s (date, name, tags, val, ts) VALUES (?, ?, ?, ?, ?)` +var writerContent = []interface{}{"component", "writer"} + type p2cWriter struct { conf *config requests chan *p2cRequest @@ -25,16 +27,28 @@ type p2cWriter struct { ko prometheus.Counter test prometheus.Counter timings prometheus.Histogram + + logger *zap.SugaredLogger } -func NewP2CWriter(conf *config, reqs chan *p2cRequest) (*p2cWriter, error) { +func NewP2CWriter(conf *config, reqs chan *p2cRequest, sugar *zap.SugaredLogger) (*p2cWriter, error) { var err error w := new(p2cWriter) w.conf = conf w.requests = reqs + w.logger = sugar w.db, err = sql.Open("clickhouse", w.conf.ChDSN) if err != nil { - fmt.Printf("Error connecting to clickhouse: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("connecting to clickhouse: %s", err.Error()) + return w, err + } + + if err := w.db.Ping(); err != nil { + if exception, ok := err.(*clickhouse.Exception); ok { + w.logger.With(writerContent...).Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) + } else { + w.logger.With(writerContent...).Error(err.Error()) + } return w, err } @@ -78,7 +92,7 @@ func (w *p2cWriter) Start() { go func() { w.wg.Add(1) - fmt.Println("Writer starting..") + w.logger.With(writerContent...).Info("Writer starting..") sql := fmt.Sprintf(insertSQL, w.conf.ChDB, w.conf.ChTable) ok := true for ok { @@ -92,7 +106,7 @@ func (w *p2cWriter) Start() { // get requet and also check if channel is closed req, ok = <-w.requests if !ok { - fmt.Println("Writer stopping..") + w.logger.With(writerContent...).Info("Writer stopping..") break } reqs = append(reqs, req) @@ -107,7 +121,7 @@ func (w *p2cWriter) Start() { // post them to db all at once tx, err := w.db.Begin() if err != nil { - fmt.Printf("Error: begin transaction: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("begin transaction: %s", err.Error()) w.ko.Add(1.0) continue } @@ -116,7 +130,7 @@ func (w *p2cWriter) Start() { smt, err := tx.Prepare(sql) for _, req := range reqs { if err != nil { - fmt.Printf("Error: prepare statement: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("prepare statement: %s", err.Error()) w.ko.Add(1.0) continue } @@ -128,14 +142,14 @@ func (w *p2cWriter) Start() { req.val, req.ts) if err != nil { - fmt.Printf("Error: statement exec: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("statement exec: %s", err.Error()) w.ko.Add(1.0) } } // commit and record metrics if err = tx.Commit(); err != nil { - fmt.Printf("Error: commit failed: %s\n", err.Error()) + w.logger.With(writerContent...).Errorf("commit failed: %s", err.Error()) w.ko.Add(1.0) } else { w.tx.Add(float64(nmetrics)) @@ -143,7 +157,7 @@ func (w *p2cWriter) Start() { } } - fmt.Println("Writer stopped..") + w.logger.With(writerContent...).Info("Writer stopped..") w.wg.Done() }() }