diff --git a/go.mod b/go.mod index 29c876d..a0dc898 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/minus5/svckit -go 1.25.0 +go 1.26 require ( github.com/aws/aws-sdk-go v1.55.7 @@ -16,6 +16,7 @@ require ( github.com/hashicorp/consul/api v1.18.0 github.com/json-iterator/go v1.1.12 github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c + github.com/minus5/go-nsqx v1.0.1-0.20260418103232-cae33d0b0a16 github.com/minus5/go-simplejson v0.5.1-0.20190518182223-8af509724a86 github.com/nranchev/go-libGeoIP v0.0.0-20170629073846-d6d4a9a4c7e8 github.com/nsqio/go-nsq v1.1.0 diff --git a/go.sum b/go.sum index ac21aa7..c848ca3 100644 --- a/go.sum +++ b/go.sum @@ -154,6 +154,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= +github.com/minus5/go-nsqx v1.0.1-0.20260418103232-cae33d0b0a16 h1:I1gKMN7Av8zos39zW3/H8HI+SWwKDoKPxWz2GVtOnFY= +github.com/minus5/go-nsqx v1.0.1-0.20260418103232-cae33d0b0a16/go.mod h1:U0qLktCviUuRBuVAbKq/Oq7EQmObdp4g2CWrizN5fMo= github.com/minus5/go-simplejson v0.5.1-0.20190518182223-8af509724a86 h1:bsm6+lQ2ea4dD6ZzGCz6GHXxmfU0ludQ4Cjxm7h/D9o= github.com/minus5/go-simplejson v0.5.1-0.20190518182223-8af509724a86/go.mod h1:eBPLk8FJ9uc035DnbHAWvINeYXIIVduZe5EILao6s7o= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= diff --git a/nsqx/consumer.go b/nsqx/consumer.go new file mode 100644 index 0000000..3223bc7 --- /dev/null +++ b/nsqx/consumer.go @@ -0,0 +1,127 @@ +package nsqx + +import ( + "fmt" + + "github.com/minus5/go-nsqx" + "github.com/minus5/svckit/dcy" + "github.com/minus5/svckit/log" +) + +type Consumer struct { + nsqConsumer *nsqx.Consumer + logger func() *log.Agregator + lookups dcy.Addresses +} + +type nsqHandler struct { + fn func(*Message) error +} + +func (h *nsqHandler) HandleMessage(m *nsqx.Message) error { + // javi periodicki nsqdu da je procesiranje jos u tijeku + stop := every(DefaultMsgTouchInterval, m.Touch) + defer close(stop) + return h.fn(newMessage(m)) +} + +func MustNewConsumer(topic string, handler func(*Message) error, + opts ...func(*options)) *Consumer { + c, err := NewConsumer(topic, handler, opts...) + if err != nil { + log.S("topic", topic).Fatal(err) + } + return c +} + +func NewConsumer(topic string, handler func(*Message) error, + opts ...func(*options)) (*Consumer, error) { + + o := getDefaults().clone() + o.apply(opts...) + + cfg := o.toNsqxConfig() + + c, err := nsqx.NewConsumer(topic, o.channel, cfg) + if err != nil { + return nil, err + } + + c.SetLogger(o.logger, o.logLevel) + c.AddConcurrentHandlers(&nsqHandler{fn: handler}, o.Concurrency()) + + // Use shared Discovery so all consumers in the process share one lookupd poller + disc := getDiscovery(o) + c.SetDiscovery(disc) + + err = c.ConnectToNSQLookupds(o.lookupds.String()) + if err != nil { + return nil, err + } + + co := &Consumer{ + lookups: o.lookupds, + nsqConsumer: c, + logger: func() *log.Agregator { + return logger().S("topic", topic).S("channel", o.channel) + }, + } + + // log options on start + co.logger(). + I("MaxInFlight", o.maxInFlight). + I("Concurrency", o.Concurrency()). + I("ZeroCopyThrshld", o.zeroCopyThreshold). + I("AckBatchSize", o.ackBatchSize). + I("ProducerPipelines", o.producerPipelines). + I("MaxAttempts", int(o.maxAttempts)). + I("CircuitBreakerThrshld", int(o.circuitBreakerThreshold)). + I("OutputBuffSize", o.outputBufferSize). + S("BackoffAlg", o.backoffAlgorithm.String()). + S("Compression", o.compression.String()). + Info("starting consumer") + + dcy.Subscribe(LookupdHTTPServiceName, co.onLookupChanges) + dcy.SubscribeByTag(LookupdHTTPServiceNameByTag, LookupdHTTPServiceTag, co.onLookupChanges) + return co, nil +} + +func (c *Consumer) onLookupChanges(as dcy.Addresses) { + // deduplicate addresses, dcy.Services and dcy.ServicesByTag can return same IP addr, + // tcp-nsqlookupd and tcp.nsqlookupd + seen := make(map[string]struct{}) + var unique []string + for _, a := range as { + s := a.String() + if _, dup := seen[s]; !dup { + seen[s] = struct{}{} + unique = append(unique, s) + } + } + + // Update shared Discovery address list, it will use + // these new addresses on the next cache miss/poll cycle + disc := getDiscovery(getDefaults()) + disc.SetAddrs(unique) + c.lookups = as + c.logger().S("lookupds", fmt.Sprintf("%v", unique)).Info("lookupds update") +} + +func (c *Consumer) Close() { + dcy.Unsubscribe(LookupdHTTPServiceName, c.onLookupChanges) + dcy.UnsubscribeByTag(LookupdHTTPServiceNameByTag, LookupdHTTPServiceTag, c.onLookupChanges) + c.nsqConsumer.Stop() +} + +// StartClosing will initiate a graceful stop of the Consumer (permanent), +// receive on returned chan to block until this process completes +func (c *Consumer) StartClosing() chan int { + dcy.Unsubscribe(LookupdHTTPServiceName, c.onLookupChanges) + dcy.UnsubscribeByTag(LookupdHTTPServiceNameByTag, LookupdHTTPServiceTag, c.onLookupChanges) + c.nsqConsumer.Stop() + // go-nsqx Stop() is synchronous — blocks until fully stopped. + // Return a closed channel to maintain the same API shape. + ch := make(chan int) + close(ch) + return ch +} diff --git a/nsqx/envelope.go b/nsqx/envelope.go new file mode 100644 index 0000000..a070bbc --- /dev/null +++ b/nsqx/envelope.go @@ -0,0 +1,82 @@ +package nsqx + +import ( + "bytes" + "encoding/json" + "strings" + "time" +) + +var ( + headerSeparator = []byte{10} //new line +) + +// Envelope arround message for request response communication over nsq +type Envelope struct { + Type string `json:"t,omitempty"` // type of the message + ReplyTo string `json:"r,omitempty"` // nsq topic to send reply to + CorrelationId string `json:"c,omitempty"` // connection between request and response + ExpiresAt int64 `json:"e,omitempty"` // unix timestamp when message expires, after that should be dropped + Error string `json:"error,omitempty"` + Body []byte `json:"-"` // message body +} + +// NewEnvelope decodes envelope from bytes +func NewEnvelope(buf []byte) (*Envelope, error) { + parts := bytes.SplitN(buf, headerSeparator, 2) + e := &Envelope{} + if err := json.Unmarshal(parts[0], e); err != nil { + return nil, err + } + if len(parts) > 1 { + e.Body = parts[1] + } + return e, nil +} + +// Bytes encodes envelope into bytes for putting on wire +func (m *Envelope) Bytes() []byte { + buf, _ := json.Marshal(m) + buf = append(buf, headerSeparator...) + buf = append(buf, m.Body...) + return buf +} + +// ParseBody deocdes Evelope body into o +func (m *Envelope) ParseBody(o interface{}) error { + if err := json.Unmarshal(m.Body, o); err != nil { + return err + } + return nil +} + +// Reply creates reply Envelope from request Envelope +func (m *Envelope) Reply(o interface{}, err error) (*Envelope, error) { + e := &Envelope{ + Type: strings.Replace(m.Type, ".req", ".rsp", 1), + CorrelationId: m.CorrelationId, + } + if err != nil { + e.Error = err.Error() + } + if o != nil { + if buf, ok := o.([]byte); ok { + e.Body = buf + } else { + buf, err := json.Marshal(o) + if err != nil { + return nil, err + } + e.Body = buf + } + } + return e, nil +} + +// Expired returns true if message expired +func (m *Envelope) Expired() bool { + if m.ExpiresAt <= 0 { + return false + } + return time.Now().Unix() > m.ExpiresAt +} diff --git a/nsqx/message.go b/nsqx/message.go new file mode 100644 index 0000000..fa819d5 --- /dev/null +++ b/nsqx/message.go @@ -0,0 +1,38 @@ +package nsqx + +import ( + "time" + + "github.com/minus5/go-nsqx" +) + +// Message wraps nsqx.Message so callers dont need to import go-nsqx directly +type Message struct { + nsqm *nsqx.Message + ID nsqx.MessageID + Body []byte + Timestamp int64 // unix nanoseconds + Attempts uint16 + NSQDAddress string +} + +func newMessage(m *nsqx.Message) *Message { + body := make([]byte, len(m.Body)) + copy(body, m.Body) + return &Message{ + nsqm: m, + ID: m.ID, + Body: body, + Timestamp: m.Timestamp.UnixNano(), + Attempts: m.Attempts, + NSQDAddress: m.NSQDAddress, + } +} + +func (m *Message) RequeueWithoutBackoff(delay time.Duration) { + m.nsqm.RequeueWithoutBackoff(delay) +} + +func (m *Message) Touch() { + m.nsqm.Touch() +} diff --git a/nsqx/nsqx.go b/nsqx/nsqx.go new file mode 100644 index 0000000..f2c9f6d --- /dev/null +++ b/nsqx/nsqx.go @@ -0,0 +1,153 @@ +package nsqx + +import ( + "fmt" + "os" + "sync" + "time" + + "github.com/minus5/go-nsqx" + "github.com/minus5/svckit/dcy" + "github.com/minus5/svckit/env" + "github.com/minus5/svckit/log" + "github.com/minus5/svckit/signal" +) + +const ( + DefaultMaxInFlight = 256 + DefaultConcurrency = 16 + LookupdHTTPServiceName = "nsqlookupd-http" + LookupdHTTPServiceNameByTag = "nsqlookupd" + LookupdHTTPServiceTag = "http" + EnvNsqd = "SVCKIT_NSQD" + DefaultMsgTouchInterval = time.Second * 30 + DefaultStatsdPrefix = "nsq." // to keep the same as nsqd prefix, maybe change for diff? +) + +var ( + Pub = MustNewProducer + Sub = MustNewConsumer + + defaults *options + initMu sync.Mutex + discovery *nsqx.Discovery // shared across all consumers +) + +func getDefaults() *options { + initMu.Lock() + defer initMu.Unlock() + if defaults == nil { + initDefaults() + } + return defaults +} + +func Set(opts ...func(*options)) { + initMu.Lock() + defer initMu.Unlock() + if defaults == nil { + initDefaults() + } + defaults.apply(opts...) +} + +func initDefaults() { + defaults = &options{ + maxInFlight: DefaultMaxInFlight, + concurrency: DefaultConcurrency, + channel: fmt.Sprintf("%s-%s", env.AppName(), env.InstanceId()), + nsqdTCPAddr: "127.0.0.1:4150", + lookupds: dcy.Addresses{dcy.Address{Address: "127.0.0.1", Port: 4161}}, + logLevel: nsqx.LogLevelWarning, + zeroCopyThreshold: 0, // always copy, so that messages are not truncated + backoffAlgorithm: nsqx.BackoffFixed, // Fixed backoff every FixedInterval + backoffFixedInterval: 2 * time.Second, + logger: &nsqLogger{}, + lookupdPollInterval: 15 * time.Second, // see how will this behave in prod, current nsq pulls every 10s from consul + lookupdCacheTTL: 10 * time.Second, + lookupdPollTimeout: 5 * time.Second, + } + if e, ok := os.LookupEnv(EnvNsqd); ok && e != "" { + defaults.nsqdTCPAddr = e + logger().S("nsqd", defaults.nsqdTCPAddr).Debug("init nsqd") + } + connect := func() error { + var addrs dcy.Addresses + a, err := dcy.Services(LookupdHTTPServiceName) + if err != nil && err != dcy.ErrNotFound { + logger().Error(err) + } + if err == nil { + addrs.Append(a) + } + a, err = dcy.ServicesByTag(LookupdHTTPServiceNameByTag, LookupdHTTPServiceTag) + if err == nil { + addrs.Append(a) + } + if err != nil && err != dcy.ErrNotFound { + logger().Error(err) + } + if len(addrs) == 0 { + return dcy.ErrNotFound + } + defaults.lookupds = addrs + logger().S("lookupds", fmt.Sprintf("%v", defaults.lookupds.String())).Info("init lookupds") + return nil + } + if err := signal.WithExponentialBackoff(connect); err != nil { + logger().Fatal(err) + } + // statsd metrics for every topic/channel, new with nsqx + if addr, ok := os.LookupEnv("STATSD_LOGGER_ADDRESS"); ok && addr != "" { + defaults.statsdAddr = addr + defaults.metricsBackend = nsqx.MetricsStatsd + defaults.statsdPrefix = fmt.Sprintf("%s.", env.AppName()) + logger().S("statsd", defaults.statsdAddr).Info("init statsd") + } +} + +// getDiscovery returns the shared Discovery instance +func getDiscovery(o *options) *nsqx.Discovery { + initMu.Lock() + defer initMu.Unlock() + if discovery == nil { + cfg := o.toNsqxConfig() + discovery = nsqx.NewDiscovery(o.lookupds.String(), cfg, o.logger) + } + return discovery +} + +func logger() *log.Agregator { + return log.S("lib", "svckit.nsqx") +} + +// ChannelAppName sets default channel name to app name +func ChannelAppName() { + Set(Channel(env.AppName())) +} + +// ChannelEphemeral sets default channel name to app name suffixed with node name and #ephemeral +func ChannelEphemeral() { + Set(Channel(fmt.Sprintf("%s-%s#ephemeral", env.AppName(), env.InstanceId()))) +} + +func DefaultChannel(c string) { + Set(Channel(c)) +} + +func every(duration time.Duration, work func()) chan struct{} { + stop := make(chan struct{}) + go func() { + ticker := time.NewTicker(duration) + for { + select { + case <-ticker.C: + work() + case <-stop: + ticker.Stop() + return + } + } + }() + return stop +} diff --git a/nsqx/options.go b/nsqx/options.go new file mode 100644 index 0000000..9a84660 --- /dev/null +++ b/nsqx/options.go @@ -0,0 +1,386 @@ +package nsqx + +import ( + "crypto/tls" + "strings" + "time" + + "github.com/minus5/go-nsqx" + "github.com/minus5/svckit/dcy" + "github.com/minus5/svckit/log" +) + +type nsqLogger struct{} + +// Ovdje ulaze logovi iz go-nsqx liba +// Logger interface is Output(calldepth int, s string) error — identical in go-nsq and go-nsqx +func (n *nsqLogger) Output(calldepth int, s string) error { + a := log.NewAgregator(nil, calldepth) + a.S("lib", "svckit.nsqx") + if strings.HasPrefix(s, "INF") { + a.Info(s) + return nil + } + if strings.HasPrefix(s, "WRN") { + if !strings.Contains(s, "there are 0 connections left alive") { + a.Info(s) + } + return nil + } + if strings.HasPrefix(s, "ERR") { + if !strings.Contains(s, "TOPIC_NOT_FOUND") { + a.ErrorS(s) + } + return nil + } + a.Debug(s) + return nil +} + +type options struct { + // Existing options (backward compatible) + maxInFlight int + concurrency int + channel string + nsqdTCPAddr string + logger *nsqLogger + logLevel nsqx.LogLevel + lookupds dcy.Addresses + + // Timeouts + dialTimeout time.Duration + readTimeout time.Duration + writeTimeout time.Duration + + // Discovery + lookupdPollInterval time.Duration + lookupdPollJitter float64 + lookupdCacheTTL time.Duration + lookupdPollTimeout time.Duration + + // Adaptive RDY + rdyEvalInterval time.Duration + rdySlowStartInitial int64 + rdySlowStartInterval time.Duration + idleConnectionRDY int64 + emaAlpha float64 + idleRateThreshold float64 + idleTicksRequired int + + // Retry & backoff + maxAttempts uint16 + backoffAlgorithm nsqx.BackoffAlgo + backoffBaseDelay time.Duration + backoffMaxDelay time.Duration + backoffMaxExponent uint + backoffFixedInterval time.Duration + backoffFixedMaxAttempts int + + // Circuit breaker + circuitBreakerThreshold int32 + circuitBreakerResetAfter time.Duration + + // Producer + producerPipelines int + batchMaxSize int + batchMaxDelay time.Duration + + // Server side buffering + heartbeatInterval time.Duration + outputBufferSize int + outputBufferTimeout time.Duration + + // Compression + compression nsqx.CompressionCodec + deflateLevel int + + // Zero copy / ACK batching / adaptive buffer + zeroCopyThreshold int + ackBatchSize int + ackBatchDelay time.Duration + adaptiveBufferEnabled *bool // pointer so we can detect "not set" + adaptiveBufferInitial int + adaptiveBufferMax int + + // Identity + clientID string + hostname string + userAgent string + + // Metrics + metricsBackend nsqx.MetricsBackend + statsdAddr string + statsdPrefix string + prometheusNamespace string + metricsFlushInterval time.Duration + + // TLS + tlsConfig *tls.Config +} + +func (o *options) clone() *options { + o2 := &options{} + *o2 = *o + return o2 +} + +func (o *options) set(opts ...func(*options)) { + o.apply(opts...) +} + +func (o *options) apply(opts ...func(*options)) *options { + for _, fn := range opts { + fn(o) + } + return o +} + +func (o *options) Concurrency() int { + if o.concurrency != 0 { + return o.concurrency + } + return o.maxInFlight +} + +// toNsqxConfig builds a go-nsqx Config from the wrapper options, +// applying only non zero overrides on top of go-nsqx defaults +func (o *options) toNsqxConfig() *nsqx.Config { + cfg := nsqx.NewConfig() + cfg.MaxInFlight = o.maxInFlight + cfg.WorkerConcurrency = o.Concurrency() + cfg.LogLevel = o.logLevel + cfg.ZeroCopyThreshold = o.zeroCopyThreshold + cfg.BackoffAlgorithm = o.backoffAlgorithm + + if o.dialTimeout > 0 { + cfg.DialTimeout = o.dialTimeout + } + if o.readTimeout > 0 { + cfg.ReadTimeout = o.readTimeout + } + if o.writeTimeout > 0 { + cfg.WriteTimeout = o.writeTimeout + } + if o.lookupdPollInterval > 0 { + cfg.LookupdPollInterval = o.lookupdPollInterval + } + if o.lookupdPollJitter > 0 { + cfg.LookupdPollJitter = o.lookupdPollJitter + } + if o.lookupdCacheTTL > 0 { + cfg.LookupdCacheTTL = o.lookupdCacheTTL + } + if o.lookupdPollTimeout > 0 { + cfg.LookupdPollTimeout = o.lookupdPollTimeout + } + if o.rdyEvalInterval > 0 { + cfg.RDYEvalInterval = o.rdyEvalInterval + } + if o.rdySlowStartInitial > 0 { + cfg.RDYSlowStartInitial = o.rdySlowStartInitial + } + if o.rdySlowStartInterval > 0 { + cfg.RDYSlowStartInterval = o.rdySlowStartInterval + } + if o.idleConnectionRDY > 0 { + cfg.IdleConnectionRDY = o.idleConnectionRDY + } + if o.emaAlpha > 0 { + cfg.EMAAlpha = o.emaAlpha + } + if o.idleRateThreshold > 0 { + cfg.IdleRateThreshold = o.idleRateThreshold + } + if o.idleTicksRequired > 0 { + cfg.IdleTicksRequired = o.idleTicksRequired + } + if o.maxAttempts > 0 { + cfg.MaxAttempts = o.maxAttempts + } + if o.backoffBaseDelay > 0 { + cfg.BackoffBaseDelay = o.backoffBaseDelay + } + if o.backoffMaxDelay > 0 { + cfg.BackoffMaxDelay = o.backoffMaxDelay + } + if o.backoffMaxExponent > 0 { + cfg.BackoffMaxExponent = o.backoffMaxExponent + } + if o.backoffFixedInterval > 0 { + cfg.BackoffFixedInterval = o.backoffFixedInterval + } + if o.backoffFixedMaxAttempts > 0 { + cfg.BackoffFixedMaxAttempts = o.backoffFixedMaxAttempts + } + if o.circuitBreakerThreshold > 0 { + cfg.CircuitBreakerThreshold = o.circuitBreakerThreshold + } + if o.circuitBreakerResetAfter > 0 { + cfg.CircuitBreakerResetAfter = o.circuitBreakerResetAfter + } + if o.producerPipelines > 0 { + cfg.ProducerPipelines = o.producerPipelines + } + if o.batchMaxSize > 0 { + cfg.BatchMaxSize = o.batchMaxSize + } + if o.batchMaxDelay > 0 { + cfg.BatchMaxDelay = o.batchMaxDelay + } + if o.heartbeatInterval > 0 { + cfg.HeartbeatInterval = o.heartbeatInterval + } + if o.outputBufferSize != 0 { + cfg.OutputBufferSize = o.outputBufferSize + } + if o.outputBufferTimeout != 0 { + cfg.OutputBufferTimeout = o.outputBufferTimeout + } + if o.compression != 0 { + cfg.Compression = o.compression + } + if o.deflateLevel > 0 { + cfg.DeflateLevel = o.deflateLevel + } + if o.ackBatchSize > 0 { + cfg.AckBatchSize = o.ackBatchSize + } + if o.ackBatchDelay > 0 { + cfg.AckBatchDelay = o.ackBatchDelay + } + if o.adaptiveBufferEnabled != nil { + cfg.AdaptiveBufferEnabled = *o.adaptiveBufferEnabled + } + if o.adaptiveBufferInitial > 0 { + cfg.AdaptiveBufferInitial = o.adaptiveBufferInitial + } + if o.adaptiveBufferMax > 0 { + cfg.AdaptiveBufferMax = o.adaptiveBufferMax + } + if o.clientID != "" { + cfg.ClientID = o.clientID + } + if o.hostname != "" { + cfg.Hostname = o.hostname + } + if o.userAgent != "" { + cfg.UserAgent = o.userAgent + } + if o.metricsBackend != 0 { + cfg.MetricsBackend = o.metricsBackend + } + if o.statsdAddr != "" { + cfg.StatsdAddr = o.statsdAddr + } + if o.statsdPrefix != "" { + cfg.StatsdPrefix = o.statsdPrefix + } + if o.prometheusNamespace != "" { + cfg.PrometheusNamespace = o.prometheusNamespace + } + if o.metricsFlushInterval > 0 { + cfg.MetricsFlushInterval = o.metricsFlushInterval + } + if o.tlsConfig != nil { + cfg.TLSConfig = o.tlsConfig + } + + return cfg +} + +// Existing options (backward compatible go-nsq/go-nsqx) + +func MaxInFlight(m int) func(*options) { return func(o *options) { o.maxInFlight = m } } +func Channel(c string) func(*options) { return func(o *options) { o.channel = c } } +func Concurrency(c int) func(*options) { return func(o *options) { o.concurrency = c } } +func Ordered() func(*options) { return func(o *options) { o.concurrency = 1 } } +func LogLevelDebug() func(*options) { return func(o *options) { o.logLevel = nsqx.LogLevelDebug } } + +// New options + +func DialTimeout(d time.Duration) func(*options) { return func(o *options) { o.dialTimeout = d } } +func ReadTimeout(d time.Duration) func(*options) { return func(o *options) { o.readTimeout = d } } +func WriteTimeout(d time.Duration) func(*options) { return func(o *options) { o.writeTimeout = d } } +func LookupdPollInterval(d time.Duration) func(*options) { + return func(o *options) { o.lookupdPollInterval = d } +} +func LookupdPollJitter(f float64) func(*options) { return func(o *options) { o.lookupdPollJitter = f } } +func LookupdCacheTTL(d time.Duration) func(*options) { + return func(o *options) { o.lookupdCacheTTL = d } +} +func LookupdPollTimeout(d time.Duration) func(*options) { + return func(o *options) { o.lookupdPollTimeout = d } +} +func RDYEvalInterval(d time.Duration) func(*options) { + return func(o *options) { o.rdyEvalInterval = d } +} +func RDYSlowStartInitial(n int64) func(*options) { + return func(o *options) { o.rdySlowStartInitial = n } +} +func RDYSlowStartInterval(d time.Duration) func(*options) { + return func(o *options) { o.rdySlowStartInterval = d } +} +func IdleConnectionRDY(n int64) func(*options) { return func(o *options) { o.idleConnectionRDY = n } } +func EMAAlpha(f float64) func(*options) { return func(o *options) { o.emaAlpha = f } } +func IdleRateThreshold(f float64) func(*options) { return func(o *options) { o.idleRateThreshold = f } } +func IdleTicksRequired(n int) func(*options) { return func(o *options) { o.idleTicksRequired = n } } +func MaxAttempts(n uint16) func(*options) { return func(o *options) { o.maxAttempts = n } } +func BackoffAlgorithm(a nsqx.BackoffAlgo) func(*options) { + return func(o *options) { o.backoffAlgorithm = a } +} +func BackoffBaseDelay(d time.Duration) func(*options) { + return func(o *options) { o.backoffBaseDelay = d } +} +func BackoffMaxDelay(d time.Duration) func(*options) { + return func(o *options) { o.backoffMaxDelay = d } +} +func BackoffMaxExponent(n uint) func(*options) { return func(o *options) { o.backoffMaxExponent = n } } +func BackoffFixedInterval(d time.Duration) func(*options) { + return func(o *options) { o.backoffFixedInterval = d } +} +func BackoffFixedMaxAttempts(n int) func(*options) { + return func(o *options) { o.backoffFixedMaxAttempts = n } +} +func CircuitBreakerThreshold(n int32) func(*options) { + return func(o *options) { o.circuitBreakerThreshold = n } +} +func CircuitBreakerResetAfter(d time.Duration) func(*options) { + return func(o *options) { o.circuitBreakerResetAfter = d } +} +func ProducerPipelines(n int) func(*options) { return func(o *options) { o.producerPipelines = n } } +func BatchMaxSize(n int) func(*options) { return func(o *options) { o.batchMaxSize = n } } +func BatchMaxDelay(d time.Duration) func(*options) { return func(o *options) { o.batchMaxDelay = d } } +func HeartbeatInterval(d time.Duration) func(*options) { + return func(o *options) { o.heartbeatInterval = d } +} +func OutputBufferSize(n int) func(*options) { return func(o *options) { o.outputBufferSize = n } } +func OutputBufferTimeout(d time.Duration) func(*options) { + return func(o *options) { o.outputBufferTimeout = d } +} +func Compression(c nsqx.CompressionCodec) func(*options) { + return func(o *options) { o.compression = c } +} +func DeflateLevel(n int) func(*options) { return func(o *options) { o.deflateLevel = n } } +func ZeroCopyThreshold(n int) func(*options) { return func(o *options) { o.zeroCopyThreshold = n } } +func AckBatchSize(n int) func(*options) { return func(o *options) { o.ackBatchSize = n } } +func AckBatchDelay(d time.Duration) func(*options) { return func(o *options) { o.ackBatchDelay = d } } +func AdaptiveBufferEnabled(b bool) func(*options) { + return func(o *options) { o.adaptiveBufferEnabled = &b } +} +func AdaptiveBufferInitial(n int) func(*options) { + return func(o *options) { o.adaptiveBufferInitial = n } +} +func AdaptiveBufferMax(n int) func(*options) { return func(o *options) { o.adaptiveBufferMax = n } } +func ClientID(s string) func(*options) { return func(o *options) { o.clientID = s } } +func Hostname(s string) func(*options) { return func(o *options) { o.hostname = s } } +func UserAgent(s string) func(*options) { return func(o *options) { o.userAgent = s } } +func MetricsPrometheus(namespace string) func(*options) { + return func(o *options) { o.metricsBackend = nsqx.MetricsPrometheus; o.prometheusNamespace = namespace } +} +func MetricsStatsd(addr, prefix string) func(*options) { + return func(o *options) { o.metricsBackend = nsqx.MetricsStatsd; o.statsdAddr = addr; o.statsdPrefix = prefix } +} +func MetricsFlushInterval(d time.Duration) func(*options) { + return func(o *options) { o.metricsFlushInterval = d } +} +func TLSConfig(cfg *tls.Config) func(*options) { return func(o *options) { o.tlsConfig = cfg } } diff --git a/nsqx/producer.go b/nsqx/producer.go new file mode 100644 index 0000000..3c3b074 --- /dev/null +++ b/nsqx/producer.go @@ -0,0 +1,58 @@ +package nsqx + +import ( + "github.com/minus5/go-nsqx" + "github.com/minus5/svckit/log" +) + +type Producer struct { + topic string + nsqProducer *nsqx.Producer +} + +func MustNewProducer(topic string, opts ...func(*options)) *Producer { + p, err := NewProducer(topic, opts...) + if err != nil { + log.Fatal(err) + } + return p +} + +func NewProducer(topic string, opts ...func(*options)) (*Producer, error) { + o := getDefaults().clone() + o.apply(opts...) + + cfg := o.toNsqxConfig() + p, err := nsqx.NewProducer(o.nsqdTCPAddr, cfg) + if err != nil { + return nil, err + } + p.SetLogger(o.logger, o.logLevel) + // log on start + logger().S("topic", topic). + I("ProducerPipelines", o.producerPipelines). + I("ZeroCopyThrshld", o.zeroCopyThreshold). + I("BatchMaxSize", o.batchMaxSize). + I("OutputBuffSize", o.outputBufferSize). + S("Nsqd", o.nsqdTCPAddr). + Info("starting producer") + return &Producer{nsqProducer: p, topic: topic}, nil +} + +func (p *Producer) Close() { + p.nsqProducer.Stop() +} + +func (p *Producer) Publish(msg []byte) error { + return p.nsqProducer.Publish(p.topic, msg) +} + +func (p *Producer) PublishTo(topic string, msg []byte) error { + return p.nsqProducer.Publish(topic, msg) +} + +func (p *Producer) MustPublish(msg []byte) { + if err := p.Publish(msg); err != nil { + log.Fatal(err) + } +} diff --git a/nsqx/rpc.go b/nsqx/rpc.go new file mode 100644 index 0000000..2973d77 --- /dev/null +++ b/nsqx/rpc.go @@ -0,0 +1,47 @@ +package nsqx + +import ( + "errors" + "time" +) + +type RpcTransport struct { + pub *RrProducer + topic string + ttl time.Duration + em *ErrorsMapping +} + +func NewRpcTransport(topic string, ttl time.Duration, em *ErrorsMapping) *RpcTransport { + return &RpcTransport{ + pub: RrPub(""), + topic: topic, + ttl: ttl, + em: em, + } +} + +func (t *RpcTransport) Call(typ string, req []byte) ([]byte, error) { + return t.pub.ReqRspBase(ReqRspBaseParams{ + Topic: t.topic, + Ttl: t.ttl, + Typ: typ, + Req: req, + Em: t.em, + }) +} + +func (t *RpcTransport) Close() { + t.pub.Close() +} + +type rpcHandler func(typ string, body []byte) ([]byte, error) + +func RpcServe(topic string, h rpcHandler) *RrConsumer { + return RrSub(topic, + func(typ string, body []byte) (interface{}, error) { + return h(typ, body) + }, + RequeueError(errors.New("newer")), + ) +} diff --git a/nsqx/rr_consumer.go b/nsqx/rr_consumer.go new file mode 100644 index 0000000..0604cf0 --- /dev/null +++ b/nsqx/rr_consumer.go @@ -0,0 +1,194 @@ +package nsqx + +import ( + "encoding/json" + "errors" + "fmt" + "reflect" + "strings" + "sync" + "time" + + "github.com/minus5/svckit/env" + "github.com/minus5/svckit/log" +) + +var ( + RequeueDelay = 2 * time.Second +) + +// RrConsumer request reponse consumer, +// implements consumer side of the request response communication over nsq +type RrConsumer struct { + topic string + sub *Consumer + producers map[string]*Producer + consumerOptions []func(*options) + requeueError error // set this error to requeue only on this, if nil requeues on all errors + sync.Mutex +} + +// RrSub creates RrConsumer +// topic - nsq topic where reuqest arrive +// handler - gets message type and body and creates response (or error) +func RrSub(topic string, handler func(string, []byte) (interface{}, error), opts ...func(*RrConsumer)) *RrConsumer { + s := &RrConsumer{ + topic: topic, + producers: make(map[string]*Producer), + } + s.apply(opts...) + h := func(m *Message) error { + eReq, err := NewEnvelope(m.Body) + if err != nil { + return err + } + + if eReq.Expired() { + log.S("type", eReq.Type).S("correlationId", eReq.CorrelationId).I("now", int(time.Now().Unix())).I("expires_at", int(eReq.ExpiresAt)).Info("expired") + return nil + } + // do request + rsp, handlerErr := handler(eReq.Type, eReq.Body) + // ako je puklo vrati poruku u nsq + if handlerErr != nil && (s.requeueError == nil || handlerErr == s.requeueError) { + m.RequeueWithoutBackoff(RequeueDelay) + log.S("type", eReq.Type).S("correlationId", eReq.CorrelationId).Error(handlerErr) + return nil + } + // treba li odgovoriti + if eReq.ReplyTo == "" { + return nil + } + // odgovori + eRsp, err := eReq.Reply(rsp, handlerErr) + if err != nil { + log.Error(err) + return err + } + pub := s.pub(eReq.ReplyTo) + if err := pub.Publish(eRsp.Bytes()); err != nil { + log.Error(err) + return err + } + return nil + } + s.consumerOptions = append(s.consumerOptions, Channel(env.AppName())) + s.sub = Sub(topic, h, s.consumerOptions...) + return s +} + +// apply calls all functions to setup options +func (s *RrConsumer) apply(opts ...func(*RrConsumer)) { + for _, fn := range opts { + fn(s) + } +} + +func RequeueError(err error) func(*RrConsumer) { + return func(s *RrConsumer) { + s.requeueError = err + } +} + +// ConsumerOptions sets configuration options for the underlying Consumer. +func ConsumerOptions(opts ...func(*options)) func(*RrConsumer) { + return func(s *RrConsumer) { + s.consumerOptions = opts + } +} + +// RrAsyncSub creates RrConsumer in async mode +// Hendler gets type, correlationId, and body. +// It is users reposibility to call Pub with that correlationId and response. +func RrAsyncSub(topic string, handler func(string, string, []byte) error) *RrConsumer { + h := func(m *Message) error { + eReq, err := NewEnvelope(m.Body) + if err != nil { + return err + } + if eReq.Expired() { + log.S("type", eReq.Type).S("correlationId", eReq.CorrelationId).Info("expired") + return nil + } + correlationId := fmt.Sprintf("%s|%s|%s", eReq.Type, eReq.CorrelationId, eReq.ReplyTo) + if err := handler(eReq.Type, correlationId, eReq.Body); err != nil { + return err + } + return nil + } + return &RrConsumer{ + topic: topic, + sub: Sub(topic, h), + producers: make(map[string]*Producer), + } +} + +// Pub replay for asycn sub. +// correlationId is the one recived in handler send to RrAsyncSub. +func (s *RrConsumer) Pub(correlationId string, body []byte) error { + parts := strings.SplitN(correlationId, "|", 3) + if len(parts) != 3 { + return fmt.Errorf("wrong correlationId: %s", correlationId) + } + eRsp := &Envelope{ + Type: parts[0], + CorrelationId: parts[1], + ReplyTo: parts[2], + Body: body, + } + pub := s.pub(eRsp.ReplyTo) + if err := pub.Publish(eRsp.Bytes()); err != nil { + return err + } + return nil +} + +func (s *RrConsumer) pub(topic string) *Producer { + s.Lock() + defer s.Unlock() + if p, ok := s.producers[topic]; ok { + return p + } + p := Pub(topic) + s.producers[topic] = p + return p +} + +// Close implements gracefully stop. +func (s *RrConsumer) Close() { + if s.sub == nil { + return + } + s.sub.Close() +} + +// StartClosing will initiate a graceful stop of the Consumer (permanent) +// Receive on returned chan to block until this process completes +func (s *RrConsumer) StartClosing() chan int { + if s.sub == nil { + return nil + } + return s.sub.StartClosing() +} + +type Server interface { + Serve(req interface{}) (interface{}, error) +} + +func NewRrServer(topic string, + srv Server, + typeFor func(string) reflect.Type, + requeueError error) *RrConsumer { + handler := func(typ string, body []byte) (interface{}, error) { + t := typeFor(typ) + req := reflect.New(t).Interface() + if err := json.Unmarshal(body, req); err != nil { + return nil, err + } + return srv.Serve(req) + } + if requeueError == nil { + requeueError = errors.New("newer") + } + return RrSub(topic, handler, RequeueError(requeueError)) +} diff --git a/nsqx/rr_producer.go b/nsqx/rr_producer.go new file mode 100644 index 0000000..9b7a2ab --- /dev/null +++ b/nsqx/rr_producer.go @@ -0,0 +1,387 @@ +package nsqx + +import ( + "encoding/json" + "errors" + "fmt" + "math" + "math/rand" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/minus5/svckit/env" + "github.com/minus5/svckit/log" +) + +var ( + DefaultTimeout = time.Hour + ErrTimeout = errors.New("timeout") + ErrStopped = errors.New("stopped") + rrProducers = make(map[string]*RrProducer) +) + +// RrProducer request response producer. +// Implements request response communication over nsq. +type RrProducer struct { + s map[string]chan *Envelope + producers map[string]*Producer + topic string + sub *Consumer + consumerOptions []func(*options) + msgNo int + corr RrProducerCorrelation + sync.Mutex +} + +// RrProducerCorrelation interface for custom generation of correlation IDs +type RrProducerCorrelation interface { + // NewCorrelationID creates new correlation ID for message + NewCorrelationID(topic, typ string, req interface{}) string +} + +// RrPub creates new RrProducer. +// - topic is nsq topic where remote services send responses. +// - opts are functions to set additional options +func RrPub(topic string, opts ...func(*RrProducer)) *RrProducer { + if topic == "" { + topic = fmt.Sprintf("z...rsp.%s.%s", env.AppName(), env.InstanceId()) + } + if s, ok := rrProducers[topic]; ok { + return s + } + s := &RrProducer{ + msgNo: rand.Intn(math.MaxInt32), + s: make(map[string]chan *Envelope), + producers: make(map[string]*Producer), + topic: topic, + } + rrProducers[topic] = s + // Set default calc of correlation id-a + s.apply(SetRrProducerCorrelation(s)) + // Set all options + s.apply(opts...) + go s.listen() + return s +} + +func defaultErrorParser(s string) error { + if s == "" { + return nil + } + return fmt.Errorf(s) +} + +// RrProducerConsumerOptions sets configuration options for the underlying Consumer of RrProducer. +func RrProducerConsumerOptions(opts ...func(*options)) func(*RrProducer) { + return func(s *RrProducer) { + s.consumerOptions = opts + } +} + +// SetRrProducerCorrelation sets new correleationID creation function +func SetRrProducerCorrelation(corr RrProducerCorrelation) func(*RrProducer) { + return func(s *RrProducer) { + s.corr = corr + } +} + +// apply calls all functions to setup options +func (s *RrProducer) apply(opts ...func(*RrProducer)) { + for _, fn := range opts { + fn(s) + } +} + +func (s *RrProducer) add(id string, c chan *Envelope) { + s.Lock() + defer s.Unlock() + s.s[id] = c +} + +func (s *RrProducer) get(id string) (chan *Envelope, bool) { + s.Lock() + defer s.Unlock() + c, ok := s.s[id] + if ok { + delete(s.s, id) + } + return c, ok +} + +func (s *RrProducer) timeout(id string) { + s.Lock() + defer s.Unlock() + if _, found := s.s[id]; found { + s.s[id] = nil + } +} + +func (s *RrProducer) pub(topic string) *Producer { + s.Lock() + defer s.Unlock() + if p, ok := s.producers[topic]; ok { + return p + } + p := Pub(topic) + s.producers[topic] = p + return p +} + +func typeToString(i interface{}) string { + typ := reflect.TypeOf(i).String() + if strings.HasPrefix(typ, "*") { + typ = typ[1:] + } + return typ +} + +// omogucuje mapiranje errora u aplikacijski specificne +// da ne moram imati referencu na ovaj paket +type ErrorsMapping struct { + Parser func(string) error + ErrStopped error + ErrTimeout error + ErrFatal error +} + +// ReqRsp send request and wait for response +// topic - nsq topic on wich to send request +// typ - message type for envelope +// req - body of the request +// rsp - stucture to unpuck response into +// sig - timout singal to signal stop waiting for response +// ttl - time to live of message for envelope +// em - error mapping, mapping to application specific messages +func (s *RrProducer) ReqRsp(topic, typ string, req interface{}, rsp interface{}, sig chan struct{}, ttl time.Duration, em *ErrorsMapping) error { + if typ == "" { + typ = typeToString(req) + } + p := ReqRspBaseParams{ + Topic: topic, + Typ: typ, + Ttl: ttl, + Sig: sig, + Em: em, + } + p.defaults() + reqBuf, err := json.Marshal(req) + if err != nil { + return p.Fatal(err) + } + p.Req = reqBuf + p.correlationId = s.corr.NewCorrelationID(topic, typ, req) + rspBuf, err := s.ReqRspBase(p) + if err != nil { + return err + } + if rsp != nil && len(rspBuf) > 0 { + if err := json.Unmarshal(rspBuf, rsp); err != nil { + return p.Fatal(err) + } + } + return nil +} + +// ReqRspBuf sends request and waits for response, passes rsp as []byte +func (s *RrProducer) ReqRspBuf(topic, typ string, req interface{}, sig chan struct{}, ttl time.Duration, em *ErrorsMapping) ([]byte, error) { + if typ == "" { + typ = typeToString(req) + } + p := ReqRspBaseParams{ + Topic: topic, + Typ: typ, + Ttl: ttl, + Sig: sig, + Em: em, + } + p.defaults() + reqBuf, err := json.Marshal(req) + if err != nil { + return nil, p.Fatal(err) + } + p.Req = reqBuf + p.correlationId = s.corr.NewCorrelationID(topic, typ, req) + return s.ReqRspBase(p) +} + +type ReqRspBaseParams struct { + Topic string + Typ string + Req []byte + Ttl time.Duration + Sig chan struct{} + Em *ErrorsMapping + correlationId string +} + +func (p *ReqRspBaseParams) defaults() { + if p.Ttl <= 0 { + p.Ttl = DefaultTimeout + } + if p.Em == nil { + p.Em = &ErrorsMapping{ + Parser: defaultErrorParser, + ErrStopped: ErrStopped, + ErrTimeout: ErrTimeout, + } + } +} + +func (p *ReqRspBaseParams) Fatal(err error) error { + if p.Em.ErrFatal != nil { + return p.Em.ErrFatal + } + return err +} + +func (p *ReqRspBaseParams) Timeout() error { + if p.Em.ErrTimeout != nil { + return p.Em.ErrTimeout + } + return ErrTimeout +} + +func (p *ReqRspBaseParams) Stopped() error { + if p.Em.ErrStopped != nil { + return p.Em.ErrStopped + } + return ErrStopped +} + +func (p *ReqRspBaseParams) Error(text string) error { + if p.Em.Parser != nil { + return p.Em.Parser(text) + } + return defaultErrorParser(text) +} + +func (s *RrProducer) ReqRspBase(p ReqRspBaseParams) ([]byte, error) { + p.defaults() + if p.correlationId == "" { + p.correlationId = s.NewCorrelationID("", "", nil) + } + + eReq := &Envelope{ + Type: p.Typ, + ReplyTo: s.topic, + CorrelationId: p.correlationId, + Body: p.Req, + ExpiresAt: time.Now().Add(p.Ttl).Unix(), + } + c := make(chan *Envelope) + s.add(p.correlationId, c) + + if err := s.pub(p.Topic).Publish(eReq.Bytes()); err != nil { + return nil, p.Fatal(err) + } + + timer := time.NewTimer(p.Ttl) + defer timer.Stop() + select { + case re := <-c: + return re.Body, p.Error(re.Error) + case <-timer.C: + s.timeout(p.correlationId) + return nil, p.Timeout() + case <-p.Sig: + s.timeout(p.correlationId) + return nil, p.Stopped() + } + return nil, nil +} + +// NewCorrelationID creates unique correlationID as request identifier +func (s *RrProducer) NewCorrelationID(topic, typ string, req interface{}) string { + s.Lock() + defer s.Unlock() + s.msgNo++ + return strconv.Itoa(s.msgNo) +} + +// Pub send request without waiting for response. +// topic - nsq topic on wich to send request +// typ - message type for envelope +// req - body of the request +func (s *RrProducer) Pub(topic, typ string, req interface{}) error { + buf, err := json.Marshal(req) + if err != nil { + return err + } + eReq := &Envelope{ + Type: typ, + Body: buf, + } + p := s.pub(topic) + if err := p.Publish(eReq.Bytes()); err != nil { + return err + } + return nil +} + +func (s *RrProducer) listen() { + handler := func(m *Message) error { + e, err := NewEnvelope(m.Body) + if err != nil { + log.Error(err) + return err + } + if s, found := s.get(e.CorrelationId); found { + // when s == nil, means that request timed out, nobody is waiting for response + // nothing to do in that case + if s != nil { + s <- e + } + return nil + } + log.S("id", e.CorrelationId).Info("subscriber not found") + return nil + } + s.sub = Sub(s.topic, handler, s.consumerOptions...) +} + +// Close implements gracefully stop. +func (s *RrProducer) Close() { + if s.sub != nil { + s.sub.Close() + } + for _, p := range s.producers { + p.Close() + } +} + +type RrClient struct { + pub *RrProducer + topic string + nameFor func(interface{}) string + sig chan struct{} + ttl time.Duration + em *ErrorsMapping +} + +func NewRrClient(topic string, + nameFor func(interface{}) string, + sig chan struct{}, + ttl time.Duration, + em *ErrorsMapping) *RrClient { + return &RrClient{ + pub: RrPub(""), + topic: topic, + nameFor: nameFor, + sig: sig, + ttl: ttl, + em: em, + } +} + +func (c *RrClient) Call(req, rsp interface{}) error { + return c.pub.ReqRsp(c.topic, + c.nameFor(req), req, rsp, c.sig, c.ttl, c.em, + ) +} + +func (c *RrClient) Close() { + c.pub.Close() +}