Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ require (
github.com/gorilla/mux v1.8.0
github.com/hashicorp/consul/api v1.18.0
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.13.6
github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c
github.com/minus5/go-simplejson v0.5.1-0.20190518182223-8af509724a86
github.com/nranchev/go-libGeoIP v0.0.0-20170629073846-d6d4a9a4c7e8
github.com/nsqio/go-nsq v1.1.0
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
github.com/oschwald/geoip2-golang v1.7.0
github.com/pierrec/lz4/v4 v4.1.26
github.com/pkg/errors v0.9.1
github.com/satori/go.uuid v1.2.0
github.com/smira/go-statsd v1.3.2
Expand All @@ -36,7 +38,7 @@ require (
require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.4.0 // indirect
Expand All @@ -45,7 +47,6 @@ require (
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down Expand Up @@ -192,6 +192,8 @@ github.com/oschwald/maxminddb-golang v1.9.0/go.mod h1:TK+s/Z2oZq0rSl4PSeAEoP0bgm
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY=
github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
133 changes: 131 additions & 2 deletions pkg/compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,42 @@ package compress
import (
"bytes"
"compress/gzip"
"io/ioutil"
"io"
"sync"

"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
)

// Encoding is the name of a compression algorithm as used in message headers.
// Its string value is the header token for that algorithm.
type Encoding string

// Encodings recognized by this package.
const (
	// EncodingNone means no known compression; Detect returns it for
	// unrecognized data.
	EncodingNone Encoding = ""
	// EncodingGzip identifies gzip-compressed data.
	EncodingGzip Encoding = "gzip"
	// EncodingZstd identifies zstandard-compressed data.
	EncodingZstd Encoding = "zstd"
	// EncodingSnappy identifies snappy-compressed data.
	EncodingSnappy Encoding = "snappy"
	// EncodingLz4 identifies lz4-compressed data.
	EncodingLz4 Encoding = "lz4"
)

// Package-level zstandard encoder and decoder. They are created once in init
// and used by Zstd and Unzstd.
var (
	zstdEncoder *zstd.Encoder
	zstdDecoder *zstd.Decoder
)

// init constructs the package-level zstd encoder and decoder. Both are built
// without an underlying stream (nil writer / nil reader). A constructor error
// aborts start-up with a panic.
func init() {
	enc, err := zstd.NewWriter(nil)
	if err != nil {
		panic(err)
	}
	dec, err := zstd.NewReader(nil)
	if err != nil {
		panic(err)
	}
	zstdEncoder, zstdDecoder = enc, dec
}

// Gzip - compress input
func Gzip(data []byte) []byte {
var b bytes.Buffer
Expand All @@ -30,7 +62,7 @@ func Gunzip(data []byte) ([]byte, error) {
return nil, err
}
defer r.Close()
out, err := ioutil.ReadAll(r)
out, err := io.ReadAll(r)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -62,6 +94,103 @@ func IsGziped(buf []byte) bool {
return false
}

// Zstd returns data compressed with zstandard.
//
// The package-level encoder writes the result into a newly allocated slice
// (no destination buffer is supplied).
func Zstd(data []byte) []byte {
	compressed := zstdEncoder.EncodeAll(data, nil)
	return compressed
}

// Unzstd decompresses zstandard data using the package-level decoder.
//
// It returns the decompressed bytes, or a nil slice and the decoder's error
// when data cannot be decoded. Any partially decoded output is discarded on
// error, so the result is never a truncated payload; this matches Gunzip,
// which also returns nil on failure.
func Unzstd(data []byte) ([]byte, error) {
	out, err := zstdDecoder.DecodeAll(data, nil)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// IsZstd reports whether buf begins with the 4-byte zstandard frame magic
// (28 B5 2F FD). Buffers shorter than the magic are never a match.
func IsZstd(buf []byte) bool {
	const magic = "\x28\xb5\x2f\xfd"
	if len(buf) < len(magic) {
		return false
	}
	return string(buf[:len(magic)]) == magic
}

// Snappy compresses data using the snappy streaming (framing) format and
// returns the compressed bytes.
//
// The function has no error return, so the results of Write and Close are
// discarded.
func Snappy(data []byte) []byte {
	out := new(bytes.Buffer)
	enc := snappy.NewBufferedWriter(out)
	enc.Write(data)
	// Close flushes whatever the buffered writer still holds into out.
	enc.Close()
	return out.Bytes()
}

// Unsnappy decompresses snappy streaming-format data.
//
// It returns the decompressed bytes, or a nil slice and the read error when
// the stream cannot be decoded. Output produced before the error is discarded
// so callers never receive a truncated payload; this matches Gunzip, which
// also returns nil on failure.
func Unsnappy(data []byte) ([]byte, error) {
	out, err := io.ReadAll(snappy.NewReader(bytes.NewReader(data)))
	if err != nil {
		return nil, err
	}
	return out, nil
}

// IsSnappy reports whether buf begins with the snappy framing-format stream
// identifier chunk: chunk type 0xff, the 3-byte chunk length 06 00 00, and the
// ASCII text "sNaPpY" (10 bytes in total).
func IsSnappy(buf []byte) bool {
	const streamID = "\xff\x06\x00\x00sNaPpY"
	if len(buf) < len(streamID) {
		return false
	}
	return string(buf[:len(streamID)]) == streamID
}

// Lz4 compresses data using the lz4 frame format and returns the compressed
// bytes.
//
// The function has no error return, so the results of Write and Close are
// discarded.
func Lz4(data []byte) []byte {
	out := new(bytes.Buffer)
	enc := lz4.NewWriter(out)
	enc.Write(data)
	// Close finishes the frame so that out holds the complete output.
	enc.Close()
	return out.Bytes()
}

// Unlz4 decompresses lz4 frame-format data.
//
// It returns the decompressed bytes, or a nil slice and the read error when
// the frame cannot be decoded. Output produced before the error is discarded
// so callers never receive a truncated payload; this matches Gunzip, which
// also returns nil on failure.
func Unlz4(data []byte) ([]byte, error) {
	out, err := io.ReadAll(lz4.NewReader(bytes.NewReader(data)))
	if err != nil {
		return nil, err
	}
	return out, nil
}

// IsLz4 reports whether buf begins with the 4-byte lz4 frame magic number
// (04 22 4D 18). Buffers shorter than the magic are never a match.
func IsLz4(buf []byte) bool {
	const magic = "\x04\x22\x4d\x18"
	if len(buf) < len(magic) {
		return false
	}
	return string(buf[:len(magic)]) == magic
}

// Detect inspects the leading bytes of buf and returns the matching Encoding,
// or EncodingNone when no known format is recognized.
//
// Formats are probed in a fixed order: gzip, zstd, snappy, lz4.
func Detect(buf []byte) Encoding {
	if IsGziped(buf) {
		return EncodingGzip
	}
	if IsZstd(buf) {
		return EncodingZstd
	}
	if IsSnappy(buf) {
		return EncodingSnappy
	}
	if IsLz4(buf) {
		return EncodingLz4
	}
	return EncodingNone
}

// DecompressIf decompresses data when a known compression format is detected.
//
// It returns the (possibly decompressed) payload, the detected encoding, and
// any error reported by the decompressor. Data in no recognized format is
// returned unchanged with EncodingNone and a nil error.
func DecompressIf(data []byte) ([]byte, Encoding, error) {
	enc := Detect(data)

	// Pick the decompressor that corresponds to the detected format.
	var decode func([]byte) ([]byte, error)
	switch enc {
	case EncodingGzip:
		decode = Gunzip
	case EncodingZstd:
		decode = Unzstd
	case EncodingSnappy:
		decode = Unsnappy
	case EncodingLz4:
		decode = Unlz4
	default:
		return data, EncodingNone, nil
	}

	out, err := decode(data)
	return out, enc, err
}

// Gzipper uses a single gzip writer; it is intended for single-threaded use
// because it always reuses the same buffer for writing the compressed data
type Gzipper struct {
Expand Down
75 changes: 75 additions & 0 deletions pkg/compress/compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,81 @@ func cacheAdd(b []byte) {
}
}

// TestZstd checks that Zstd output carries the zstd magic and round-trips
// through Unzstd back to the original bytes.
func TestZstd(t *testing.T) {
	plain := []byte("iso medo u ducan nije rekao dobar dan")
	packed := Zstd(plain)

	// Only the compressed form may be recognized as zstd.
	assert.True(t, IsZstd(packed))
	assert.False(t, IsZstd(plain))

	unpacked, err := Unzstd(packed)
	assert.Nil(t, err)
	assert.Equal(t, plain, unpacked)
}

// TestSnappy checks that Snappy output starts with the snappy stream
// identifier and round-trips through Unsnappy back to the original bytes.
func TestSnappy(t *testing.T) {
	plain := []byte("iso medo u ducan nije rekao dobar dan")
	packed := Snappy(plain)

	// Only the compressed form may be recognized as snappy.
	assert.True(t, IsSnappy(packed))
	assert.False(t, IsSnappy(plain))

	unpacked, err := Unsnappy(packed)
	assert.Nil(t, err)
	assert.Equal(t, plain, unpacked)
}

// TestLz4 checks that Lz4 output carries the lz4 frame magic and round-trips
// through Unlz4 back to the original bytes.
func TestLz4(t *testing.T) {
	plain := []byte("iso medo u ducan nije rekao dobar dan")
	packed := Lz4(plain)

	// Only the compressed form may be recognized as lz4.
	assert.True(t, IsLz4(packed))
	assert.False(t, IsLz4(plain))

	unpacked, err := Unlz4(packed)
	assert.Nil(t, err)
	assert.Equal(t, plain, unpacked)
}

func TestDetect(t *testing.T) {
input := []byte("iso medo u ducan nije rekao dobar dan")

assert.Equal(t, EncodingGzip, Detect(Gzip(input)))
assert.Equal(t, EncodingZstd, Detect(Zstd(input)))
assert.Equal(t, EncodingSnappy, Detect(Snappy(input)))
assert.Equal(t, EncodingLz4, Detect(Lz4(input)))
assert.Equal(t, EncodingNone, Detect(input))
}

// TestDecompressIf checks that DecompressIf restores the original bytes and
// reports the right encoding for each supported compressor, and that
// uncompressed input is passed through untouched.
func TestDecompressIf(t *testing.T) {
	plain := []byte("iso medo u ducan nije rekao dobar dan")

	type scenario struct {
		name     string
		compress func([]byte) []byte
		encoding Encoding
	}
	scenarios := []scenario{
		{name: "gzip", compress: Gzip, encoding: EncodingGzip},
		{name: "zstd", compress: Zstd, encoding: EncodingZstd},
		{name: "snappy", compress: Snappy, encoding: EncodingSnappy},
		{name: "lz4", compress: Lz4, encoding: EncodingLz4},
	}
	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			got, gotEnc, err := DecompressIf(sc.compress(plain))
			assert.Nil(t, err)
			assert.Equal(t, plain, got)
			assert.Equal(t, sc.encoding, gotEnc)
		})
	}

	// Input that is not compressed comes back unchanged.
	got, gotEnc, err := DecompressIf(plain)
	assert.Nil(t, err)
	assert.Equal(t, plain, got)
	assert.Equal(t, EncodingNone, gotEnc)
}

// BenchmarkGzip is a gzip benchmark for observing memory consumption.
// Run it so that only BenchmarkGzip executes:
// go test -v -run NOTest -benchmem -benchtime 10s -bench BenchmarkGzip -memprofile=mem0.out
Expand Down
Loading