diff --git a/go.mod b/go.mod index 3332446..08c9b95 100644 --- a/go.mod +++ b/go.mod @@ -15,12 +15,14 @@ require ( github.com/gorilla/mux v1.8.0 github.com/hashicorp/consul/api v1.18.0 github.com/json-iterator/go v1.1.12 + github.com/klauspost/compress v1.13.6 github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c github.com/minus5/go-simplejson v0.5.1-0.20190518182223-8af509724a86 github.com/nranchev/go-libGeoIP v0.0.0-20170629073846-d6d4a9a4c7e8 github.com/nsqio/go-nsq v1.1.0 github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d github.com/oschwald/geoip2-golang v1.7.0 + github.com/pierrec/lz4/v4 v4.1.26 github.com/pkg/errors v0.9.1 github.com/satori/go.uuid v1.2.0 github.com/smira/go-statsd v1.3.2 @@ -36,7 +38,7 @@ require ( require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/gobwas/pool v0.2.1 // indirect - github.com/golang/snappy v0.0.4 // indirect + github.com/golang/snappy v1.0.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-hclog v1.4.0 // indirect @@ -45,7 +47,6 @@ require ( github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/serf v0.10.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.13.6 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.17 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect diff --git a/go.sum b/go.sum index 90a35b2..b88842d 100644 --- a/go.sum +++ b/go.sum @@ -52,8 +52,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -192,6 +192,8 @@ github.com/oschwald/maxminddb-golang v1.9.0/go.mod h1:TK+s/Z2oZq0rSl4PSeAEoP0bgm github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY= +github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/pkg/compress/compress.go b/pkg/compress/compress.go index cf3e8cd..0ee00b7 100644 --- a/pkg/compress/compress.go +++ b/pkg/compress/compress.go @@ -3,10 +3,42 @@ package compress import ( "bytes" "compress/gzip" - "io/ioutil" + "io" "sync" + + "github.com/klauspost/compress/snappy" + "github.com/klauspost/compress/zstd" + "github.com/pierrec/lz4/v4" +) + +// Encoding is the name of a compression algorithm as used in message headers. +type Encoding string + +const ( + EncodingNone Encoding = "" + EncodingGzip Encoding = "gzip" + EncodingZstd Encoding = "zstd" + EncodingSnappy Encoding = "snappy" + EncodingLz4 Encoding = "lz4" +) + +var ( + zstdEncoder *zstd.Encoder + zstdDecoder *zstd.Decoder ) +func init() { + var err error + zstdEncoder, err = zstd.NewWriter(nil) + if err != nil { + panic(err) + } + zstdDecoder, err = zstd.NewReader(nil) + if err != nil { + panic(err) + } +} + //Gzip - compess input func Gzip(data []byte) []byte { var b bytes.Buffer @@ -30,7 +62,7 @@ func Gunzip(data []byte) ([]byte, error) { return nil, err } defer r.Close() - out, err := ioutil.ReadAll(r) + out, err := io.ReadAll(r) if err != nil { return nil, err } @@ -62,6 +94,103 @@ func IsGziped(buf []byte) bool { return false } +// Zstd compresses data with zstandard. +func Zstd(data []byte) []byte { + return zstdEncoder.EncodeAll(data, nil) +} + +// Unzstd decompresses zstandard data. +func Unzstd(data []byte) ([]byte, error) { + return zstdDecoder.DecodeAll(data, nil) +} + +// IsZstd reports whether buf starts with the zstandard frame magic. +func IsZstd(buf []byte) bool { + return len(buf) >= 4 && + buf[0] == 0x28 && buf[1] == 0xB5 && buf[2] == 0x2F && buf[3] == 0xFD +} + +// Snappy compresses data using the snappy streaming (framing) format. +func Snappy(data []byte) []byte { + var b bytes.Buffer + w := snappy.NewBufferedWriter(&b) + w.Write(data) + w.Close() + return b.Bytes() +} + +// Unsnappy decompresses snappy streaming-format data. +func Unsnappy(data []byte) ([]byte, error) { + r := snappy.NewReader(bytes.NewReader(data)) + return io.ReadAll(r) +} + +// IsSnappy reports whether buf starts with the snappy framing-format stream identifier. +func IsSnappy(buf []byte) bool { + // Stream identifier chunk: 0xff followed by chunk length 0x060000 and "sNaPpY" + return len(buf) >= 10 && + buf[0] == 0xff && + buf[1] == 0x06 && buf[2] == 0x00 && buf[3] == 0x00 && + buf[4] == 0x73 && buf[5] == 0x4e && buf[6] == 0x61 && + buf[7] == 0x50 && buf[8] == 0x70 && buf[9] == 0x59 +} + +// Lz4 compresses data using the lz4 frame format. +func Lz4(data []byte) []byte { + var b bytes.Buffer + w := lz4.NewWriter(&b) + w.Write(data) + w.Close() + return b.Bytes() +} + +// Unlz4 decompresses lz4 frame-format data. +func Unlz4(data []byte) ([]byte, error) { + r := lz4.NewReader(bytes.NewReader(data)) + return io.ReadAll(r) +} + +// IsLz4 reports whether buf starts with the lz4 frame magic number. +func IsLz4(buf []byte) bool { + return len(buf) >= 4 && + buf[0] == 0x04 && buf[1] == 0x22 && buf[2] == 0x4d && buf[3] == 0x18 +} + +// Detect returns the encoding of buf or EncodingNone if unrecognized. +func Detect(buf []byte) Encoding { + switch { + case IsGziped(buf): + return EncodingGzip + case IsZstd(buf): + return EncodingZstd + case IsSnappy(buf): + return EncodingSnappy + case IsLz4(buf): + return EncodingLz4 + } + return EncodingNone +} + +// DecompressIf decompresses buf if a known compression format is detected. +// Returns the (possibly decompressed) data, the detected encoding, and any error. +func DecompressIf(data []byte) ([]byte, Encoding, error) { + switch { + case IsGziped(data): + out, err := Gunzip(data) + return out, EncodingGzip, err + case IsZstd(data): + out, err := Unzstd(data) + return out, EncodingZstd, err + case IsSnappy(data): + out, err := Unsnappy(data) + return out, EncodingSnappy, err + case IsLz4(data): + out, err := Unlz4(data) + return out, EncodingLz4, err + } + return data, EncodingNone, nil +} + // Gzipper koristi jedan gzip writer, namijenjen je da se koristi single threaded // jer uvijek koristi isti buffer za pisanje kompresiranih podataka type Gzipper struct { diff --git a/pkg/compress/compress_test.go b/pkg/compress/compress_test.go index c3e950e..9384366 100644 --- a/pkg/compress/compress_test.go +++ b/pkg/compress/compress_test.go @@ -145,6 +145,81 @@ func cacheAdd(b []byte) { } } +func TestZstd(t *testing.T) { + input := []byte("iso medo u ducan nije rekao dobar dan") + + compressed := Zstd(input) + assert.True(t, IsZstd(compressed)) + assert.False(t, IsZstd(input)) + + out, err := Unzstd(compressed) + assert.Nil(t, err) + assert.Equal(t, input, out) +} + +func TestSnappy(t *testing.T) { + input := []byte("iso medo u ducan nije rekao dobar dan") + + compressed := Snappy(input) + assert.True(t, IsSnappy(compressed)) + assert.False(t, IsSnappy(input)) + + out, err := Unsnappy(compressed) + assert.Nil(t, err) + assert.Equal(t, input, out) +} + +func TestLz4(t *testing.T) { + input := []byte("iso medo u ducan nije rekao dobar dan") + + compressed := Lz4(input) + assert.True(t, IsLz4(compressed)) + assert.False(t, IsLz4(input)) + + out, err := Unlz4(compressed) + assert.Nil(t, err) + assert.Equal(t, input, out) +} + +func TestDetect(t *testing.T) { + input := []byte("iso medo u ducan nije rekao dobar dan") + + assert.Equal(t, EncodingGzip, Detect(Gzip(input))) + assert.Equal(t, EncodingZstd, Detect(Zstd(input))) + assert.Equal(t, EncodingSnappy, Detect(Snappy(input))) + assert.Equal(t, EncodingLz4, Detect(Lz4(input))) + assert.Equal(t, EncodingNone, Detect(input)) +} + +func TestDecompressIf(t *testing.T) { + input := []byte("iso medo u ducan nije rekao dobar dan") + + cases := []struct { + name string + compress func([]byte) []byte + encoding Encoding + }{ + {"gzip", Gzip, EncodingGzip}, + {"zstd", Zstd, EncodingZstd}, + {"snappy", Snappy, EncodingSnappy}, + {"lz4", Lz4, EncodingLz4}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + out, enc, err := DecompressIf(tc.compress(input)) + assert.Nil(t, err) + assert.Equal(t, input, out) + assert.Equal(t, tc.encoding, enc) + }) + } + + // uncompressed passthrough + out, enc, err := DecompressIf(input) + assert.Nil(t, err) + assert.Equal(t, input, out) + assert.Equal(t, EncodingNone, enc) +} + // BenchmarkGzip benchmark za gzip da vidim potrosnju memorije // Pokrecem da radi samo BenchmarkGzip // go test -v -run NOTest -benchmem -benchtime 10s -bench BenchmarkGzip -memprofile=mem0.out diff --git a/pkg/msgs/backend.go b/pkg/msgs/backend.go index c3f4610..3900efe 100644 --- a/pkg/msgs/backend.go +++ b/pkg/msgs/backend.go @@ -37,25 +37,25 @@ const ( // Backend - poruka koja dolazi iz backend servisa type Backend struct { - Type string `json:"type,omitempty"` - Id string `json:"id,omitempty"` - IgracId string `json:"igrac_id,omitempty"` - ClientId int `json:"client_id,omitempty"` - AccountType int `json:"account_type,omitempty"` - No int `json:"no,omitempty"` - From string `json:"from,omitempty"` - To string `json:"to,omitempty"` - IsDel bool `json:"is_del,omitempty"` - Ts int `json:"ts,omitempty"` - Dc string `json:"dc,omitempty"` - Version string `json:"version,omitempty"` - Encoding string `json:"encoding,omitempty"` - MessageType string `json:"message_type,omitempty"` - SrcMsgID string `json:"src_msg_id,omitempty"` - NSQReqRspEnvelope // NSQ Req/Rsp message envelope, za potrebe prebacivanja NSQ Req/Rsp poruka sa nsq_to_ws - Body []byte `json:"-"` //raspakovan body - RawBody []byte `json:"-"` - RawHeader []byte `json:"-"` + Type string `json:"type,omitempty"` + Id string `json:"id,omitempty"` + IgracId string `json:"igrac_id,omitempty"` + ClientId int `json:"client_id,omitempty"` + AccountType int `json:"account_type,omitempty"` + No int `json:"no,omitempty"` + From string `json:"from,omitempty"` + To string `json:"to,omitempty"` + IsDel bool `json:"is_del,omitempty"` + Ts int `json:"ts,omitempty"` + Dc string `json:"dc,omitempty"` + Version string `json:"version,omitempty"` + Encoding compress.Encoding `json:"encoding,omitempty"` + MessageType string `json:"message_type,omitempty"` + SrcMsgID string `json:"src_msg_id,omitempty"` + NSQReqRspEnvelope // NSQ Req/Rsp message envelope, za potrebe prebacivanja NSQ Req/Rsp poruka sa nsq_to_ws + Body []byte `json:"-"` //raspakovan body + RawBody []byte `json:"-"` + RawHeader []byte `json:"-"` rawMsg []byte jsonBody *simplejson.Json } @@ -139,14 +139,18 @@ func CreateBackendDel(typ string) []byte { } func CreateBackend(typ string, no int, body []byte) []byte { - return createBackend(typ, no, 0, body, true) + return createBackend(typ, no, 0, body, compress.EncodingGzip) } func CreateBackendNoGzip(typ string, no int, body []byte) []byte { - return createBackend(typ, no, 0, body, false) + return createBackend(typ, no, 0, body, compress.EncodingNone) } -func createBackend(typ string, no int, ts int, body []byte, shouldCompress bool) []byte { +func CreateBackendEncoded(typ string, no int, body []byte, encoding compress.Encoding) []byte { + return createBackend(typ, no, 0, body, encoding) +} + +func createBackend(typ string, no int, ts int, body []byte, encoding compress.Encoding) []byte { header := map[string]interface{}{ "type": typ, } @@ -158,9 +162,9 @@ func createBackend(typ string, no int, ts int, body []byte, shouldCompress bool) } else { header["ts"] = time.Now().UnixNano() } - if shouldCompress && len(body) > GzipMsgSizeLimit { - body = compress.Gzip(body) - header["encoding"] = "gzip" + if encoding != "" && len(body) > GzipMsgSizeLimit { + body, encoding = compressBody(body, encoding) + header["encoding"] = encoding } buf, _ := json.Marshal(header) buf = append(buf, HeaderSeparator...) @@ -168,17 +172,38 @@ func createBackend(typ string, no int, ts int, body []byte, shouldCompress bool) return buf } +func compressBody(body []byte, encoding compress.Encoding) ([]byte, compress.Encoding) { + switch encoding { + case compress.EncodingZstd: + return compress.Zstd(body), compress.EncodingZstd + case compress.EncodingSnappy: + return compress.Snappy(body), compress.EncodingSnappy + case compress.EncodingLz4: + return compress.Lz4(body), compress.EncodingLz4 + default: + return compress.Gzip(body), compress.EncodingGzip + } +} + func Header(key string, value interface{}) func(map[string]interface{}) { return func(h map[string]interface{}) { h[key] = value } } -var gzipKey = "__gzipKey__" +var encodingKey = "__encodingKey__" func NoGzip() func(map[string]interface{}) { return func(h map[string]interface{}) { - h[gzipKey] = false + h[encodingKey] = compress.EncodingNone + } +} + +// WithEncoding sets the compression algorithm for BackendFactory. +// See compress.Encoding for possible values. +func WithEncoding(encoding compress.Encoding) func(map[string]any) { + return func(h map[string]any) { + h[encodingKey] = encoding } } @@ -189,14 +214,14 @@ func BackendFactory(typ string, body []byte, opts ...func(map[string]interface{} for _, o := range opts { o(header) } - shouldCompress := true - if v, ok := header[gzipKey]; ok { - shouldCompress = v.(bool) - delete(header, gzipKey) + encoding := compress.EncodingGzip + if v, ok := header[encodingKey]; ok { + encoding = v.(compress.Encoding) + delete(header, encodingKey) } - if shouldCompress && len(body) > GzipMsgSizeLimit { - body = compress.Gzip(body) - header["encoding"] = "gzip" + if encoding != "" && len(body) > GzipMsgSizeLimit { + body, encoding = compressBody(body, encoding) + header["encoding"] = encoding } buf, _ := json.Marshal(header) buf = append(buf, HeaderSeparator...) @@ -205,7 +230,7 @@ func BackendFactory(typ string, body []byte, opts ...func(map[string]interface{} } func CreateBackendTs(typ string, no int, ts int, body []byte) []byte { - return createBackend(typ, no, ts, body, true) + return createBackend(typ, no, ts, body, compress.EncodingGzip) } func parseAsBackend(buf []byte) *Backend { @@ -213,42 +238,46 @@ func parseAsBackend(buf []byte) *Backend { rawHeader := parts[0] msg, err := parseHeader(rawHeader) if len(parts) == 1 || err != nil { - msg.Body, _ = compress.GunzipIf(buf) + msg.Body, _, _ = compress.DecompressIf(buf) msg.RawBody = buf msg.RawHeader = nil return msg } body := parts[1] msg.RawBody = body - msg.Body, _ = compress.GunzipIf(body) + var detected compress.Encoding + msg.Body, detected, _ = compress.DecompressIf(body) + if msg.Encoding != compress.EncodingNone && detected != compress.EncodingNone && detected != msg.Encoding { + log.Printf("[WARN] encoding mismatch: header says %q but detected %q", msg.Encoding, detected) + } msg.rawMsg = buf return msg } func parseHeader(rawHeader []byte) (*Backend, error) { header := struct { - DocType string `json:"doc_type"` - Type string `json:"type"` - DocId string `json:"doc_id"` - Id string `json:"id"` - DocAction string `json:"doc_action"` - Action string `json:"action"` - IgracId string `json:"igrac_id"` - AccountType int `json:"account_type"` - From string `json:"from"` - To string `json:"to"` - Ts int `json:"ts"` - No int `json:"no"` - MsgNo int `json:"msg_no"` - Encoding string `json:"encoding"` - DeletedId string `json:"_deleted_id"` - IsDel bool `json:"is_del"` - Id2 string `json:"_id"` - Dc string `json:"dc"` - Version string `json:"version"` - MessageType string `json:"message_type,omitempty"` - SrcMsgID string `json:"src_msg_id,omitempty"` - Obrisan bool `json:"obrisan"` + DocType string `json:"doc_type"` + Type string `json:"type"` + DocId string `json:"doc_id"` + Id string `json:"id"` + DocAction string `json:"doc_action"` + Action string `json:"action"` + IgracId string `json:"igrac_id"` + AccountType int `json:"account_type"` + From string `json:"from"` + To string `json:"to"` + Ts int `json:"ts"` + No int `json:"no"` + MsgNo int `json:"msg_no"` + Encoding compress.Encoding `json:"encoding"` + DeletedId string `json:"_deleted_id"` + IsDel bool `json:"is_del"` + Id2 string `json:"_id"` + Dc string `json:"dc"` + Version string `json:"version"` + MessageType string `json:"message_type,omitempty"` + SrcMsgID string `json:"src_msg_id,omitempty"` + Obrisan bool `json:"obrisan"` NSQReqRspEnvelope }{ No: -1, @@ -366,7 +395,7 @@ func (b *Backend) pack() []byte { if b.jsonBody != nil { b.Body, _ = b.jsonBody.Encode() b.RawBody = b.Body - b.Encoding = "" + b.Encoding = compress.EncodingNone } //igracid i no imaju defaulte koji se ne serijaliziraju lijepo uz ommitempty, pa malo kemijam oko toga //volio bi neko inteligentnije rjesenje diff --git a/pkg/msgs/backend_test.go b/pkg/msgs/backend_test.go index 650cbc6..4ae9b5e 100644 --- a/pkg/msgs/backend_test.go +++ b/pkg/msgs/backend_test.go @@ -1,7 +1,7 @@ package msgs import ( - "io/ioutil" + "os" "sort" "strings" "testing" @@ -53,7 +53,35 @@ func TestTecajnaManifest(t *testing.T) { } func TestMsgGzip(t *testing.T) { - content, err := ioutil.ReadFile("./fixtures/backend_gz") + content, err := os.ReadFile("./fixtures/backend_gz") + assert.Nil(t, err) + msg := parseAsBackend(content) + assert.NotNil(t, msg) + assert.Equal(t, "pero", msg.Type) + assert.Equal(t, "iso medo u ducan", msg.bodyStr()) +} + +func TestMsgZstd(t *testing.T) { + content, err := os.ReadFile("./fixtures/backend_zstd") + assert.Nil(t, err) + msg := parseAsBackend(content) + assert.NotNil(t, msg) + assert.Equal(t, "pero", msg.Type) + assert.Equal(t, "iso medo u ducan", msg.bodyStr()) +} + + +func TestMsgSnappy(t *testing.T) { + content, err := os.ReadFile("./fixtures/backend_snappy") + assert.Nil(t, err) + msg := parseAsBackend(content) + assert.NotNil(t, msg) + assert.Equal(t, "pero", msg.Type) + assert.Equal(t, "iso medo u ducan", msg.bodyStr()) +} + +func TestMsgLz4(t *testing.T) { + content, err := os.ReadFile("./fixtures/backend_lz4") assert.Nil(t, err) msg := parseAsBackend(content) assert.NotNil(t, msg) @@ -62,7 +90,7 @@ func TestMsgGzip(t *testing.T) { } func TestMsgLiveDogadjaj(t *testing.T) { - content, err := ioutil.ReadFile("./fixtures/live_dogadjaj_web2_6430239_full") + content, err := os.ReadFile("./fixtures/live_dogadjaj_web2_6430239_full") assert.Nil(t, err) msg := parseAsBackend(content) assert.NotNil(t, msg) @@ -87,7 +115,7 @@ func TestIsFullIsDiff(t *testing.T) { } func TestCreateBackend(t *testing.T) { - buf := createBackend("pero", 12, 14, nil, true) + buf := createBackend("pero", 12, 14, nil, "gzip") assert.NotNil(t, buf) buf = CreateBackendNoGzip("pero", 12, nil) diff --git a/pkg/msgs/fixtures/backend_lz4 b/pkg/msgs/fixtures/backend_lz4 new file mode 100644 index 0000000..f0fa65d Binary files /dev/null and b/pkg/msgs/fixtures/backend_lz4 differ diff --git a/pkg/msgs/fixtures/backend_snappy b/pkg/msgs/fixtures/backend_snappy new file mode 100644 index 0000000..d68e255 Binary files /dev/null and b/pkg/msgs/fixtures/backend_snappy differ diff --git a/pkg/msgs/fixtures/backend_zstd b/pkg/msgs/fixtures/backend_zstd new file mode 100644 index 0000000..3b98441 Binary files /dev/null and b/pkg/msgs/fixtures/backend_zstd differ