From 3de7b8c5faed7876f886d755f6f2860d5471788d Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 09:38:10 +0000 Subject: [PATCH 1/9] feat(model): add CommandStore abstraction with bbolt backend Introduce a CommandStore interface that abstracts local command buffering, with two implementations: - fileStore: wraps the existing append-only txt files (pre/post/cursor), the always-available fallback. Behavior unchanged. - boltStore: a bbolt-backed store intended to be owned by the single long-lived daemon (bbolt holds an exclusive file lock). Pre commands go in the "active" bucket, post in "archived", and the sync cursor in "meta", matching the pre-existing activeBucket/archivedBucket constants. Keys are time-ordered (8-byte big-endian UnixNano + sequence) so iteration is chronological and identical timestamps never collide. Add a feature-gated config field (storage.engine: file|bolt, default file) selected via NewCommandStore. Nothing is wired into the track hot path or daemon yet; this is an additive, behavior-neutral first step. --- go.mod | 1 + go.sum | 4 + model/path.go | 5 + model/store.go | 73 +++++++++++++ model/store_bolt.go | 220 +++++++++++++++++++++++++++++++++++++++ model/store_bolt_test.go | 156 +++++++++++++++++++++++++++ model/store_file.go | 159 ++++++++++++++++++++++++++++ model/types.go | 12 +++ 8 files changed, 630 insertions(+) create mode 100644 model/store.go create mode 100644 model/store_bolt.go create mode 100644 model/store_bolt_test.go create mode 100644 model/store_file.go diff --git a/go.mod b/go.mod index 62f4977..7ac0a90 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/uptrace/uptrace-go v1.39.0 github.com/urfave/cli/v2 v2.27.7 + go.etcd.io/bbolt v1.4.3 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 go.opentelemetry.io/otel v1.39.0 go.opentelemetry.io/otel/trace v1.39.0 diff --git a/go.sum b/go.sum index d48dac6..fb86f4e 100644 --- a/go.sum +++ b/go.sum @@ -181,6 +181,8 @@ github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJu github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg= github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= +go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGNANqpVFCndZvcuyKbl0g+UAVcbBcqGkG28H0Y= @@ -234,6 +236,8 @@ golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/model/path.go b/model/path.go index 0d206c5..639bb84 100644 --- a/model/path.go +++ b/model/path.go @@ -79,6 +79,11 @@ func GetCursorFilePath() string { return GetStoragePath("commands", "cursor.txt") } +// GetBoltDBPath returns the path to the bbolt command database (daemon-owned). +func GetBoltDBPath() string { + return GetStoragePath("commands", "commands.db") +} + // GetHeartbeatLogFilePath returns the path to the heartbeat log file func GetHeartbeatLogFilePath() string { return GetStoragePath("coding-heartbeat.data.log") diff --git a/model/store.go b/model/store.go new file mode 100644 index 0000000..e93ccd9 --- /dev/null +++ b/model/store.go @@ -0,0 +1,73 @@ +package model + +import ( + "context" + "time" +) + +// CommandStore abstracts persistence of tracked commands so the buffering +// backend can be swapped without touching call sites. +// +// Two implementations exist: +// - fileStore: the historical append-only txt files (pre.txt/post.txt/cursor.txt). +// Always available; used as the fallback whenever bolt is disabled or no daemon +// is running. +// - boltStore: a bbolt embedded KV store. bbolt holds an exclusive OS file lock, +// so it is meant to be owned by the single long-lived daemon process. +// +// Pre commands live in the "active" bucket, post commands in "archived"; the +// sync cursor lives in a "meta" bucket. These names mirror the activeBucket / +// archivedBucket constants in command.go. +type CommandStore interface { + // SavePre persists a pre-execution command record. + SavePre(ctx context.Context, cmd Command, recordingTime time.Time) error + // SavePost persists a post-execution command record with its exit code. + SavePost(ctx context.Context, cmd Command, result int, recordingTime time.Time) error + + // GetPreTree returns pre commands indexed by their unique key, used to pair + // post commands with the originating pre command during sync. + GetPreTree(ctx context.Context) (preCommandTree, error) + // GetPreCommands returns all pre commands (used by compaction). + GetPreCommands(ctx context.Context) ([]*Command, error) + // GetPostCommands returns all post commands with RecordingTime populated. + GetPostCommands(ctx context.Context) ([]*Command, error) + + // GetLastCursor returns the last synced recording time. noCursorExist is true + // when no cursor has ever been written (first sync). + GetLastCursor(ctx context.Context) (cursorTime time.Time, noCursorExist bool, err error) + // SetCursor advances the sync cursor. + SetCursor(ctx context.Context, cursor time.Time) error + + // Prune removes records that have already been synced (recording time at or + // before the cursor), keeping unfinished pre commands. + Prune(ctx context.Context, cursor time.Time) error + + // Close releases any resources held by the store (no-op for fileStore). + Close() error +} + +// StorageEngineFile is the default, always-available txt-file backend. +const StorageEngineFile = "file" + +// StorageEngineBolt selects the bbolt backend (daemon-owned). +const StorageEngineBolt = "bolt" + +// NewCommandStore builds the store selected by config. It falls back to the +// file store for any unknown / empty engine so misconfiguration never loses data. +// +// The bolt store opens (and locks) the DB file; callers must Close it. Because +// bbolt takes an exclusive lock, only one process (the daemon) should construct +// a bolt store at a time. +func NewCommandStore(cfg ShellTimeConfig) (CommandStore, error) { + engine := StorageEngineFile + if cfg.Storage != nil && cfg.Storage.Engine != "" { + engine = cfg.Storage.Engine + } + + switch engine { + case StorageEngineBolt: + return newBoltStore(GetBoltDBPath()) + default: + return newFileStore(), nil + } +} diff --git a/model/store_bolt.go b/model/store_bolt.go new file mode 100644 index 0000000..c1c5387 --- /dev/null +++ b/model/store_bolt.go @@ -0,0 +1,220 @@ +package model + +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "time" + + bolt "go.etcd.io/bbolt" +) + +const ( + // metaBucket holds singleton values such as the sync cursor. + metaBucket = "meta" + // cursorKey is the key under metaBucket storing the last synced recording time. + cursorKey = "cursor" + // boltOpenTimeout bounds how long we wait for the exclusive file lock before + // giving up, so a stale lock can't hang the daemon forever. + boltOpenTimeout = 5 * time.Second +) + +// boltStore persists commands in a bbolt database. It holds an exclusive OS +// file lock for its lifetime, so only the daemon should own one. +type boltStore struct { + db *bolt.DB +} + +func newBoltStore(path string) (*boltStore, error) { + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return nil, fmt.Errorf("failed to create bolt db folder: %w", err) + } + + db, err := bolt.Open(path, 0600, &bolt.Options{Timeout: boltOpenTimeout}) + if err != nil { + return nil, fmt.Errorf("failed to open bolt db %s: %w", path, err) + } + + err = db.Update(func(tx *bolt.Tx) error { + for _, name := range []string{activeBucket, archivedBucket, metaBucket} { + if _, err := tx.CreateBucketIfNotExists([]byte(name)); err != nil { + return err + } + } + return nil + }) + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to init bolt buckets: %w", err) + } + + return &boltStore{db: db}, nil +} + +// encodeKey produces a time-ordered, collision-free key: +// 8-byte big-endian UnixNano of the recording time + 8-byte sequence. +func encodeKey(recordingTime time.Time, seq uint64) []byte { + key := make([]byte, 16) + binary.BigEndian.PutUint64(key[0:8], uint64(recordingTime.UnixNano())) + binary.BigEndian.PutUint64(key[8:16], seq) + return key +} + +func decodeKeyNano(key []byte) int64 { + return int64(binary.BigEndian.Uint64(key[0:8])) +} + +func (s *boltStore) put(bucket string, cmd Command, recordingTime time.Time) error { + val, err := json.Marshal(cmd) + if err != nil { + return err + } + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucket)) + seq, err := b.NextSequence() + if err != nil { + return err + } + return b.Put(encodeKey(recordingTime, seq), val) + }) +} + +func (s *boltStore) all(bucket string) ([]*Command, error) { + result := make([]*Command, 0) + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucket)) + return b.ForEach(func(k, v []byte) error { + cmd := new(Command) + if err := json.Unmarshal(v, cmd); err != nil { + slog.Warn("failed to unmarshal command from bolt", slog.Any("err", err)) + return nil + } + cmd.RecordingTime = time.Unix(0, decodeKeyNano(k)) + result = append(result, cmd) + return nil + }) + }) + return result, err +} + +func (s *boltStore) SavePre(ctx context.Context, cmd Command, recordingTime time.Time) error { + cmd.Phase = CommandPhasePre + return s.put(activeBucket, cmd, recordingTime) +} + +func (s *boltStore) SavePost(ctx context.Context, cmd Command, result int, recordingTime time.Time) error { + cmd.Phase = CommandPhasePost + cmd.Result = result + cmd.EndTime = time.Now() + return s.put(archivedBucket, cmd, recordingTime) +} + +func (s *boltStore) GetPreTree(ctx context.Context) (preCommandTree, error) { + cmds, err := s.all(activeBucket) + if err != nil { + return nil, err + } + tree := make(preCommandTree) + for _, cmd := range cmds { + key := cmd.GetUniqueKey() + tree[key] = append(tree[key], cmd) + } + return tree, nil +} + +func (s *boltStore) GetPreCommands(ctx context.Context) ([]*Command, error) { + return s.all(activeBucket) +} + +func (s *boltStore) GetPostCommands(ctx context.Context) ([]*Command, error) { + return s.all(archivedBucket) +} + +func (s *boltStore) GetLastCursor(ctx context.Context) (cursorTime time.Time, noCursorExist bool, err error) { + err = s.db.View(func(tx *bolt.Tx) error { + v := tx.Bucket([]byte(metaBucket)).Get([]byte(cursorKey)) + if v == nil { + noCursorExist = true + cursorTime = time.Time{} + return nil + } + cursorTime = time.Unix(0, int64(binary.BigEndian.Uint64(v))) + return nil + }) + return +} + +func (s *boltStore) SetCursor(ctx context.Context, cursor time.Time) error { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(cursor.UnixNano())) + return s.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket([]byte(metaBucket)).Put([]byte(cursorKey), buf) + }) +} + +// Prune deletes synced post commands (recording time <= cursor) and the pre +// commands that have a matching post. Unfinished pre commands are kept, matching +// the historical gc behavior. +func (s *boltStore) Prune(ctx context.Context, cursor time.Time) error { + postCommands, err := s.GetPostCommands(ctx) + if err != nil { + return err + } + cursorNano := cursor.UnixNano() + + return s.db.Update(func(tx *bolt.Tx) error { + archived := tx.Bucket([]byte(archivedBucket)) + var delArchived [][]byte + if err := archived.ForEach(func(k, v []byte) error { + if decodeKeyNano(k) <= cursorNano { + delArchived = append(delArchived, append([]byte(nil), k...)) + } + return nil + }); err != nil { + return err + } + for _, k := range delArchived { + if err := archived.Delete(k); err != nil { + return err + } + } + + active := tx.Bucket([]byte(activeBucket)) + var delActive [][]byte + if err := active.ForEach(func(k, v []byte) error { + nano := decodeKeyNano(k) + if nano > cursorNano { + return nil // keep anything newer than the cursor + } + cmd := new(Command) + if err := json.Unmarshal(v, cmd); err != nil { + return nil + } + cmd.RecordingTime = time.Unix(0, nano) + closest := cmd.FindClosestCommand(postCommands, true) + if closest != nil && !closest.IsNil() { + delActive = append(delActive, append([]byte(nil), k...)) + } + return nil + }); err != nil { + return err + } + for _, k := range delActive { + if err := active.Delete(k); err != nil { + return err + } + } + return nil + }) +} + +func (s *boltStore) Close() error { + if s.db == nil { + return nil + } + return s.db.Close() +} diff --git a/model/store_bolt_test.go b/model/store_bolt_test.go new file mode 100644 index 0000000..d6fc61b --- /dev/null +++ b/model/store_bolt_test.go @@ -0,0 +1,156 @@ +package model + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +type BoltStoreTestSuite struct { + suite.Suite + store *boltStore + ctx context.Context +} + +func (s *BoltStoreTestSuite) SetupTest() { + dbPath := filepath.Join(s.T().TempDir(), "commands.db") + st, err := newBoltStore(dbPath) + s.Require().NoError(err) + s.store = st + s.ctx = context.Background() +} + +func (s *BoltStoreTestSuite) TearDownTest() { + s.Require().NoError(s.store.Close()) +} + +func (s *BoltStoreTestSuite) newCmd(cmd string, t time.Time) Command { + return Command{ + Shell: "bash", + SessionID: 1, + Command: cmd, + Username: "tester", + Time: t, + } +} + +func (s *BoltStoreTestSuite) TestSavePreAndRead() { + now := time.Now() + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("ls", now), now)) + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("pwd", now.Add(time.Second)), now.Add(time.Second))) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(pre, 2) + s.EqualValues(CommandPhasePre, pre[0].Phase) + // RecordingTime recovered from the key + s.Equal(now.UnixNano(), pre[0].RecordingTime.UnixNano()) +} + +func (s *BoltStoreTestSuite) TestTimeOrdering() { + base := time.Now() + // insert out of order + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("c", base.Add(2*time.Second)), base.Add(2*time.Second))) + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("a", base), base)) + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("b", base.Add(time.Second)), base.Add(time.Second))) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(pre, 3) + s.Equal("a", pre[0].Command) + s.Equal("b", pre[1].Command) + s.Equal("c", pre[2].Command) +} + +func (s *BoltStoreTestSuite) TestSameNanoNoCollision() { + now := time.Now() + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("first", now), now)) + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("second", now), now)) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Len(pre, 2, "identical recording times must not overwrite each other") +} + +func (s *BoltStoreTestSuite) TestPreTree() { + now := time.Now() + c := s.newCmd("git status", now) + s.Require().NoError(s.store.SavePre(s.ctx, c, now)) + + tree, err := s.store.GetPreTree(s.ctx) + s.Require().NoError(err) + s.Require().Len(tree[c.GetUniqueKey()], 1) +} + +func (s *BoltStoreTestSuite) TestSavePost() { + now := time.Now() + s.Require().NoError(s.store.SavePost(s.ctx, s.newCmd("make", now), 2, now)) + + post, err := s.store.GetPostCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(post, 1) + s.EqualValues(CommandPhasePost, post[0].Phase) + s.Equal(2, post[0].Result) +} + +func (s *BoltStoreTestSuite) TestCursor() { + _, noCursor, err := s.store.GetLastCursor(s.ctx) + s.Require().NoError(err) + s.True(noCursor, "fresh db has no cursor") + + c := time.Unix(0, 1700000000123456789) + s.Require().NoError(s.store.SetCursor(s.ctx, c)) + + got, noCursor, err := s.store.GetLastCursor(s.ctx) + s.Require().NoError(err) + s.False(noCursor) + s.Equal(c.UnixNano(), got.UnixNano()) +} + +func (s *BoltStoreTestSuite) TestPrune() { + base := time.Now() + synced := base + unsyncedPost := base.Add(10 * time.Second) + + // a finished command (pre+post at the same time) before the cursor -> both pruned + finished := s.newCmd("finished", synced) + s.Require().NoError(s.store.SavePre(s.ctx, finished, synced)) + s.Require().NoError(s.store.SavePost(s.ctx, finished, 0, synced)) + + // an unfinished pre before the cursor -> kept + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("running-server", synced), synced)) + + // a post after the cursor -> kept + s.Require().NoError(s.store.SavePost(s.ctx, s.newCmd("later", unsyncedPost), 0, unsyncedPost)) + + s.Require().NoError(s.store.Prune(s.ctx, synced)) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(pre, 1) + s.Equal("running-server", pre[0].Command) + + post, err := s.store.GetPostCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(post, 1) + s.Equal("later", post[0].Command) +} + +func TestBoltStoreSuite(t *testing.T) { + suite.Run(t, new(BoltStoreTestSuite)) +} + +func TestNewCommandStoreSelectsEngine(t *testing.T) { + fileCfg := ShellTimeConfig{} + st, err := NewCommandStore(fileCfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, ok := st.(*fileStore); !ok { + t.Fatalf("expected fileStore by default, got %T", st) + } + _ = st.Close() +} diff --git a/model/store_file.go b/model/store_file.go new file mode 100644 index 0000000..6f4e992 --- /dev/null +++ b/model/store_file.go @@ -0,0 +1,159 @@ +package model + +import ( + "bytes" + "context" + "fmt" + "log/slog" + "os" + "sort" + "time" +) + +// fileStore is the historical append-only txt backend. It delegates to the +// existing package-level reader functions so behavior is unchanged. +type fileStore struct{} + +func newFileStore() *fileStore { return &fileStore{} } + +func (s *fileStore) appendLine(path string, cmd Command, recordingTime time.Time) error { + if err := ensureStorageFolder(); err != nil { + return err + } + buf, err := cmd.ToLine(recordingTime) + if err != nil { + return err + } + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return fmt.Errorf("failed to open command storage file %s: %w", path, err) + } + defer f.Close() + if _, err := f.Write(buf); err != nil { + return fmt.Errorf("failed to write command storage file %s: %w", path, err) + } + return nil +} + +func (s *fileStore) SavePre(ctx context.Context, cmd Command, recordingTime time.Time) error { + cmd.Phase = CommandPhasePre + return s.appendLine(GetPreCommandFilePath(), cmd, recordingTime) +} + +func (s *fileStore) SavePost(ctx context.Context, cmd Command, result int, recordingTime time.Time) error { + cmd.Phase = CommandPhasePost + cmd.Result = result + cmd.EndTime = time.Now() + return s.appendLine(GetPostCommandFilePath(), cmd, recordingTime) +} + +func (s *fileStore) GetPreTree(ctx context.Context) (preCommandTree, error) { + return GetPreCommandsTree(ctx) +} + +func (s *fileStore) GetPreCommands(ctx context.Context) ([]*Command, error) { + return GetPreCommands(ctx) +} + +func (s *fileStore) GetPostCommands(ctx context.Context) ([]*Command, error) { + raw, _, err := GetPostCommands(ctx) + if err != nil { + return nil, err + } + result := make([]*Command, 0, len(raw)) + for _, line := range raw { + cmd := new(Command) + if _, err := cmd.FromLineBytes(line); err != nil { + slog.Warn("failed to parse post command line", slog.Any("err", err)) + continue + } + result = append(result, cmd) + } + return result, nil +} + +func (s *fileStore) GetLastCursor(ctx context.Context) (time.Time, bool, error) { + return GetLastCursor(ctx) +} + +func (s *fileStore) SetCursor(ctx context.Context, cursor time.Time) error { + if err := ensureStorageFolder(); err != nil { + return err + } + cursorFile, err := os.OpenFile(GetCursorFilePath(), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return err + } + defer cursorFile.Close() + _, err = cursorFile.WriteString(fmt.Sprintf("\n%d\n", cursor.UnixNano())) + return err +} + +// Prune compacts the txt files, dropping synced records and keeping unfinished +// pre commands. Mirrors the historical gc cleanCommandFiles behavior. +func (s *fileStore) Prune(ctx context.Context, cursor time.Time) error { + postCommands, err := s.GetPostCommands(ctx) + if err != nil { + return err + } + if len(postCommands) == 0 { + return nil + } + preCommands, err := s.GetPreCommands(ctx) + if err != nil { + return err + } + + newPost := make([]*Command, 0) + for _, cmd := range postCommands { + if cmd != nil && cmd.RecordingTime.After(cursor) { + newPost = append(newPost, cmd) + } + } + + newPre := make([]*Command, 0) + for _, row := range preCommands { + if row == nil { + continue + } + if row.RecordingTime.After(cursor) { + newPre = append(newPre, row) + continue + } + // keep pre commands that never got a matching post (unfinished) + closest := row.FindClosestCommand(postCommands, true) + if closest == nil || closest.IsNil() { + newPre = append(newPre, row) + } + } + + sort.Slice(newPre, func(i, j int) bool { return newPre[i].RecordingTime.Before(newPre[j].RecordingTime) }) + sort.Slice(newPost, func(i, j int) bool { return newPost[i].RecordingTime.Before(newPost[j].RecordingTime) }) + + preBuf := bytes.Buffer{} + for _, cmd := range newPre { + line, err := cmd.ToLine(cmd.RecordingTime) + if err != nil { + return err + } + preBuf.Write(line) + } + postBuf := bytes.Buffer{} + for _, cmd := range newPost { + line, err := cmd.ToLine(cmd.RecordingTime) + if err != nil { + return err + } + postBuf.Write(line) + } + + if err := os.WriteFile(GetPreCommandFilePath(), preBuf.Bytes(), 0644); err != nil { + return err + } + if err := os.WriteFile(GetPostCommandFilePath(), postBuf.Bytes(), 0644); err != nil { + return err + } + return os.WriteFile(GetCursorFilePath(), []byte(fmt.Sprintf("%d", cursor.UnixNano())), 0644) +} + +func (s *fileStore) Close() error { return nil } diff --git a/model/types.go b/model/types.go index dc8e208..8d6ff66 100644 --- a/model/types.go +++ b/model/types.go @@ -97,11 +97,23 @@ type ShellTimeConfig struct { // LogCleanup configuration for automatic log file cleanup in daemon LogCleanup *LogCleanup `toml:"logCleanup" yaml:"logCleanup" json:"logCleanup"` + // Storage selects the local command buffering backend. When unset the + // always-available txt file store is used. + Storage *StorageConfig `toml:"storage" yaml:"storage,omitempty" json:"storage,omitempty"` + // SocketPath is the path to the Unix domain socket used for communication // between the CLI and the daemon. SocketPath string `toml:"socketPath" yaml:"socketPath" json:"socketPath"` } +// StorageConfig selects which CommandStore backend buffers tracked commands +// before they sync to the server. +type StorageConfig struct { + // Engine is "file" (default) or "bolt". The bolt engine is daemon-owned; + // see CommandStore for details. + Engine string `toml:"engine" yaml:"engine" json:"engine"` +} + var DefaultAIConfig = &AIConfig{ Agent: AIAgentConfig{ View: false, From c2dd737dccd8df8a59346bc31b7e9aa48b57b8dd Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 09:49:26 +0000 Subject: [PATCH 2/9] feat(daemon): persist track events to the bolt store Wire the daemon to own the bbolt CommandStore and accept raw command events over the socket when the bolt storage engine is enabled. - Add track_pre / track_post socket message types and TrackEventPayload; route them through the existing pub/sub topic to new handlers. - handlePubSubTrackPre persists to the active bucket; handlePubSubTrackPost persists to archived, then runs the flush/sync/cursor/prune cycle against the store (the daemon-side equivalent of trySyncLocalToServer). On send failure it leaves data in bolt and does not advance the cursor. - Extract the server-send path (terminal resolution, daemon source, encryption, circuit breaker) from handlePubSubSync into a reusable sendTrackArgsToServer, shared by the sync and track handlers. - Extract the pre/post -> []TrackingData assembly into model.BuildTrackingData so both the CLI fallback and the daemon build payloads from one path. - The daemon constructs and owns the bolt store for its lifetime in cmd/daemon/main.go (gated on storage.engine == bolt). - Make CommandStore.GetPreTree return map[string][]*Command so the interface is implementable/fakeable outside the model package. Adds self-contained track handler tests (fake store + config, no mockery). Nothing in the CLI hot path emits these events yet; that follows next. --- cmd/daemon/main.go | 13 +++ daemon/base.go | 11 +++ daemon/handlers.go | 4 + daemon/handlers.sync.go | 7 ++ daemon/handlers.track.go | 101 ++++++++++++++++++++++ daemon/handlers.track_test.go | 158 ++++++++++++++++++++++++++++++++++ daemon/socket.go | 22 +++++ model/store.go | 2 +- model/store_bolt.go | 4 +- model/store_file.go | 2 +- model/tracking_build.go | 124 ++++++++++++++++++++++++++ 11 files changed, 444 insertions(+), 4 deletions(-) create mode 100644 daemon/handlers.track.go create mode 100644 daemon/handlers.track_test.go create mode 100644 model/tracking_build.go diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index a9280dd..63429fd 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -82,6 +82,19 @@ func main() { model.InjectVar(version) cmdService := model.NewCommandService() + // When the bolt storage engine is enabled, the daemon owns the bolt-backed + // command store for its lifetime (bbolt holds an exclusive file lock). + if cfg.Storage != nil && cfg.Storage.Engine == model.StorageEngineBolt { + store, err := model.NewCommandStore(cfg) + if err != nil { + slog.Error("Failed to open bolt command store", slog.Any("err", err)) + } else { + daemon.InitCommandStore(store) + defer store.Close() + slog.Info("Bolt command store initialized") + } + } + pubsub := daemon.NewGoChannel(daemon.PubSubConfig{}, watermill.NewSlogLogger(slog.Default())) msg, err := pubsub.Subscribe(context.Background(), daemon.PubSubTopic) diff --git a/daemon/base.go b/daemon/base.go index 82787c1..6648217 100644 --- a/daemon/base.go +++ b/daemon/base.go @@ -10,6 +10,17 @@ var stConfig model.ConfigService var version string var startedAt time.Time +// commandStore is the daemon-owned bolt CommandStore, set when the bolt storage +// engine is enabled. It is nil when running with the default file engine. +var commandStore model.CommandStore + +// InitCommandStore registers the daemon-owned command store used by the bolt +// track handlers. The daemon owns the store for its lifetime (bbolt holds an +// exclusive file lock). +func InitCommandStore(store model.CommandStore) { + commandStore = store +} + const ( PubSubTopic = "socket" ) diff --git a/daemon/handlers.go b/daemon/handlers.go index 080f6fc..9a452e1 100644 --- a/daemon/handlers.go +++ b/daemon/handlers.go @@ -24,6 +24,10 @@ func SocketTopicProcessor(messages <-chan *message.Message) { switch socketMsg.Type { case SocketMessageTypeSync: err = handlePubSubSync(ctx, socketMsg.Payload) + case SocketMessageTypeTrackPre: + err = handlePubSubTrackPre(ctx, socketMsg.Payload) + case SocketMessageTypeTrackPost: + err = handlePubSubTrackPost(ctx, socketMsg.Payload) case SocketMessageTypeHeartbeat: err = handlePubSubHeartbeat(ctx, socketMsg.Payload) default: diff --git a/daemon/handlers.sync.go b/daemon/handlers.sync.go index b3e21cc..d0dbbdb 100644 --- a/daemon/handlers.sync.go +++ b/daemon/handlers.sync.go @@ -34,6 +34,13 @@ func handlePubSubSync(ctx context.Context, socketMsgPayload interface{}) error { return err } + return sendTrackArgsToServer(ctx, syncMsg) +} + +// sendTrackArgsToServer enriches a tracking payload (terminal resolution, daemon +// source, optional encryption) and sends it to the server, recording circuit +// breaker state. It is shared by the sync handler and the bolt track handler. +func sendTrackArgsToServer(ctx context.Context, syncMsg model.PostTrackArgs) error { // Resolve terminal from PPID (use first data item's PPID) if len(syncMsg.Data) > 0 && syncMsg.Data[0].PPID > 0 { terminal, multiplexer := ResolveTerminal(syncMsg.Data[0].PPID) diff --git a/daemon/handlers.track.go b/daemon/handlers.track.go new file mode 100644 index 0000000..d2ffe16 --- /dev/null +++ b/daemon/handlers.track.go @@ -0,0 +1,101 @@ +package daemon + +import ( + "context" + "encoding/json" + "errors" + "log/slog" + "time" + + "github.com/malamtime/cli/model" +) + +// errNoCommandStore is returned when a track event arrives but the bolt store +// was not initialized (bolt engine disabled). The CLI only emits track events +// when bolt is enabled, so this indicates a misconfiguration. +var errNoCommandStore = errors.New("command store not initialized; bolt storage engine is disabled") + +func parseTrackEvent(payload interface{}) (model.Command, time.Time, error) { + pb, err := json.Marshal(payload) + if err != nil { + return model.Command{}, time.Time{}, err + } + var ev TrackEventPayload + if err := json.Unmarshal(pb, &ev); err != nil { + return model.Command{}, time.Time{}, err + } + recordingTime := time.Unix(0, ev.RecordingTimeNano) + return ev.Command, recordingTime, nil +} + +// handlePubSubTrackPre persists a pre-execution command to the active bucket. +func handlePubSubTrackPre(ctx context.Context, payload interface{}) error { + if commandStore == nil { + return errNoCommandStore + } + cmd, recordingTime, err := parseTrackEvent(payload) + if err != nil { + slog.Error("Failed to parse track_pre payload", slog.Any("err", err)) + return err + } + return commandStore.SavePre(ctx, cmd, recordingTime) +} + +// handlePubSubTrackPost persists a post-execution command, then runs the +// flush/sync/cursor/prune cycle against the bolt store — the daemon-side +// equivalent of commands.trySyncLocalToServer. +func handlePubSubTrackPost(ctx context.Context, payload interface{}) error { + if commandStore == nil { + return errNoCommandStore + } + cmd, recordingTime, err := parseTrackEvent(payload) + if err != nil { + slog.Error("Failed to parse track_post payload", slog.Any("err", err)) + return err + } + + if err := commandStore.SavePost(ctx, cmd, cmd.Result, recordingTime); err != nil { + return err + } + + cfg, err := stConfig.ReadConfigFile(ctx) + if err != nil { + slog.Error("Failed to read config in track_post handler", slog.Any("err", err)) + return err + } + + result, err := model.BuildTrackingData(ctx, commandStore, cfg) + if err != nil { + return err + } + if len(result.Data) == 0 { + return nil + } + + // Respect FlushCount, allowing the very first batch through (no cursor yet). + if !result.NoCursorExist && len(result.Data) < cfg.FlushCount { + slog.Debug("not enough data to flush", slog.Int("current", len(result.Data)), slog.Int("flushCount", cfg.FlushCount)) + return nil + } + + args := model.PostTrackArgs{ + CursorID: result.LatestRecordingTime.UnixNano(), + Data: result.Data, + Meta: result.Meta, + } + + if err := sendTrackArgsToServer(ctx, args); err != nil { + slog.Error("Failed to send tracking data from bolt store", slog.Any("err", err)) + // Leave the data in bolt; a later post will retry. Do not advance cursor. + return err + } + + if err := commandStore.SetCursor(ctx, result.LatestRecordingTime); err != nil { + slog.Error("Failed to advance cursor", slog.Any("err", err)) + return err + } + if err := commandStore.Prune(ctx, result.LatestRecordingTime); err != nil { + slog.Warn("Failed to prune synced commands", slog.Any("err", err)) + } + return nil +} diff --git a/daemon/handlers.track_test.go b/daemon/handlers.track_test.go new file mode 100644 index 0000000..020553b --- /dev/null +++ b/daemon/handlers.track_test.go @@ -0,0 +1,158 @@ +package daemon + +import ( + "context" + "testing" + "time" + + "github.com/malamtime/cli/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// fakeCommandStore is an in-memory CommandStore for handler tests. +type fakeCommandStore struct { + pre []*model.Command + post []*model.Command + + cursor time.Time + noCursorExist bool + + cursorSetCalls int + pruneCalls int +} + +func (f *fakeCommandStore) SavePre(ctx context.Context, cmd model.Command, rt time.Time) error { + c := cmd + c.RecordingTime = rt + f.pre = append(f.pre, &c) + return nil +} + +func (f *fakeCommandStore) SavePost(ctx context.Context, cmd model.Command, result int, rt time.Time) error { + c := cmd + c.Result = result + c.RecordingTime = rt + f.post = append(f.post, &c) + return nil +} + +func (f *fakeCommandStore) GetPreTree(ctx context.Context) (map[string][]*model.Command, error) { + tree := make(map[string][]*model.Command) + for _, c := range f.pre { + k := c.GetUniqueKey() + tree[k] = append(tree[k], c) + } + return tree, nil +} + +func (f *fakeCommandStore) GetPreCommands(ctx context.Context) ([]*model.Command, error) { + return f.pre, nil +} + +func (f *fakeCommandStore) GetPostCommands(ctx context.Context) ([]*model.Command, error) { + return f.post, nil +} + +func (f *fakeCommandStore) GetLastCursor(ctx context.Context) (time.Time, bool, error) { + return f.cursor, f.noCursorExist, nil +} + +func (f *fakeCommandStore) SetCursor(ctx context.Context, cursor time.Time) error { + f.cursor = cursor + f.cursorSetCalls++ + return nil +} + +func (f *fakeCommandStore) Prune(ctx context.Context, cursor time.Time) error { + f.pruneCalls++ + return nil +} + +func (f *fakeCommandStore) Close() error { return nil } + +// fakeConfigService implements model.ConfigService without mockery. +type fakeConfigService struct { + cfg model.ShellTimeConfig +} + +func (f fakeConfigService) ReadConfigFile(ctx context.Context, opts ...model.ReadConfigOption) (model.ShellTimeConfig, error) { + return f.cfg, nil +} + +type TrackHandlerTestSuite struct { + suite.Suite + prevStore model.CommandStore + prevConfig model.ConfigService +} + +func (s *TrackHandlerTestSuite) SetupTest() { + s.prevStore = commandStore + s.prevConfig = stConfig +} + +func (s *TrackHandlerTestSuite) TearDownTest() { + commandStore = s.prevStore + stConfig = s.prevConfig +} + +func (s *TrackHandlerTestSuite) TestParseTrackEvent() { + now := time.Now() + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", Command: "ls"}, + RecordingTimeNano: now.UnixNano(), + } + cmd, rt, err := parseTrackEvent(payload) + require.NoError(s.T(), err) + assert.Equal(s.T(), "ls", cmd.Command) + assert.Equal(s.T(), now.UnixNano(), rt.UnixNano()) +} + +func (s *TrackHandlerTestSuite) TestTrackPreNoStore() { + commandStore = nil + err := handlePubSubTrackPre(context.Background(), TrackEventPayload{}) + assert.ErrorIs(s.T(), err, errNoCommandStore) +} + +func (s *TrackHandlerTestSuite) TestTrackPrePersists() { + store := &fakeCommandStore{} + commandStore = store + now := time.Now() + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u"}, + RecordingTimeNano: now.UnixNano(), + } + err := handlePubSubTrackPre(context.Background(), payload) + require.NoError(s.T(), err) + require.Len(s.T(), store.pre, 1) + assert.Equal(s.T(), "ls", store.pre[0].Command) +} + +func (s *TrackHandlerTestSuite) TestTrackPostNotEnoughToFlush() { + store := &fakeCommandStore{ + // cursor exists, so FlushCount gating applies + noCursorExist: false, + cursor: time.Now().Add(-time.Hour), + } + commandStore = store + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{FlushCount: 10}} + + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u", Time: now} + // seed a matching pre so BuildTrackingData yields one data row + require.NoError(s.T(), store.SavePre(context.Background(), cmd, now)) + + payload := TrackEventPayload{Command: cmd, RecordingTimeNano: now.UnixNano()} + err := handlePubSubTrackPost(context.Background(), payload) + require.NoError(s.T(), err) + + // one data row < FlushCount(10): must not send / advance cursor / prune + assert.Len(s.T(), store.post, 1, "post should still be persisted") + assert.Equal(s.T(), 0, store.cursorSetCalls, "cursor must not advance below flush threshold") + assert.Equal(s.T(), 0, store.pruneCalls) +} + +func TestTrackHandlerSuite(t *testing.T) { + suite.Run(t, new(TrackHandlerTestSuite)) +} diff --git a/daemon/socket.go b/daemon/socket.go index 132ddb7..00a5ddd 100644 --- a/daemon/socket.go +++ b/daemon/socket.go @@ -23,8 +23,20 @@ const ( SocketMessageTypeStatus SocketMessageType = "status" SocketMessageTypeCCInfo SocketMessageType = "cc_info" SocketMessageTypeSessionProject SocketMessageType = "session_project" + // SocketMessageTypeTrackPre / TrackPost carry a single raw command event the + // daemon persists to its bolt-backed CommandStore (used when the bolt storage + // engine is enabled). + SocketMessageTypeTrackPre SocketMessageType = "track_pre" + SocketMessageTypeTrackPost SocketMessageType = "track_post" ) +// TrackEventPayload is the payload for track_pre / track_post messages: a single +// command plus the recording time stamped by the CLI. +type TrackEventPayload struct { + Command model.Command `json:"command"` + RecordingTimeNano int64 `json:"recordingTimeNano"` +} + type SessionProjectRequest struct { SessionID string `json:"sessionId"` ProjectPath string `json:"projectPath"` @@ -164,6 +176,16 @@ func (p *SocketHandler) handleConnection(conn net.Conn) { if err := p.channel.Publish(PubSubTopic, chMsg); err != nil { slog.Error("Error to publish topic", slog.Any("err", err)) } + case SocketMessageTypeTrackPre, SocketMessageTypeTrackPost: + buf, err := json.Marshal(msg) + if err != nil { + slog.Error("Error encoding track message", slog.Any("err", err)) + return + } + chMsg := message.NewMessage(watermill.NewUUID(), buf) + if err := p.channel.Publish(PubSubTopic, chMsg); err != nil { + slog.Error("Error publishing track topic", slog.Any("err", err)) + } case SocketMessageTypeHeartbeat: // Only process heartbeat if codeTracking is enabled if p.config.CodeTracking == nil || p.config.CodeTracking.Enabled == nil || !*p.config.CodeTracking.Enabled { diff --git a/model/store.go b/model/store.go index e93ccd9..e0a984f 100644 --- a/model/store.go +++ b/model/store.go @@ -26,7 +26,7 @@ type CommandStore interface { // GetPreTree returns pre commands indexed by their unique key, used to pair // post commands with the originating pre command during sync. - GetPreTree(ctx context.Context) (preCommandTree, error) + GetPreTree(ctx context.Context) (map[string][]*Command, error) // GetPreCommands returns all pre commands (used by compaction). GetPreCommands(ctx context.Context) ([]*Command, error) // GetPostCommands returns all post commands with RecordingTime populated. diff --git a/model/store_bolt.go b/model/store_bolt.go index c1c5387..3a71176 100644 --- a/model/store_bolt.go +++ b/model/store_bolt.go @@ -113,12 +113,12 @@ func (s *boltStore) SavePost(ctx context.Context, cmd Command, result int, recor return s.put(archivedBucket, cmd, recordingTime) } -func (s *boltStore) GetPreTree(ctx context.Context) (preCommandTree, error) { +func (s *boltStore) GetPreTree(ctx context.Context) (map[string][]*Command, error) { cmds, err := s.all(activeBucket) if err != nil { return nil, err } - tree := make(preCommandTree) + tree := make(map[string][]*Command) for _, cmd := range cmds { key := cmd.GetUniqueKey() tree[key] = append(tree[key], cmd) diff --git a/model/store_file.go b/model/store_file.go index 6f4e992..0c0bb3a 100644 --- a/model/store_file.go +++ b/model/store_file.go @@ -47,7 +47,7 @@ func (s *fileStore) SavePost(ctx context.Context, cmd Command, result int, recor return s.appendLine(GetPostCommandFilePath(), cmd, recordingTime) } -func (s *fileStore) GetPreTree(ctx context.Context) (preCommandTree, error) { +func (s *fileStore) GetPreTree(ctx context.Context) (map[string][]*Command, error) { return GetPreCommandsTree(ctx) } diff --git a/model/tracking_build.go b/model/tracking_build.go new file mode 100644 index 0000000..859be79 --- /dev/null +++ b/model/tracking_build.go @@ -0,0 +1,124 @@ +package model + +import ( + "context" + "time" +) + +// TrackingBuildResult is the outcome of assembling pending post commands into a +// server-ready tracking payload. +type TrackingBuildResult struct { + Data []TrackingData + Meta TrackingMetaData + LatestRecordingTime time.Time + Cursor time.Time + NoCursorExist bool +} + +// BuildTrackingData assembles the tracking payload for all post commands newer +// than the store's cursor, pairing each with its closest pre command. It is the +// shared assembly used by both the CLI fallback path and the daemon's bolt path +// (previously inlined in commands.trySyncLocalToServer). +// +// It performs no network IO and does not advance the cursor; callers decide +// whether to flush (FlushCount), send, then SetCursor + Prune. +func BuildTrackingData(ctx context.Context, store CommandStore, config ShellTimeConfig) (TrackingBuildResult, error) { + ctx, span := modelTracer.Start(ctx, "buildTrackingData") + defer span.End() + + var res TrackingBuildResult + + postCommands, err := store.GetPostCommands(ctx) + if err != nil { + return res, err + } + if len(postCommands) == 0 { + return res, nil + } + + cursor, noCursorExist, err := store.GetLastCursor(ctx) + if err != nil { + return res, err + } + res.Cursor = cursor + res.NoCursorExist = noCursorExist + + preTree, err := store.GetPreTree(ctx) + if err != nil { + return res, err + } + + sysInfo, err := GetOSAndVersion() + if err != nil { + sysInfo = &SysInfo{Os: "unknown", Version: "unknown"} + } + + meta := TrackingMetaData{ + OS: sysInfo.Os, + OSVersion: sysInfo.Version, + } + + trackingData := make([]TrackingData, 0) + latest := cursor + + for _, postCommand := range postCommands { + if postCommand == nil { + continue + } + + recordingTime := postCommand.RecordingTime + if recordingTime.Before(cursor) { + continue + } + if recordingTime.After(latest) { + latest = recordingTime + } + + key := postCommand.GetUniqueKey() + preCommands, ok := preTree[key] + if !ok { + continue + } + + if meta.Hostname == "" { + meta.Hostname = postCommand.Hostname + } + if meta.Shell == "" { + meta.Shell = postCommand.Shell + } + if meta.Username == "" { + meta.Username = postCommand.Username + } + + if ShouldExcludeCommand(postCommand.Command, config.Exclude) { + continue + } + + closestPreCommand := postCommand.FindClosestCommand(preCommands, false) + + td := TrackingData{ + SessionID: postCommand.SessionID, + Command: postCommand.Command, + EndTime: postCommand.Time.Unix(), + EndTimeNano: postCommand.Time.UnixNano(), + Result: postCommand.Result, + PPID: postCommand.PPID, + } + + if config.DataMasking != nil && *config.DataMasking { + td.Command = MaskSensitiveTokens(td.Command) + } + + if closestPreCommand != nil { + td.StartTime = closestPreCommand.Time.Unix() + td.StartTimeNano = closestPreCommand.Time.UnixNano() + } + + trackingData = append(trackingData, td) + } + + res.Data = trackingData + res.Meta = meta + res.LatestRecordingTime = latest + return res, nil +} From 21239981707780c71512045afe647fa409404ab3 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 09:52:59 +0000 Subject: [PATCH 3/9] feat(cli): route track events to the daemon when bolt is enabled When storage.engine == bolt and the daemon socket is ready, the track hook sends the raw pre/post command event to the daemon (fire-and-forget) instead of writing the txt files locally; the daemon owns persistence, sync, and pruning. When bolt is disabled or the daemon is down, the CLI falls back to the existing txt file store and local sync, unchanged. Also dedupe the sync assembly: trySyncLocalToServer now builds its payload via model.BuildTrackingData against an explicit file store (NewFileStore) and advances the cursor through the store interface, removing the duplicated inline assembly and updateCursorToFile. Add daemon.SendTrackEvent and model.NewFileStore. --- commands/track.go | 150 +++++++++------------------------------------- daemon/client.go | 33 ++++++++++ model/store.go | 7 +++ 3 files changed, 68 insertions(+), 122 deletions(-) diff --git a/commands/track.go b/commands/track.go index 80682db..bebb031 100644 --- a/commands/track.go +++ b/commands/track.go @@ -2,7 +2,6 @@ package commands import ( "context" - "fmt" "log/slog" "os" "time" @@ -102,6 +101,25 @@ func commandTrack(c *cli.Context) error { return nil } + // When the bolt storage engine is enabled and the daemon is up, hand the raw + // command event to the daemon (fire-and-forget) and let it own persistence, + // sync, and pruning. This keeps the hook fast and avoids the CLI ever opening + // the daemon-locked bolt DB. Otherwise fall back to the txt file store. + useBolt := config.Storage != nil && config.Storage.Engine == model.StorageEngineBolt + if useBolt && daemon.IsSocketReady(ctx, config.SocketPath) { + now := time.Now() + switch cmdPhase { + case "pre": + span.SetAttributes(attribute.Int("phase", 0)) + return daemon.SendTrackEvent(ctx, config.SocketPath, daemon.SocketMessageTypeTrackPre, *instance, now) + case "post": + span.SetAttributes(attribute.Int("phase", 1)) + instance.Result = result + return daemon.SendTrackEvent(ctx, config.SocketPath, daemon.SocketMessageTypeTrackPost, *instance, now) + } + return nil + } + if cmdPhase == "pre" { span.SetAttributes(attribute.Int("phase", 0)) err = instance.DoSavePre() @@ -136,109 +154,17 @@ func trySyncLocalToServer( ) error { isForceSync := options.isForceSync isDryRun := options.isDryRun - postFileContent, lineCount, err := model.GetPostCommands(ctx) - if err != nil { - return err - } - if len(postFileContent) == 0 || lineCount == 0 { - slog.Debug("Not enough records to sync", slog.Int("lineCount", lineCount)) - return nil - } + // The local sync path always uses the txt file store. It must never open the + // bolt DB, which the daemon owns exclusively. + store := model.NewFileStore() - cursor, noCursorExist, err := model.GetLastCursor(ctx) + result, err := model.BuildTrackingData(ctx, store, config) if err != nil { return err } - preFileTree, err := model.GetPreCommandsTree(ctx) - if err != nil { - return err - } - - sysInfo, err := model.GetOSAndVersion() - if err != nil { - slog.Warn("failed to get OS version", slog.Any("err", err)) - sysInfo = &model.SysInfo{ - Os: "unknown", - Version: "unknown", - } - } - - trackingData := make([]model.TrackingData, 0) - var latestRecordingTime time.Time = cursor - - meta := model.TrackingMetaData{ - Hostname: "", - Username: "", - OS: sysInfo.Os, - OSVersion: sysInfo.Version, - Shell: "", - } - - for _, line := range postFileContent { - postCommand := new(model.Command) - recordingTime, err := postCommand.FromLineBytes(line) - if err != nil { - slog.Error("Failed to parse post command", slog.Any("err", err), slog.String("line", string(line))) - continue - } - - if recordingTime.Before(cursor) { - continue - } - if recordingTime.After(latestRecordingTime) { - latestRecordingTime = recordingTime - } - - key := postCommand.GetUniqueKey() - preCommands, ok := preFileTree[key] - if !ok { - continue - } - - if meta.Hostname == "" { - meta.Hostname = postCommand.Hostname - } - if meta.Shell == "" { - meta.Shell = postCommand.Shell - } - if meta.Username == "" { - meta.Username = postCommand.Username - } - - // Check if command should be excluded during sync - if model.ShouldExcludeCommand(postCommand.Command, config.Exclude) { - slog.Debug("Command excluded during sync", slog.String("command", postCommand.Command)) - continue - } - - // here very sure the commandList are all elligable, so no need check here. - closestPreCommand := postCommand.FindClosestCommand(preCommands, false) - - td := model.TrackingData{ - SessionID: postCommand.SessionID, - Command: postCommand.Command, - EndTime: postCommand.Time.Unix(), - EndTimeNano: postCommand.Time.UnixNano(), - Result: postCommand.Result, - PPID: postCommand.PPID, - } - - // data masking - if config.DataMasking != nil && *config.DataMasking == true { - td.Command = model.MaskSensitiveTokens(td.Command) - } - - if closestPreCommand != nil { - td.StartTime = closestPreCommand.Time.Unix() - td.StartTimeNano = closestPreCommand.Time.UnixNano() - } - - trackingData = append(trackingData, td) - } - - if len(trackingData) == 0 { + if len(result.Data) == 0 { slog.Debug("no tracking data need to be sync") return nil } @@ -246,13 +172,13 @@ func trySyncLocalToServer( // no matter the flush count is, just force sync if !isForceSync { // allow first command to be sync with server - if len(trackingData) < config.FlushCount && !noCursorExist { - slog.Debug("not enough data need to flush, abort", slog.Int("current", len(trackingData))) + if len(result.Data) < config.FlushCount && !result.NoCursorExist { + slog.Debug("not enough data need to flush, abort", slog.Int("current", len(result.Data))) return nil } } - err = DoSyncData(ctx, config, latestRecordingTime, trackingData, meta) + err = DoSyncData(ctx, config, result.LatestRecordingTime, result.Data, result.Meta) if err != nil { slog.Error("Failed to send data to server", slog.Any("err", err)) return err @@ -261,8 +187,7 @@ func trySyncLocalToServer( if isDryRun { return nil } - // TODO: update cursor - return updateCursorToFile(ctx, latestRecordingTime) + return store.SetCursor(ctx, result.LatestRecordingTime) } func DoSyncData( @@ -289,22 +214,3 @@ func DoSyncData( // send to socket if the socket is ready return daemon.SendLocalDataToSocket(ctx, socketPath, config, cursor, trackingData, meta) } - -func updateCursorToFile(ctx context.Context, latestRecordingTime time.Time) error { - ctx, span := commandTracer.Start(ctx, "updateCurosr") - defer span.End() - cursorFilePath := model.GetCursorFilePath() - cursorFile, err := os.OpenFile(cursorFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - slog.Error("Failed to open cursor file for writing", slog.Any("err", err)) - return err - } - defer cursorFile.Close() - - _, err = cursorFile.WriteString(fmt.Sprintf("\n%d\n", latestRecordingTime.UnixNano())) - if err != nil { - slog.Error("Failed to write to cursor file", slog.Any("err", err)) - return err - } - return nil -} diff --git a/daemon/client.go b/daemon/client.go index 77a146f..1708c23 100644 --- a/daemon/client.go +++ b/daemon/client.go @@ -51,6 +51,39 @@ func SendLocalDataToSocket( return nil } +// SendTrackEvent sends a single raw command event (pre or post) to the daemon +// for persistence in its bolt store. Fire-and-forget, mirroring the latency +// profile of the txt-file append it replaces. +func SendTrackEvent( + ctx context.Context, + socketPath string, + msgType SocketMessageType, + cmd model.Command, + recordingTime time.Time, +) error { + conn, err := net.Dial("unix", socketPath) + if err != nil { + return err + } + defer conn.Close() + + data := SocketMessage{ + Type: msgType, + Payload: TrackEventPayload{ + Command: cmd, + RecordingTimeNano: recordingTime.UnixNano(), + }, + } + + encoded, err := json.Marshal(data) + if err != nil { + return err + } + + _, err = conn.Write(encoded) + return err +} + // SendSessionProject sends a session-to-project mapping to the daemon (fire-and-forget) func SendSessionProject(socketPath string, sessionID, projectPath string) { conn, err := net.DialTimeout("unix", socketPath, 10*time.Millisecond) diff --git a/model/store.go b/model/store.go index e0a984f..7095d57 100644 --- a/model/store.go +++ b/model/store.go @@ -52,6 +52,13 @@ const StorageEngineFile = "file" // StorageEngineBolt selects the bbolt backend (daemon-owned). const StorageEngineBolt = "bolt" +// NewFileStore returns the txt-file backed store. The CLI uses this for the +// fallback path (bolt disabled or no daemon) and must never open the bolt DB +// directly, since the daemon holds its exclusive lock. +func NewFileStore() CommandStore { + return newFileStore() +} + // NewCommandStore builds the store selected by config. It falls back to the // file store for any unknown / empty engine so misconfiguration never loses data. // From a4016356f2a623189a4a77cb54d482f19caca589 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 09:55:22 +0000 Subject: [PATCH 4/9] feat(cli): serve `ls` from the daemon when bolt is enabled In bolt mode the daemon owns the exclusively-locked DB, so the CLI cannot read it directly. Add a list_commands request/response socket message: `ls` queries the daemon when bolt is enabled and the socket is ready, and falls back to reading the txt file store otherwise. Extract the pre/post pairing for display into model.BuildListedCommands (returning model.ListedCommand) so the daemon and the CLI fallback render identical output, and replace ls.go's inline anonymous struct with it. --- commands/ls.go | 78 +++++++++------------------------------------- daemon/client.go | 23 ++++++++++++++ daemon/socket.go | 28 +++++++++++++++++ model/list.go | 61 ++++++++++++++++++++++++++++++++++++ model/list_test.go | 38 ++++++++++++++++++++++ 5 files changed, 165 insertions(+), 63 deletions(-) create mode 100644 model/list.go create mode 100644 model/list_test.go diff --git a/commands/ls.go b/commands/ls.go index b0d5779..668651b 100644 --- a/commands/ls.go +++ b/commands/ls.go @@ -4,12 +4,12 @@ package commands import ( "encoding/json" "fmt" - "log/slog" "os" "strconv" "time" "github.com/gookit/color" + "github.com/malamtime/cli/daemon" "github.com/malamtime/cli/model" "github.com/olekukonko/tablewriter" "github.com/urfave/cli/v2" @@ -54,66 +54,26 @@ func commandList(c *cli.Context) error { color.Yellow.Println("⚠️ Note: Local data will be cleaned periodically for performance and disk efficiency. To view all of your commands, please run 'shelltime web'") } - // Get post commands - postFileContent, _, err := model.GetPostCommands(ctx) + config, err := configService.ReadConfigFile(ctx) if err != nil { return err } - // Get pre commands tree for reference - preFileTree, err := model.GetPreCommandsTree(ctx) - if err != nil { - return err - } - - // Process commands - var commands []struct { - Command string `json:"command"` - Shell string `json:"shell"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Result int `json:"result"` - Username string `json:"username"` - Hostname string `json:"hostname"` - } - - for _, line := range postFileContent { - postCommand := new(model.Command) - _, err := postCommand.FromLineBytes(line) + // In bolt mode the daemon owns the (exclusively locked) DB, so query it over + // the socket. Otherwise read the txt file store directly. + var commands []model.ListedCommand + useBolt := config.Storage != nil && config.Storage.Engine == model.StorageEngineBolt + if useBolt && daemon.IsSocketReady(ctx, config.SocketPath) { + resp, err := daemon.RequestListCommands(config.SocketPath, 2*time.Second) if err != nil { - slog.Error("Failed to parse post command", slog.Any("err", err), slog.String("line", string(line))) - continue + return err } - - key := postCommand.GetUniqueKey() - preCommands, ok := preFileTree[key] - if !ok { - continue - } - - closestPreCommand := postCommand.FindClosestCommand(preCommands, false) - startTime := postCommand.Time - if closestPreCommand != nil { - startTime = closestPreCommand.Time + commands = resp.Commands + } else { + commands, err = model.BuildListedCommands(ctx, model.NewFileStore()) + if err != nil { + return err } - - commands = append(commands, struct { - Command string `json:"command"` - Shell string `json:"shell"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Result int `json:"result"` - Username string `json:"username"` - Hostname string `json:"hostname"` - }{ - Command: postCommand.Command, - Shell: postCommand.Shell, - StartTime: startTime, - EndTime: postCommand.Time, - Result: postCommand.Result, - Username: postCommand.Username, - Hostname: postCommand.Hostname, - }) } // Output based on format @@ -132,15 +92,7 @@ func outputJSON(commands interface{}) error { return nil } -func outputTable(commands []struct { - Command string `json:"command"` - Shell string `json:"shell"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Result int `json:"result"` - Username string `json:"username"` - Hostname string `json:"hostname"` -}) error { +func outputTable(commands []model.ListedCommand) error { w := tablewriter.NewWriter(os.Stdout) w.Header([]string{"COMMAND", "SHELL", "START TIME", "END TIME", "DURATION(ms)", "STATUS", "USER", "HOST"}) diff --git a/daemon/client.go b/daemon/client.go index 1708c23..2295b02 100644 --- a/daemon/client.go +++ b/daemon/client.go @@ -103,6 +103,29 @@ func SendSessionProject(socketPath string, sessionID, projectPath string) { json.NewEncoder(conn).Encode(msg) } +// RequestListCommands asks the daemon for the locally buffered commands (used +// by `shelltime ls` in bolt mode, since the CLI can't open the locked DB). +func RequestListCommands(socketPath string, timeout time.Duration) (*ListCommandsResponse, error) { + conn, err := net.DialTimeout("unix", socketPath, timeout) + if err != nil { + return nil, err + } + defer conn.Close() + + conn.SetDeadline(time.Now().Add(timeout)) + + msg := SocketMessage{Type: SocketMessageTypeListCommands} + if err := json.NewEncoder(conn).Encode(msg); err != nil { + return nil, err + } + + var response ListCommandsResponse + if err := json.NewDecoder(conn).Decode(&response); err != nil { + return nil, err + } + return &response, nil +} + // RequestCCInfo requests CC info (cost data and git info) from the daemon func RequestCCInfo(socketPath string, timeRange CCInfoTimeRange, workingDir string, timeout time.Duration) (*CCInfoResponse, error) { conn, err := net.DialTimeout("unix", socketPath, timeout) diff --git a/daemon/socket.go b/daemon/socket.go index 00a5ddd..e66406e 100644 --- a/daemon/socket.go +++ b/daemon/socket.go @@ -28,8 +28,17 @@ const ( // engine is enabled). SocketMessageTypeTrackPre SocketMessageType = "track_pre" SocketMessageTypeTrackPost SocketMessageType = "track_post" + // SocketMessageTypeListCommands requests the locally buffered commands from + // the daemon (request/response), used by `shelltime ls` when the bolt store + // is enabled and the CLI cannot open the daemon-locked DB. + SocketMessageTypeListCommands SocketMessageType = "list_commands" ) +// ListCommandsResponse is the daemon's reply to a list_commands request. +type ListCommandsResponse struct { + Commands []model.ListedCommand `json:"commands"` +} + // TrackEventPayload is the payload for track_pre / track_post messages: a single // command plus the recording time stamped by the CLI. type TrackEventPayload struct { @@ -208,6 +217,8 @@ func (p *SocketHandler) handleConnection(conn net.Conn) { // Send acknowledgment to client encoder := json.NewEncoder(conn) encoder.Encode(map[string]string{"status": "ok"}) + case SocketMessageTypeListCommands: + p.handleListCommands(conn) case SocketMessageTypeCCInfo: p.handleCCInfo(conn, msg) case SocketMessageTypeSessionProject: @@ -240,6 +251,23 @@ func (p *SocketHandler) handleStatus(conn net.Conn) { } } +func (p *SocketHandler) handleListCommands(conn net.Conn) { + response := ListCommandsResponse{Commands: []model.ListedCommand{}} + if commandStore != nil { + commands, err := model.BuildListedCommands(context.Background(), commandStore) + if err != nil { + slog.Error("Failed to build listed commands", slog.Any("err", err)) + } else { + response.Commands = commands + } + } + + encoder := json.NewEncoder(conn) + if err := encoder.Encode(response); err != nil { + slog.Error("Error encoding list_commands response", slog.Any("err", err)) + } +} + func (p *SocketHandler) handleCCInfo(conn net.Conn, msg SocketMessage) { slog.Debug("cc_info socket event received") diff --git a/model/list.go b/model/list.go new file mode 100644 index 0000000..cafa1b1 --- /dev/null +++ b/model/list.go @@ -0,0 +1,61 @@ +package model + +import ( + "context" + "time" +) + +// ListedCommand is a paired pre/post command for display by `shelltime ls`. +// JSON tags match the historical `ls -f json` output. +type ListedCommand struct { + Command string `json:"command"` + Shell string `json:"shell"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Result int `json:"result"` + Username string `json:"username"` + Hostname string `json:"hostname"` +} + +// BuildListedCommands pairs each post command with its closest pre command and +// returns the rows for display. Shared by the CLI (file store) and the daemon +// (bolt store, queried over the socket) so both produce identical output. +func BuildListedCommands(ctx context.Context, store CommandStore) ([]ListedCommand, error) { + postCommands, err := store.GetPostCommands(ctx) + if err != nil { + return nil, err + } + preTree, err := store.GetPreTree(ctx) + if err != nil { + return nil, err + } + + commands := make([]ListedCommand, 0, len(postCommands)) + for _, postCommand := range postCommands { + if postCommand == nil { + continue + } + key := postCommand.GetUniqueKey() + preCommands, ok := preTree[key] + if !ok { + continue + } + + closestPreCommand := postCommand.FindClosestCommand(preCommands, false) + startTime := postCommand.Time + if closestPreCommand != nil { + startTime = closestPreCommand.Time + } + + commands = append(commands, ListedCommand{ + Command: postCommand.Command, + Shell: postCommand.Shell, + StartTime: startTime, + EndTime: postCommand.Time, + Result: postCommand.Result, + Username: postCommand.Username, + Hostname: postCommand.Hostname, + }) + } + return commands, nil +} diff --git a/model/list_test.go b/model/list_test.go new file mode 100644 index 0000000..a0e2ea0 --- /dev/null +++ b/model/list_test.go @@ -0,0 +1,38 @@ +package model + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBuildListedCommands(t *testing.T) { + store, err := newBoltStore(filepath.Join(t.TempDir(), "commands.db")) + require.NoError(t, err) + defer store.Close() + + ctx := context.Background() + start := time.Now() + cmd := Command{Shell: "bash", SessionID: 1, Command: "make build", Username: "u", Hostname: "h", Time: start} + + // pre then post for the same command -> one paired row + require.NoError(t, store.SavePre(ctx, cmd, start)) + post := cmd + post.Time = start.Add(3 * time.Second) + require.NoError(t, store.SavePost(ctx, post, 0, post.Time)) + + // a post with no matching pre -> skipped + orphan := Command{Shell: "zsh", SessionID: 2, Command: "orphan", Username: "u", Time: start} + require.NoError(t, store.SavePost(ctx, orphan, 1, start)) + + listed, err := BuildListedCommands(ctx, store) + require.NoError(t, err) + require.Len(t, listed, 1) + require.Equal(t, "make build", listed[0].Command) + require.Equal(t, "bash", listed[0].Shell) + require.Equal(t, start.Unix(), listed[0].StartTime.Unix()) + require.Equal(t, post.Time.Unix(), listed[0].EndTime.Unix()) +} From aedc5e1225700b482086dd296fdf1c31d072f1df Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 09:59:51 +0000 Subject: [PATCH 5/9] fix(cli): skip txt command-file gc in bolt mode In bolt mode the daemon prunes synced commands from the DB after each sync, and post.txt typically does not exist, so cleanCommandFiles would error when opening it. Skip the txt compaction when storage.engine == bolt; log cleanup still runs. --- commands/gc.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/commands/gc.go b/commands/gc.go index a5d2996..f86d89c 100644 --- a/commands/gc.go +++ b/commands/gc.go @@ -207,9 +207,13 @@ func commandGC(c *cli.Context) error { defer CloseLogger() } - // Clean command files - if err := cleanCommandFiles(ctx); err != nil { - return err + // Clean command files. In bolt mode the daemon prunes synced commands from + // the DB after each sync, and the txt files are only fallback leftovers, so + // skip the txt compaction (post.txt may not even exist). + if cfg.Storage == nil || cfg.Storage.Engine != model.StorageEngineBolt { + if err := cleanCommandFiles(ctx); err != nil { + return err + } } // TODO: delete $HOME/.config/malamtime/ folder From 771e371a38b9cab5ebb4b765b4308cd4357b8379 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 10:26:39 +0000 Subject: [PATCH 6/9] fix(model): prune synced pre commands using post-side matching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prune matched pre rows via pre.FindClosestCommand(posts), but that helper only accepts candidates at or before the pre's time. A post always runs after its pre, so it was never matched and synced pre commands were never pruned — the bolt active bucket (and the pre.txt fallback) grew unbounded. Match from the post side instead: a pre is finished when a synced post (recording time <= cursor) with the same unique key ran at or after it. Extract this as model.preHasSyncedPost, shared by the bolt and file stores. Also: run boltStore.Prune in a single write transaction (collect posts and the keys to delete in one pass over the archived bucket) and add defensive nil checks on tx.Bucket plus a length guard in decodeKeyNano, per review feedback. Adds a regression test covering the realistic case where post.Time > pre.Time. --- model/store.go | 30 ++++++++++++++++++ model/store_bolt.go | 67 ++++++++++++++++++++++++++++------------ model/store_bolt_test.go | 24 ++++++++++++++ model/store_file.go | 5 ++- 4 files changed, 104 insertions(+), 22 deletions(-) diff --git a/model/store.go b/model/store.go index 7095d57..33ea0f4 100644 --- a/model/store.go +++ b/model/store.go @@ -78,3 +78,33 @@ func NewCommandStore(cfg ShellTimeConfig) (CommandStore, error) { return newFileStore(), nil } } + +// preHasSyncedPost reports whether posts contains a synced post command (recording +// time at or before cursor) that completes the given pre command: same unique key +// and running at or after the pre started. Prune uses it to drop finished, synced +// pre rows while keeping unfinished ones. +// +// Note the direction: a post always runs after its pre, so matching must be +// post.Time >= pre.Time. (Command.FindClosestCommand only accepts candidates at or +// before the receiver, so calling it on the pre would never match its later post.) +func preHasSyncedPost(pre *Command, posts []*Command, cursor time.Time) bool { + if pre == nil { + return false + } + key := pre.GetUniqueKey() + for _, p := range posts { + if p == nil { + continue + } + if p.RecordingTime.After(cursor) { + continue // post not yet synced; keep the pre for the next sync + } + if p.GetUniqueKey() != key { + continue + } + if !p.Time.Before(pre.Time) { + return true + } + } + return false +} diff --git a/model/store_bolt.go b/model/store_bolt.go index 3a71176..ecdbf64 100644 --- a/model/store_bolt.go +++ b/model/store_bolt.go @@ -65,6 +65,9 @@ func encodeKey(recordingTime time.Time, seq uint64) []byte { } func decodeKeyNano(key []byte) int64 { + if len(key) < 8 { + return 0 + } return int64(binary.BigEndian.Uint64(key[0:8])) } @@ -75,6 +78,9 @@ func (s *boltStore) put(bucket string, cmd Command, recordingTime time.Time) err } return s.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(bucket)) + if b == nil { + return fmt.Errorf("bucket %s not found", bucket) + } seq, err := b.NextSequence() if err != nil { return err @@ -87,6 +93,9 @@ func (s *boltStore) all(bucket string) ([]*Command, error) { result := make([]*Command, 0) err := s.db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(bucket)) + if b == nil { + return fmt.Errorf("bucket %s not found", bucket) + } return b.ForEach(func(k, v []byte) error { cmd := new(Command) if err := json.Unmarshal(v, cmd); err != nil { @@ -136,7 +145,11 @@ func (s *boltStore) GetPostCommands(ctx context.Context) ([]*Command, error) { func (s *boltStore) GetLastCursor(ctx context.Context) (cursorTime time.Time, noCursorExist bool, err error) { err = s.db.View(func(tx *bolt.Tx) error { - v := tx.Bucket([]byte(metaBucket)).Get([]byte(cursorKey)) + b := tx.Bucket([]byte(metaBucket)) + if b == nil { + return fmt.Errorf("bucket %s not found", metaBucket) + } + v := b.Get([]byte(cursorKey)) if v == nil { noCursorExist = true cursorTime = time.Time{} @@ -152,24 +165,40 @@ func (s *boltStore) SetCursor(ctx context.Context, cursor time.Time) error { buf := make([]byte, 8) binary.BigEndian.PutUint64(buf, uint64(cursor.UnixNano())) return s.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket([]byte(metaBucket)).Put([]byte(cursorKey), buf) + b := tx.Bucket([]byte(metaBucket)) + if b == nil { + return fmt.Errorf("bucket %s not found", metaBucket) + } + return b.Put([]byte(cursorKey), buf) }) } // Prune deletes synced post commands (recording time <= cursor) and the pre -// commands that have a matching post. Unfinished pre commands are kept, matching -// the historical gc behavior. +// commands they complete, keeping unfinished pre commands. It runs in a single +// write transaction so the post set is consistent with the deletions. func (s *boltStore) Prune(ctx context.Context, cursor time.Time) error { - postCommands, err := s.GetPostCommands(ctx) - if err != nil { - return err - } cursorNano := cursor.UnixNano() return s.db.Update(func(tx *bolt.Tx) error { archived := tx.Bucket([]byte(archivedBucket)) + if archived == nil { + return fmt.Errorf("bucket %s not found", archivedBucket) + } + active := tx.Bucket([]byte(activeBucket)) + if active == nil { + return fmt.Errorf("bucket %s not found", activeBucket) + } + + // Single pass over archived: collect all post commands (for matching) and + // the synced keys to delete. + var postCommands []*Command var delArchived [][]byte if err := archived.ForEach(func(k, v []byte) error { + cmd := new(Command) + if err := json.Unmarshal(v, cmd); err == nil { + cmd.RecordingTime = time.Unix(0, decodeKeyNano(k)) + postCommands = append(postCommands, cmd) + } if decodeKeyNano(k) <= cursorNano { delArchived = append(delArchived, append([]byte(nil), k...)) } @@ -177,32 +206,32 @@ func (s *boltStore) Prune(ctx context.Context, cursor time.Time) error { }); err != nil { return err } - for _, k := range delArchived { - if err := archived.Delete(k); err != nil { - return err - } - } - active := tx.Bucket([]byte(activeBucket)) + // Drop pre rows at/before the cursor that a synced post completes. var delActive [][]byte if err := active.ForEach(func(k, v []byte) error { nano := decodeKeyNano(k) if nano > cursorNano { return nil // keep anything newer than the cursor } - cmd := new(Command) - if err := json.Unmarshal(v, cmd); err != nil { + pre := new(Command) + if err := json.Unmarshal(v, pre); err != nil { return nil } - cmd.RecordingTime = time.Unix(0, nano) - closest := cmd.FindClosestCommand(postCommands, true) - if closest != nil && !closest.IsNil() { + pre.RecordingTime = time.Unix(0, nano) + if preHasSyncedPost(pre, postCommands, cursor) { delActive = append(delActive, append([]byte(nil), k...)) } return nil }); err != nil { return err } + + for _, k := range delArchived { + if err := archived.Delete(k); err != nil { + return err + } + } for _, k := range delActive { if err := active.Delete(k); err != nil { return err diff --git a/model/store_bolt_test.go b/model/store_bolt_test.go index d6fc61b..2aaa79a 100644 --- a/model/store_bolt_test.go +++ b/model/store_bolt_test.go @@ -139,6 +139,30 @@ func (s *BoltStoreTestSuite) TestPrune() { s.Equal("later", post[0].Command) } +func (s *BoltStoreTestSuite) TestPruneRealisticTiming() { + // A normal command: post runs AFTER pre. Once synced (<= cursor) both must be + // pruned, otherwise the active bucket grows unbounded. + start := time.Now() + end := start.Add(2 * time.Second) + cmd := s.newCmd("npm test", start) + + s.Require().NoError(s.store.SavePre(s.ctx, cmd, start)) + post := cmd + post.Time = end + s.Require().NoError(s.store.SavePost(s.ctx, post, 0, end)) + + // cursor at/after the post recording time => both are synced + s.Require().NoError(s.store.Prune(s.ctx, end)) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Empty(pre, "synced pre command must be pruned even though post.Time > pre.Time") + + postCmds, err := s.store.GetPostCommands(s.ctx) + s.Require().NoError(err) + s.Empty(postCmds) +} + func TestBoltStoreSuite(t *testing.T) { suite.Run(t, new(BoltStoreTestSuite)) } diff --git a/model/store_file.go b/model/store_file.go index 0c0bb3a..5e7bbab 100644 --- a/model/store_file.go +++ b/model/store_file.go @@ -120,9 +120,8 @@ func (s *fileStore) Prune(ctx context.Context, cursor time.Time) error { newPre = append(newPre, row) continue } - // keep pre commands that never got a matching post (unfinished) - closest := row.FindClosestCommand(postCommands, true) - if closest == nil || closest.IsNil() { + // keep pre commands that no synced post completes (unfinished) + if !preHasSyncedPost(row, postCommands, cursor) { newPre = append(newPre, row) } } From 351d37d2f613dfca794ef5b2a8f081d6afa6f8c9 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 10:39:54 +0000 Subject: [PATCH 7/9] test: cover bolt store, tracking assembly, and track socket path Raise patch coverage on the new code with high-value unit tests: - model.BuildTrackingData (assembly, exclude filtering, empty case) - fileStore round trip (save/read/cursor/prune) over a temp HOME - daemon handlePubSubTrackPost happy path (flush -> sync -> cursor -> prune) via an httptest server, plus the no-store error branch - a socket round trip exercising list_commands (RequestListCommands / handleListCommands) and SendTrackEvent routing --- daemon/handlers.track_test.go | 31 +++++++++++++++++ daemon/socket_test.go | 46 +++++++++++++++++++++++++ model/store_file_test.go | 59 ++++++++++++++++++++++++++++++++ model/tracking_build_test.go | 63 +++++++++++++++++++++++++++++++++++ 4 files changed, 199 insertions(+) create mode 100644 model/store_file_test.go create mode 100644 model/tracking_build_test.go diff --git a/daemon/handlers.track_test.go b/daemon/handlers.track_test.go index 020553b..e715284 100644 --- a/daemon/handlers.track_test.go +++ b/daemon/handlers.track_test.go @@ -2,6 +2,8 @@ package daemon import ( "context" + "net/http" + "net/http/httptest" "testing" "time" @@ -153,6 +155,35 @@ func (s *TrackHandlerTestSuite) TestTrackPostNotEnoughToFlush() { assert.Equal(s.T(), 0, store.pruneCalls) } +func (s *TrackHandlerTestSuite) TestTrackPostFlushSyncsAndPrunes() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + store := &fakeCommandStore{noCursorExist: true} + commandStore = store + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{Token: "t", APIEndpoint: server.URL, FlushCount: 1}} + + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u", Time: now} + require.NoError(s.T(), store.SavePre(context.Background(), cmd, now)) + + payload := TrackEventPayload{Command: cmd, RecordingTimeNano: now.UnixNano()} + require.NoError(s.T(), handlePubSubTrackPost(context.Background(), payload)) + + // flush threshold met (no cursor yet) => sent, cursor advanced, pruned + assert.Len(s.T(), store.post, 1) + assert.Equal(s.T(), 1, store.cursorSetCalls) + assert.Equal(s.T(), 1, store.pruneCalls) +} + +func (s *TrackHandlerTestSuite) TestTrackPostNoStore() { + commandStore = nil + err := handlePubSubTrackPost(context.Background(), TrackEventPayload{}) + assert.ErrorIs(s.T(), err, errNoCommandStore) +} + func TestTrackHandlerSuite(t *testing.T) { suite.Run(t, new(TrackHandlerTestSuite)) } diff --git a/daemon/socket_test.go b/daemon/socket_test.go index 18c9043..d29f8ae 100644 --- a/daemon/socket_test.go +++ b/daemon/socket_test.go @@ -1,6 +1,7 @@ package daemon import ( + "context" "encoding/json" "net" "os" @@ -138,6 +139,51 @@ func TestSocketHandler_StatusRequest(t *testing.T) { } } +func TestSocketHandler_ListCommandsAndTrackEvent(t *testing.T) { + tempDir, err := os.MkdirTemp("", "shelltime-socket-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + socketPath := filepath.Join(tempDir, "test.sock") + config := &model.ShellTimeConfig{SocketPath: socketPath} + ch := NewGoChannel(PubSubConfig{OutputChannelBuffer: 10}, nil) + handler := NewSocketHandler(config, ch) + + // Seed the daemon-owned store with a completed command. + prev := commandStore + defer func() { commandStore = prev }() + store := &fakeCommandStore{} + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u", Hostname: "h", Time: now} + _ = store.SavePre(context.Background(), cmd, now) + post := cmd + post.Time = now.Add(time.Second) + _ = store.SavePost(context.Background(), post, 0, post.Time) + commandStore = store + + if err := handler.Start(); err != nil { + t.Fatalf("Start failed: %v", err) + } + defer handler.Stop() + time.Sleep(50 * time.Millisecond) + + // list_commands request/response + resp, err := RequestListCommands(socketPath, 2*time.Second) + if err != nil { + t.Fatalf("RequestListCommands failed: %v", err) + } + if len(resp.Commands) != 1 || resp.Commands[0].Command != "ls" { + t.Fatalf("unexpected list response: %+v", resp.Commands) + } + + // track event is fire-and-forget; it should be accepted without error + if err := SendTrackEvent(context.Background(), socketPath, SocketMessageTypeTrackPre, cmd, now); err != nil { + t.Errorf("SendTrackEvent returned error: %v", err) + } +} + func TestSocketMessageType_Constants(t *testing.T) { testCases := []struct { msgType SocketMessageType diff --git a/model/store_file_test.go b/model/store_file_test.go new file mode 100644 index 0000000..47ddffb --- /dev/null +++ b/model/store_file_test.go @@ -0,0 +1,59 @@ +package model + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestFileStoreRoundTrip(t *testing.T) { + t.Setenv("HOME", t.TempDir()) + InitFolder("") // reset globals to the default .shelltime under the temp HOME + + store := NewFileStore() + ctx := context.Background() + start := time.Now() + cmd := Command{Shell: "bash", SessionID: 7, Command: "go build", Username: "u", Hostname: "h", Time: start} + + require.NoError(t, store.SavePre(ctx, cmd, start)) + + _, noCursor, err := store.GetLastCursor(ctx) + require.NoError(t, err) + require.True(t, noCursor, "fresh store has no cursor") + + post := cmd + post.Time = start.Add(time.Second) + require.NoError(t, store.SavePost(ctx, post, 0, post.Time)) + + tree, err := store.GetPreTree(ctx) + require.NoError(t, err) + require.Len(t, tree[cmd.GetUniqueKey()], 1) + + posts, err := store.GetPostCommands(ctx) + require.NoError(t, err) + require.Len(t, posts, 1) + require.Equal(t, CommandPhasePost, int(posts[0].Phase)) + + pres, err := store.GetPreCommands(ctx) + require.NoError(t, err) + require.Len(t, pres, 1) + + require.NoError(t, store.SetCursor(ctx, post.Time)) + c, noCursor, err := store.GetLastCursor(ctx) + require.NoError(t, err) + require.False(t, noCursor) + require.Equal(t, post.Time.UnixNano(), c.UnixNano()) + + // finished + synced => pruned from both files + require.NoError(t, store.Prune(ctx, post.Time)) + pres, err = store.GetPreCommands(ctx) + require.NoError(t, err) + require.Empty(t, pres) + posts, err = store.GetPostCommands(ctx) + require.NoError(t, err) + require.Empty(t, posts) + + require.NoError(t, store.Close()) +} diff --git a/model/tracking_build_test.go b/model/tracking_build_test.go new file mode 100644 index 0000000..3a04102 --- /dev/null +++ b/model/tracking_build_test.go @@ -0,0 +1,63 @@ +package model + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBuildTrackingData(t *testing.T) { + store, err := newBoltStore(filepath.Join(t.TempDir(), "commands.db")) + require.NoError(t, err) + defer store.Close() + + ctx := context.Background() + start := time.Now() + cmd := Command{Shell: "bash", SessionID: 1, Command: "deploy prod", Username: "u", Hostname: "h", Time: start, PPID: 42} + require.NoError(t, store.SavePre(ctx, cmd, start)) + post := cmd + post.Time = start.Add(2 * time.Second) + require.NoError(t, store.SavePost(ctx, post, 0, post.Time)) + + res, err := BuildTrackingData(ctx, store, ShellTimeConfig{}) + require.NoError(t, err) + require.True(t, res.NoCursorExist) + require.Len(t, res.Data, 1) + require.Equal(t, "deploy prod", res.Data[0].Command) + require.Equal(t, start.Unix(), res.Data[0].StartTime) + require.Equal(t, post.Time.Unix(), res.Data[0].EndTime) + require.Equal(t, 42, res.Data[0].PPID) + require.Equal(t, "h", res.Meta.Hostname) + require.Equal(t, "bash", res.Meta.Shell) +} + +func TestBuildTrackingDataExcludes(t *testing.T) { + store, err := newBoltStore(filepath.Join(t.TempDir(), "commands.db")) + require.NoError(t, err) + defer store.Close() + + ctx := context.Background() + now := time.Now() + cmd := Command{Shell: "bash", SessionID: 1, Command: "secret-token-cmd", Username: "u", Time: now} + require.NoError(t, store.SavePre(ctx, cmd, now)) + post := cmd + post.Time = now.Add(time.Second) + require.NoError(t, store.SavePost(ctx, post, 0, post.Time)) + + res, err := BuildTrackingData(ctx, store, ShellTimeConfig{Exclude: []string{"^secret"}}) + require.NoError(t, err) + require.Empty(t, res.Data, "excluded command must not be synced") +} + +func TestBuildTrackingDataEmpty(t *testing.T) { + store, err := newBoltStore(filepath.Join(t.TempDir(), "commands.db")) + require.NoError(t, err) + defer store.Close() + + res, err := BuildTrackingData(context.Background(), store, ShellTimeConfig{}) + require.NoError(t, err) + require.Empty(t, res.Data) +} From 62e05dbd2914c7004be67360d7feabe0f193acfe Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 11:26:07 +0000 Subject: [PATCH 8/9] test: cover ls file-mode path and track-handler parse errors - commands/ls.go file-store path (previously 0% covered) via a temp-HOME fixture exercising both json and table output - handlePubSubTrackPre/Post parse-error branches --- commands/ls_test.go | 42 +++++++++++++++++++++++++++++++++++ daemon/handlers.track_test.go | 13 +++++++++++ 2 files changed, 55 insertions(+) create mode 100644 commands/ls_test.go diff --git a/commands/ls_test.go b/commands/ls_test.go new file mode 100644 index 0000000..8b076dc --- /dev/null +++ b/commands/ls_test.go @@ -0,0 +1,42 @@ +package commands + +import ( + "context" + "os" + "testing" + "time" + + "github.com/malamtime/cli/model" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace/noop" +) + +// TestLsCommandFileMode exercises `shelltime ls` against the txt file store +// (no storage config => file mode, daemon not consulted). +func TestLsCommandFileMode(t *testing.T) { + otel.SetTracerProvider(noop.NewTracerProvider()) + SKIP_LOGGER_SETTINGS = true + + t.Setenv("HOME", t.TempDir()) + model.InitFolder("") + require.NoError(t, os.MkdirAll(os.ExpandEnv("$HOME/"+model.COMMAND_STORAGE_FOLDER), os.ModePerm)) + + store := model.NewFileStore() + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "make", Username: "u", Hostname: "h", Time: now} + require.NoError(t, store.SavePre(context.Background(), cmd, now)) + post := cmd + post.Time = now.Add(time.Second) + require.NoError(t, store.SavePost(context.Background(), post, 0, post.Time)) + + cs := model.NewMockConfigService(t) + cs.On("ReadConfigFile", mock.Anything).Return(model.ShellTimeConfig{}, nil) + configService = cs + + app := &cli.App{Name: "mtt", Commands: []*cli.Command{LsCommand}} + require.NoError(t, app.Run([]string{"mtt", "ls", "-f", "json"})) + require.NoError(t, app.Run([]string{"mtt", "ls", "-f", "table"})) +} diff --git a/daemon/handlers.track_test.go b/daemon/handlers.track_test.go index e715284..4ec5cea 100644 --- a/daemon/handlers.track_test.go +++ b/daemon/handlers.track_test.go @@ -184,6 +184,19 @@ func (s *TrackHandlerTestSuite) TestTrackPostNoStore() { assert.ErrorIs(s.T(), err, errNoCommandStore) } +func (s *TrackHandlerTestSuite) TestTrackPreParseError() { + commandStore = &fakeCommandStore{} + // a channel cannot be JSON-marshaled, so parseTrackEvent fails + err := handlePubSubTrackPre(context.Background(), make(chan int)) + assert.Error(s.T(), err) +} + +func (s *TrackHandlerTestSuite) TestTrackPostParseError() { + commandStore = &fakeCommandStore{} + err := handlePubSubTrackPost(context.Background(), make(chan int)) + assert.Error(s.T(), err) +} + func TestTrackHandlerSuite(t *testing.T) { suite.Run(t, new(TrackHandlerTestSuite)) } From a2372ccbd45054bd8b2c66c6ae92cc27400cd011 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 13:03:20 +0000 Subject: [PATCH 9/9] refactor(cli): keep track hook config-free; decide bolt in daemon The `track` command runs inside the shell hook on every command and is spawned as a fresh process each time, so the in-memory config cache never helps and reading config.toml/config.local.toml adds filesystem latency to every command. Stop reading config on the fast path: if a daemon is up on the default socket, forward the raw pre/post event and return. Config is only read in the daemon-less fallback (which may point at a custom socket or drive the direct HTTP sync). The daemon now owns the storage-engine decision and exclude filtering. It already reads config (cached, long-lived process), so the bolt-vs-txt choice and exclude rules live there. When the bolt engine is disabled the track handlers fall back to the txt file store (both satisfy CommandStore) instead of erroring, so default (file engine) users keep working through the daemon. --- commands/track.go | 62 ++++++++++++++++++---------- daemon/handlers.track.go | 67 +++++++++++++++++++----------- daemon/handlers.track_test.go | 76 +++++++++++++++++++++++++++++++---- 3 files changed, 152 insertions(+), 53 deletions(-) diff --git a/commands/track.go b/commands/track.go index bebb031..124a0ab 100644 --- a/commands/track.go +++ b/commands/track.go @@ -63,11 +63,6 @@ func commandTrack(c *cli.Context) error { SetupLogger(os.ExpandEnv("$HOME/" + model.COMMAND_BASE_STORAGE_FOLDER)) slog.Debug("track command args", slog.String("first", c.Args().First())) - config, err := configService.ReadConfigFile(ctx) - if err != nil { - slog.Error("failed to read config file", slog.Any("err", err)) - return err - } hostname, err := os.Hostname() if err != nil { @@ -95,31 +90,38 @@ func commandTrack(c *cli.Context) error { PPID: ppid, } + // Fast path: `track` runs inside the shell hook on every command, so it must + // stay cheap. A fresh `shelltime track` process is spawned per command, which + // means the in-memory config cache never helps and reading the config would add + // two TOML file reads to every command. Instead, if a daemon is listening on the + // default socket, hand it the raw event (fire-and-forget) and return. The daemon + // is a long-lived process: it reads config once (cached) and owns the + // storage-engine decision (bolt vs txt), exclude filtering, sync and pruning. + if daemon.IsSocketReady(ctx, model.DefaultSocketPath) { + return sendTrackEventToDaemon(ctx, span, model.DefaultSocketPath, cmdPhase, instance, result) + } + + // Slow path: no daemon on the default socket. We can now afford to read config — + // it may point at a custom socket path, and it carries the exclude rules and the + // settings the direct (daemon-less) sync needs. + config, err := configService.ReadConfigFile(ctx) + if err != nil { + slog.Error("failed to read config file", slog.Any("err", err)) + return err + } + // Check if command should be excluded if model.ShouldExcludeCommand(cmdCommand, config.Exclude) { slog.Debug("Command excluded by pattern", slog.String("command", cmdCommand)) return nil } - // When the bolt storage engine is enabled and the daemon is up, hand the raw - // command event to the daemon (fire-and-forget) and let it own persistence, - // sync, and pruning. This keeps the hook fast and avoids the CLI ever opening - // the daemon-locked bolt DB. Otherwise fall back to the txt file store. - useBolt := config.Storage != nil && config.Storage.Engine == model.StorageEngineBolt - if useBolt && daemon.IsSocketReady(ctx, config.SocketPath) { - now := time.Now() - switch cmdPhase { - case "pre": - span.SetAttributes(attribute.Int("phase", 0)) - return daemon.SendTrackEvent(ctx, config.SocketPath, daemon.SocketMessageTypeTrackPre, *instance, now) - case "post": - span.SetAttributes(attribute.Int("phase", 1)) - instance.Result = result - return daemon.SendTrackEvent(ctx, config.SocketPath, daemon.SocketMessageTypeTrackPost, *instance, now) - } - return nil + // A daemon may still be listening on a non-default socket path. + if config.SocketPath != model.DefaultSocketPath && daemon.IsSocketReady(ctx, config.SocketPath) { + return sendTrackEventToDaemon(ctx, span, config.SocketPath, cmdPhase, instance, result) } + // No daemon at all: persist to the local txt store and sync directly over HTTP. if cmdPhase == "pre" { span.SetAttributes(attribute.Int("phase", 0)) err = instance.DoSavePre() @@ -142,6 +144,22 @@ func commandTrack(c *cli.Context) error { return nil } +// sendTrackEventToDaemon forwards a single raw pre/post command event to the +// daemon. The daemon owns persistence, exclude filtering, sync and pruning. +func sendTrackEventToDaemon(ctx context.Context, span trace.Span, socketPath, cmdPhase string, instance *model.Command, result int) error { + now := time.Now() + switch cmdPhase { + case "pre": + span.SetAttributes(attribute.Int("phase", 0)) + return daemon.SendTrackEvent(ctx, socketPath, daemon.SocketMessageTypeTrackPre, *instance, now) + case "post": + span.SetAttributes(attribute.Int("phase", 1)) + instance.Result = result + return daemon.SendTrackEvent(ctx, socketPath, daemon.SocketMessageTypeTrackPost, *instance, now) + } + return nil +} + type syncOptions struct { isForceSync bool isDryRun bool diff --git a/daemon/handlers.track.go b/daemon/handlers.track.go index d2ffe16..9e77c31 100644 --- a/daemon/handlers.track.go +++ b/daemon/handlers.track.go @@ -3,17 +3,27 @@ package daemon import ( "context" "encoding/json" - "errors" "log/slog" "time" "github.com/malamtime/cli/model" ) -// errNoCommandStore is returned when a track event arrives but the bolt store -// was not initialized (bolt engine disabled). The CLI only emits track events -// when bolt is enabled, so this indicates a misconfiguration. -var errNoCommandStore = errors.New("command store not initialized; bolt storage engine is disabled") +// newFallbackStore builds the store used for track events when the bolt engine +// is disabled (commandStore is nil). It is a var so tests can substitute an +// in-memory store without touching the filesystem. +var newFallbackStore = model.NewFileStore + +// trackStore returns the active command store for the daemon. The CLI forwards +// every track event to the daemon and lets it decide where to persist: the +// daemon-owned bolt store when the bolt engine is enabled, otherwise the txt +// file store (both satisfy model.CommandStore). +func trackStore() model.CommandStore { + if commandStore != nil { + return commandStore + } + return newFallbackStore() +} func parseTrackEvent(payload interface{}) (model.Command, time.Time, error) { pb, err := json.Marshal(payload) @@ -28,43 +38,54 @@ func parseTrackEvent(payload interface{}) (model.Command, time.Time, error) { return ev.Command, recordingTime, nil } -// handlePubSubTrackPre persists a pre-execution command to the active bucket. +// handlePubSubTrackPre persists a pre-execution command to the active bucket, +// unless config marks the command as excluded. func handlePubSubTrackPre(ctx context.Context, payload interface{}) error { - if commandStore == nil { - return errNoCommandStore - } cmd, recordingTime, err := parseTrackEvent(payload) if err != nil { slog.Error("Failed to parse track_pre payload", slog.Any("err", err)) return err } - return commandStore.SavePre(ctx, cmd, recordingTime) + + cfg, err := stConfig.ReadConfigFile(ctx) + if err != nil { + slog.Error("Failed to read config in track_pre handler", slog.Any("err", err)) + return err + } + if model.ShouldExcludeCommand(cmd.Command, cfg.Exclude) { + slog.Debug("Command excluded by pattern", slog.String("command", cmd.Command)) + return nil + } + + return trackStore().SavePre(ctx, cmd, recordingTime) } // handlePubSubTrackPost persists a post-execution command, then runs the -// flush/sync/cursor/prune cycle against the bolt store — the daemon-side +// flush/sync/cursor/prune cycle against the active store — the daemon-side // equivalent of commands.trySyncLocalToServer. func handlePubSubTrackPost(ctx context.Context, payload interface{}) error { - if commandStore == nil { - return errNoCommandStore - } cmd, recordingTime, err := parseTrackEvent(payload) if err != nil { slog.Error("Failed to parse track_post payload", slog.Any("err", err)) return err } - if err := commandStore.SavePost(ctx, cmd, cmd.Result, recordingTime); err != nil { - return err - } - cfg, err := stConfig.ReadConfigFile(ctx) if err != nil { slog.Error("Failed to read config in track_post handler", slog.Any("err", err)) return err } + if model.ShouldExcludeCommand(cmd.Command, cfg.Exclude) { + slog.Debug("Command excluded by pattern", slog.String("command", cmd.Command)) + return nil + } + + store := trackStore() + if err := store.SavePost(ctx, cmd, cmd.Result, recordingTime); err != nil { + return err + } - result, err := model.BuildTrackingData(ctx, commandStore, cfg) + result, err := model.BuildTrackingData(ctx, store, cfg) if err != nil { return err } @@ -85,16 +106,16 @@ func handlePubSubTrackPost(ctx context.Context, payload interface{}) error { } if err := sendTrackArgsToServer(ctx, args); err != nil { - slog.Error("Failed to send tracking data from bolt store", slog.Any("err", err)) - // Leave the data in bolt; a later post will retry. Do not advance cursor. + slog.Error("Failed to send tracking data from command store", slog.Any("err", err)) + // Leave the data in the store; a later post will retry. Do not advance cursor. return err } - if err := commandStore.SetCursor(ctx, result.LatestRecordingTime); err != nil { + if err := store.SetCursor(ctx, result.LatestRecordingTime); err != nil { slog.Error("Failed to advance cursor", slog.Any("err", err)) return err } - if err := commandStore.Prune(ctx, result.LatestRecordingTime); err != nil { + if err := store.Prune(ctx, result.LatestRecordingTime); err != nil { slog.Warn("Failed to prune synced commands", slog.Any("err", err)) } return nil diff --git a/daemon/handlers.track_test.go b/daemon/handlers.track_test.go index 4ec5cea..83ff58a 100644 --- a/daemon/handlers.track_test.go +++ b/daemon/handlers.track_test.go @@ -85,18 +85,23 @@ func (f fakeConfigService) ReadConfigFile(ctx context.Context, opts ...model.Rea type TrackHandlerTestSuite struct { suite.Suite - prevStore model.CommandStore - prevConfig model.ConfigService + prevStore model.CommandStore + prevConfig model.ConfigService + prevFallback func() model.CommandStore } func (s *TrackHandlerTestSuite) SetupTest() { s.prevStore = commandStore s.prevConfig = stConfig + s.prevFallback = newFallbackStore + // pre/post handlers now read config (exclude rules); give every test a default. + stConfig = fakeConfigService{} } func (s *TrackHandlerTestSuite) TearDownTest() { commandStore = s.prevStore stConfig = s.prevConfig + newFallbackStore = s.prevFallback } func (s *TrackHandlerTestSuite) TestParseTrackEvent() { @@ -111,10 +116,33 @@ func (s *TrackHandlerTestSuite) TestParseTrackEvent() { assert.Equal(s.T(), now.UnixNano(), rt.UnixNano()) } -func (s *TrackHandlerTestSuite) TestTrackPreNoStore() { +func (s *TrackHandlerTestSuite) TestTrackPreFallsBackToFileStore() { + // bolt disabled (commandStore nil) => persist via the fallback store. commandStore = nil - err := handlePubSubTrackPre(context.Background(), TrackEventPayload{}) - assert.ErrorIs(s.T(), err, errNoCommandStore) + fallback := &fakeCommandStore{} + newFallbackStore = func() model.CommandStore { return fallback } + + now := time.Now() + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u"}, + RecordingTimeNano: now.UnixNano(), + } + require.NoError(s.T(), handlePubSubTrackPre(context.Background(), payload)) + require.Len(s.T(), fallback.pre, 1) + assert.Equal(s.T(), "ls", fallback.pre[0].Command) +} + +func (s *TrackHandlerTestSuite) TestTrackPreExcluded() { + store := &fakeCommandStore{} + commandStore = store + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{Exclude: []string{"secret*"}}} + + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", SessionID: 1, Command: "secret-cmd", Username: "u"}, + RecordingTimeNano: time.Now().UnixNano(), + } + require.NoError(s.T(), handlePubSubTrackPre(context.Background(), payload)) + assert.Empty(s.T(), store.pre, "excluded command must not be persisted") } func (s *TrackHandlerTestSuite) TestTrackPrePersists() { @@ -178,10 +206,42 @@ func (s *TrackHandlerTestSuite) TestTrackPostFlushSyncsAndPrunes() { assert.Equal(s.T(), 1, store.pruneCalls) } -func (s *TrackHandlerTestSuite) TestTrackPostNoStore() { +func (s *TrackHandlerTestSuite) TestTrackPostFallsBackToFileStore() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + // bolt disabled (commandStore nil) => persist + sync via the fallback store. commandStore = nil - err := handlePubSubTrackPost(context.Background(), TrackEventPayload{}) - assert.ErrorIs(s.T(), err, errNoCommandStore) + fallback := &fakeCommandStore{noCursorExist: true} + newFallbackStore = func() model.CommandStore { return fallback } + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{Token: "t", APIEndpoint: server.URL, FlushCount: 1}} + + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u", Time: now} + require.NoError(s.T(), fallback.SavePre(context.Background(), cmd, now)) + + payload := TrackEventPayload{Command: cmd, RecordingTimeNano: now.UnixNano()} + require.NoError(s.T(), handlePubSubTrackPost(context.Background(), payload)) + + assert.Len(s.T(), fallback.post, 1) + assert.Equal(s.T(), 1, fallback.cursorSetCalls) + assert.Equal(s.T(), 1, fallback.pruneCalls) +} + +func (s *TrackHandlerTestSuite) TestTrackPostExcluded() { + store := &fakeCommandStore{} + commandStore = store + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{Exclude: []string{"secret*"}}} + + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", SessionID: 1, Command: "secret-cmd", Username: "u"}, + RecordingTimeNano: time.Now().UnixNano(), + } + require.NoError(s.T(), handlePubSubTrackPost(context.Background(), payload)) + assert.Empty(s.T(), store.post, "excluded command must not be persisted") + assert.Equal(s.T(), 0, store.cursorSetCalls) } func (s *TrackHandlerTestSuite) TestTrackPreParseError() {