Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/minus5/svckit

go 1.25.0
go 1.26

require (
github.com/aws/aws-sdk-go v1.55.7
Expand All @@ -16,6 +16,7 @@ require (
github.com/hashicorp/consul/api v1.18.0
github.com/json-iterator/go v1.1.12
github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c
github.com/minus5/go-nsqx v1.0.1-0.20260418103232-cae33d0b0a16
github.com/minus5/go-simplejson v0.5.1-0.20190518182223-8af509724a86
github.com/nranchev/go-libGeoIP v0.0.0-20170629073846-d6d4a9a4c7e8
github.com/nsqio/go-nsq v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
github.com/minus5/go-nsqx v1.0.1-0.20260418103232-cae33d0b0a16 h1:I1gKMN7Av8zos39zW3/H8HI+SWwKDoKPxWz2GVtOnFY=
github.com/minus5/go-nsqx v1.0.1-0.20260418103232-cae33d0b0a16/go.mod h1:U0qLktCviUuRBuVAbKq/Oq7EQmObdp4g2CWrizN5fMo=
github.com/minus5/go-simplejson v0.5.1-0.20190518182223-8af509724a86 h1:bsm6+lQ2ea4dD6ZzGCz6GHXxmfU0ludQ4Cjxm7h/D9o=
github.com/minus5/go-simplejson v0.5.1-0.20190518182223-8af509724a86/go.mod h1:eBPLk8FJ9uc035DnbHAWvINeYXIIVduZe5EILao6s7o=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
Expand Down
127 changes: 127 additions & 0 deletions nsqx/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package nsqx

import (
"fmt"

"github.com/minus5/go-nsqx"
"github.com/minus5/svckit/dcy"
"github.com/minus5/svckit/log"
)

type Consumer struct {
nsqConsumer *nsqx.Consumer
logger func() *log.Agregator
lookups dcy.Addresses
}

type nsqHandler struct {
fn func(*Message) error
}

func (h *nsqHandler) HandleMessage(m *nsqx.Message) error {
// javi periodicki nsqdu da je procesiranje jos u tijeku
stop := every(DefaultMsgTouchInterval, m.Touch)
defer close(stop)
return h.fn(newMessage(m))
}

func MustNewConsumer(topic string, handler func(*Message) error,
opts ...func(*options)) *Consumer {
c, err := NewConsumer(topic, handler, opts...)
if err != nil {
log.S("topic", topic).Fatal(err)
}
return c
}

func NewConsumer(topic string, handler func(*Message) error,
opts ...func(*options)) (*Consumer, error) {

o := getDefaults().clone()
o.apply(opts...)

cfg := o.toNsqxConfig()

c, err := nsqx.NewConsumer(topic, o.channel, cfg)
if err != nil {
return nil, err
}

c.SetLogger(o.logger, o.logLevel)
c.AddConcurrentHandlers(&nsqHandler{fn: handler}, o.Concurrency())

// Use shared Discovery so all consumers in the process share one lookupd poller
disc := getDiscovery(o)
c.SetDiscovery(disc)

err = c.ConnectToNSQLookupds(o.lookupds.String())
if err != nil {
return nil, err
}

co := &Consumer{
lookups: o.lookupds,
nsqConsumer: c,
logger: func() *log.Agregator {
return logger().S("topic", topic).S("channel", o.channel)
},
}

// log options on start
co.logger().
I("MaxInFlight", o.maxInFlight).
I("Concurrency", o.Concurrency()).
I("ZeroCopyThrshld", o.zeroCopyThreshold).
I("AckBatchSize", o.ackBatchSize).
I("ProducerPipelines", o.producerPipelines).
I("MaxAttempts", int(o.maxAttempts)).
I("CircuitBreakerThrshld", int(o.circuitBreakerThreshold)).
I("OutputBuffSize", o.outputBufferSize).
S("BackoffAlg", o.backoffAlgorithm.String()).
S("Compression", o.compression.String()).
Info("starting consumer")

dcy.Subscribe(LookupdHTTPServiceName, co.onLookupChanges)
dcy.SubscribeByTag(LookupdHTTPServiceNameByTag, LookupdHTTPServiceTag, co.onLookupChanges)
return co, nil
}

func (c *Consumer) onLookupChanges(as dcy.Addresses) {
// deduplicate addresses, dcy.Services and dcy.ServicesByTag can return same IP addr,
// tcp-nsqlookupd and tcp.nsqlookupd
seen := make(map[string]struct{})
var unique []string
for _, a := range as {
s := a.String()
if _, dup := seen[s]; !dup {
seen[s] = struct{}{}
unique = append(unique, s)
}
}

// Update shared Discovery address list, it will use
// these new addresses on the next cache miss/poll cycle
disc := getDiscovery(getDefaults())
disc.SetAddrs(unique)
c.lookups = as
c.logger().S("lookupds", fmt.Sprintf("%v", unique)).Info("lookupds update")
}

func (c *Consumer) Close() {
dcy.Unsubscribe(LookupdHTTPServiceName, c.onLookupChanges)
dcy.UnsubscribeByTag(LookupdHTTPServiceNameByTag, LookupdHTTPServiceTag, c.onLookupChanges)
c.nsqConsumer.Stop()
}

// StartClosing will initiate a graceful stop of the Consumer (permanent),
// receive on returned chan to block until this process completes
func (c *Consumer) StartClosing() chan int {
dcy.Unsubscribe(LookupdHTTPServiceName, c.onLookupChanges)
dcy.UnsubscribeByTag(LookupdHTTPServiceNameByTag, LookupdHTTPServiceTag, c.onLookupChanges)
c.nsqConsumer.Stop()
// go-nsqx Stop() is synchronous — blocks until fully stopped.
// Return a closed channel to maintain the same API shape.
ch := make(chan int)
close(ch)
return ch
}
82 changes: 82 additions & 0 deletions nsqx/envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package nsqx

import (
"bytes"
"encoding/json"
"strings"
"time"
)

var (
headerSeparator = []byte{10} //new line
)

// Envelope arround message for request response communication over nsq
type Envelope struct {
Type string `json:"t,omitempty"` // type of the message
ReplyTo string `json:"r,omitempty"` // nsq topic to send reply to
CorrelationId string `json:"c,omitempty"` // connection between request and response
ExpiresAt int64 `json:"e,omitempty"` // unix timestamp when message expires, after that should be dropped
Error string `json:"error,omitempty"`
Body []byte `json:"-"` // message body
}

// NewEnvelope decodes envelope from bytes
func NewEnvelope(buf []byte) (*Envelope, error) {
parts := bytes.SplitN(buf, headerSeparator, 2)
e := &Envelope{}
if err := json.Unmarshal(parts[0], e); err != nil {
return nil, err
}
if len(parts) > 1 {
e.Body = parts[1]
}
return e, nil
}

// Bytes encodes envelope into bytes for putting on wire
func (m *Envelope) Bytes() []byte {
buf, _ := json.Marshal(m)
buf = append(buf, headerSeparator...)
buf = append(buf, m.Body...)
return buf
}

// ParseBody deocdes Evelope body into o
func (m *Envelope) ParseBody(o interface{}) error {
if err := json.Unmarshal(m.Body, o); err != nil {
return err
}
return nil
}

// Reply creates reply Envelope from request Envelope
func (m *Envelope) Reply(o interface{}, err error) (*Envelope, error) {
e := &Envelope{
Type: strings.Replace(m.Type, ".req", ".rsp", 1),
CorrelationId: m.CorrelationId,
}
if err != nil {
e.Error = err.Error()
}
if o != nil {
if buf, ok := o.([]byte); ok {
e.Body = buf
} else {
buf, err := json.Marshal(o)
if err != nil {
return nil, err
}
e.Body = buf
}
}
return e, nil
}

// Expired returns true if message expired
func (m *Envelope) Expired() bool {
if m.ExpiresAt <= 0 {
return false
}
return time.Now().Unix() > m.ExpiresAt
}
38 changes: 38 additions & 0 deletions nsqx/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package nsqx

import (
"time"

"github.com/minus5/go-nsqx"
)

// Message wraps nsqx.Message so callers dont need to import go-nsqx directly
type Message struct {
nsqm *nsqx.Message
ID nsqx.MessageID
Body []byte
Timestamp int64 // unix nanoseconds
Attempts uint16
NSQDAddress string
}

func newMessage(m *nsqx.Message) *Message {
body := make([]byte, len(m.Body))
copy(body, m.Body)
return &Message{
nsqm: m,
ID: m.ID,
Body: body,
Timestamp: m.Timestamp.UnixNano(),
Attempts: m.Attempts,
NSQDAddress: m.NSQDAddress,
}
}

func (m *Message) RequeueWithoutBackoff(delay time.Duration) {
m.nsqm.RequeueWithoutBackoff(delay)
}

func (m *Message) Touch() {
m.nsqm.Touch()
}
Loading