diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index a9280dd..63429fd 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -82,6 +82,19 @@ func main() { model.InjectVar(version) cmdService := model.NewCommandService() + // When the bolt storage engine is enabled, the daemon owns the bolt-backed + // command store for its lifetime (bbolt holds an exclusive file lock). + if cfg.Storage != nil && cfg.Storage.Engine == model.StorageEngineBolt { + store, err := model.NewCommandStore(cfg) + if err != nil { + slog.Error("Failed to open bolt command store", slog.Any("err", err)) + } else { + daemon.InitCommandStore(store) + defer store.Close() + slog.Info("Bolt command store initialized") + } + } + pubsub := daemon.NewGoChannel(daemon.PubSubConfig{}, watermill.NewSlogLogger(slog.Default())) msg, err := pubsub.Subscribe(context.Background(), daemon.PubSubTopic) diff --git a/commands/gc.go b/commands/gc.go index a5d2996..f86d89c 100644 --- a/commands/gc.go +++ b/commands/gc.go @@ -207,9 +207,13 @@ func commandGC(c *cli.Context) error { defer CloseLogger() } - // Clean command files - if err := cleanCommandFiles(ctx); err != nil { - return err + // Clean command files. In bolt mode the daemon prunes synced commands from + // the DB after each sync, and the txt files are only fallback leftovers, so + // skip the txt compaction (post.txt may not even exist). + if cfg.Storage == nil || cfg.Storage.Engine != model.StorageEngineBolt { + if err := cleanCommandFiles(ctx); err != nil { + return err + } } // TODO: delete $HOME/.config/malamtime/ folder diff --git a/commands/ls.go b/commands/ls.go index b0d5779..668651b 100644 --- a/commands/ls.go +++ b/commands/ls.go @@ -4,12 +4,12 @@ package commands import ( "encoding/json" "fmt" - "log/slog" "os" "strconv" "time" "github.com/gookit/color" + "github.com/malamtime/cli/daemon" "github.com/malamtime/cli/model" "github.com/olekukonko/tablewriter" "github.com/urfave/cli/v2" @@ -54,66 +54,26 @@ func commandList(c *cli.Context) error { color.Yellow.Println("⚠️ Note: Local data will be cleaned periodically for performance and disk efficiency. To view all of your commands, please run 'shelltime web'") } - // Get post commands - postFileContent, _, err := model.GetPostCommands(ctx) + config, err := configService.ReadConfigFile(ctx) if err != nil { return err } - // Get pre commands tree for reference - preFileTree, err := model.GetPreCommandsTree(ctx) - if err != nil { - return err - } - - // Process commands - var commands []struct { - Command string `json:"command"` - Shell string `json:"shell"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Result int `json:"result"` - Username string `json:"username"` - Hostname string `json:"hostname"` - } - - for _, line := range postFileContent { - postCommand := new(model.Command) - _, err := postCommand.FromLineBytes(line) + // In bolt mode the daemon owns the (exclusively locked) DB, so query it over + // the socket. Otherwise read the txt file store directly. + var commands []model.ListedCommand + useBolt := config.Storage != nil && config.Storage.Engine == model.StorageEngineBolt + if useBolt && daemon.IsSocketReady(ctx, config.SocketPath) { + resp, err := daemon.RequestListCommands(config.SocketPath, 2*time.Second) if err != nil { - slog.Error("Failed to parse post command", slog.Any("err", err), slog.String("line", string(line))) - continue + return err } - - key := postCommand.GetUniqueKey() - preCommands, ok := preFileTree[key] - if !ok { - continue - } - - closestPreCommand := postCommand.FindClosestCommand(preCommands, false) - startTime := postCommand.Time - if closestPreCommand != nil { - startTime = closestPreCommand.Time + commands = resp.Commands + } else { + commands, err = model.BuildListedCommands(ctx, model.NewFileStore()) + if err != nil { + return err } - - commands = append(commands, struct { - Command string `json:"command"` - Shell string `json:"shell"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Result int `json:"result"` - Username string `json:"username"` - Hostname string `json:"hostname"` - }{ - Command: postCommand.Command, - Shell: postCommand.Shell, - StartTime: startTime, - EndTime: postCommand.Time, - Result: postCommand.Result, - Username: postCommand.Username, - Hostname: postCommand.Hostname, - }) } // Output based on format @@ -132,15 +92,7 @@ func outputJSON(commands interface{}) error { return nil } -func outputTable(commands []struct { - Command string `json:"command"` - Shell string `json:"shell"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Result int `json:"result"` - Username string `json:"username"` - Hostname string `json:"hostname"` -}) error { +func outputTable(commands []model.ListedCommand) error { w := tablewriter.NewWriter(os.Stdout) w.Header([]string{"COMMAND", "SHELL", "START TIME", "END TIME", "DURATION(ms)", "STATUS", "USER", "HOST"}) diff --git a/commands/ls_test.go b/commands/ls_test.go new file mode 100644 index 0000000..8b076dc --- /dev/null +++ b/commands/ls_test.go @@ -0,0 +1,42 @@ +package commands + +import ( + "context" + "os" + "testing" + "time" + + "github.com/malamtime/cli/model" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace/noop" +) + +// TestLsCommandFileMode exercises `shelltime ls` against the txt file store +// (no storage config => file mode, daemon not consulted). +func TestLsCommandFileMode(t *testing.T) { + otel.SetTracerProvider(noop.NewTracerProvider()) + SKIP_LOGGER_SETTINGS = true + + t.Setenv("HOME", t.TempDir()) + model.InitFolder("") + require.NoError(t, os.MkdirAll(os.ExpandEnv("$HOME/"+model.COMMAND_STORAGE_FOLDER), os.ModePerm)) + + store := model.NewFileStore() + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "make", Username: "u", Hostname: "h", Time: now} + require.NoError(t, store.SavePre(context.Background(), cmd, now)) + post := cmd + post.Time = now.Add(time.Second) + require.NoError(t, store.SavePost(context.Background(), post, 0, post.Time)) + + cs := model.NewMockConfigService(t) + cs.On("ReadConfigFile", mock.Anything).Return(model.ShellTimeConfig{}, nil) + configService = cs + + app := &cli.App{Name: "mtt", Commands: []*cli.Command{LsCommand}} + require.NoError(t, app.Run([]string{"mtt", "ls", "-f", "json"})) + require.NoError(t, app.Run([]string{"mtt", "ls", "-f", "table"})) +} diff --git a/commands/track.go b/commands/track.go index 80682db..124a0ab 100644 --- a/commands/track.go +++ b/commands/track.go @@ -2,7 +2,6 @@ package commands import ( "context" - "fmt" "log/slog" "os" "time" @@ -64,11 +63,6 @@ func commandTrack(c *cli.Context) error { SetupLogger(os.ExpandEnv("$HOME/" + model.COMMAND_BASE_STORAGE_FOLDER)) slog.Debug("track command args", slog.String("first", c.Args().First())) - config, err := configService.ReadConfigFile(ctx) - if err != nil { - slog.Error("failed to read config file", slog.Any("err", err)) - return err - } hostname, err := os.Hostname() if err != nil { @@ -96,12 +90,38 @@ func commandTrack(c *cli.Context) error { PPID: ppid, } + // Fast path: `track` runs inside the shell hook on every command, so it must + // stay cheap. A fresh `shelltime track` process is spawned per command, which + // means the in-memory config cache never helps and reading the config would add + // two TOML file reads to every command. Instead, if a daemon is listening on the + // default socket, hand it the raw event (fire-and-forget) and return. The daemon + // is a long-lived process: it reads config once (cached) and owns the + // storage-engine decision (bolt vs txt), exclude filtering, sync and pruning. + if daemon.IsSocketReady(ctx, model.DefaultSocketPath) { + return sendTrackEventToDaemon(ctx, span, model.DefaultSocketPath, cmdPhase, instance, result) + } + + // Slow path: no daemon on the default socket. We can now afford to read config — + // it may point at a custom socket path, and it carries the exclude rules and the + // settings the direct (daemon-less) sync needs. + config, err := configService.ReadConfigFile(ctx) + if err != nil { + slog.Error("failed to read config file", slog.Any("err", err)) + return err + } + // Check if command should be excluded if model.ShouldExcludeCommand(cmdCommand, config.Exclude) { slog.Debug("Command excluded by pattern", slog.String("command", cmdCommand)) return nil } + // A daemon may still be listening on a non-default socket path. + if config.SocketPath != model.DefaultSocketPath && daemon.IsSocketReady(ctx, config.SocketPath) { + return sendTrackEventToDaemon(ctx, span, config.SocketPath, cmdPhase, instance, result) + } + + // No daemon at all: persist to the local txt store and sync directly over HTTP. if cmdPhase == "pre" { span.SetAttributes(attribute.Int("phase", 0)) err = instance.DoSavePre() @@ -124,6 +144,22 @@ func commandTrack(c *cli.Context) error { return nil } +// sendTrackEventToDaemon forwards a single raw pre/post command event to the +// daemon. The daemon owns persistence, exclude filtering, sync and pruning. +func sendTrackEventToDaemon(ctx context.Context, span trace.Span, socketPath, cmdPhase string, instance *model.Command, result int) error { + now := time.Now() + switch cmdPhase { + case "pre": + span.SetAttributes(attribute.Int("phase", 0)) + return daemon.SendTrackEvent(ctx, socketPath, daemon.SocketMessageTypeTrackPre, *instance, now) + case "post": + span.SetAttributes(attribute.Int("phase", 1)) + instance.Result = result + return daemon.SendTrackEvent(ctx, socketPath, daemon.SocketMessageTypeTrackPost, *instance, now) + } + return nil +} + type syncOptions struct { isForceSync bool isDryRun bool @@ -136,109 +172,17 @@ func trySyncLocalToServer( ) error { isForceSync := options.isForceSync isDryRun := options.isDryRun - postFileContent, lineCount, err := model.GetPostCommands(ctx) - if err != nil { - return err - } - if len(postFileContent) == 0 || lineCount == 0 { - slog.Debug("Not enough records to sync", slog.Int("lineCount", lineCount)) - return nil - } + // The local sync path always uses the txt file store. It must never open the + // bolt DB, which the daemon owns exclusively. + store := model.NewFileStore() - cursor, noCursorExist, err := model.GetLastCursor(ctx) + result, err := model.BuildTrackingData(ctx, store, config) if err != nil { return err } - preFileTree, err := model.GetPreCommandsTree(ctx) - if err != nil { - return err - } - - sysInfo, err := model.GetOSAndVersion() - if err != nil { - slog.Warn("failed to get OS version", slog.Any("err", err)) - sysInfo = &model.SysInfo{ - Os: "unknown", - Version: "unknown", - } - } - - trackingData := make([]model.TrackingData, 0) - var latestRecordingTime time.Time = cursor - - meta := model.TrackingMetaData{ - Hostname: "", - Username: "", - OS: sysInfo.Os, - OSVersion: sysInfo.Version, - Shell: "", - } - - for _, line := range postFileContent { - postCommand := new(model.Command) - recordingTime, err := postCommand.FromLineBytes(line) - if err != nil { - slog.Error("Failed to parse post command", slog.Any("err", err), slog.String("line", string(line))) - continue - } - - if recordingTime.Before(cursor) { - continue - } - if recordingTime.After(latestRecordingTime) { - latestRecordingTime = recordingTime - } - - key := postCommand.GetUniqueKey() - preCommands, ok := preFileTree[key] - if !ok { - continue - } - - if meta.Hostname == "" { - meta.Hostname = postCommand.Hostname - } - if meta.Shell == "" { - meta.Shell = postCommand.Shell - } - if meta.Username == "" { - meta.Username = postCommand.Username - } - - // Check if command should be excluded during sync - if model.ShouldExcludeCommand(postCommand.Command, config.Exclude) { - slog.Debug("Command excluded during sync", slog.String("command", postCommand.Command)) - continue - } - - // here very sure the commandList are all elligable, so no need check here. - closestPreCommand := postCommand.FindClosestCommand(preCommands, false) - - td := model.TrackingData{ - SessionID: postCommand.SessionID, - Command: postCommand.Command, - EndTime: postCommand.Time.Unix(), - EndTimeNano: postCommand.Time.UnixNano(), - Result: postCommand.Result, - PPID: postCommand.PPID, - } - - // data masking - if config.DataMasking != nil && *config.DataMasking == true { - td.Command = model.MaskSensitiveTokens(td.Command) - } - - if closestPreCommand != nil { - td.StartTime = closestPreCommand.Time.Unix() - td.StartTimeNano = closestPreCommand.Time.UnixNano() - } - - trackingData = append(trackingData, td) - } - - if len(trackingData) == 0 { + if len(result.Data) == 0 { slog.Debug("no tracking data need to be sync") return nil } @@ -246,13 +190,13 @@ func trySyncLocalToServer( // no matter the flush count is, just force sync if !isForceSync { // allow first command to be sync with server - if len(trackingData) < config.FlushCount && !noCursorExist { - slog.Debug("not enough data need to flush, abort", slog.Int("current", len(trackingData))) + if len(result.Data) < config.FlushCount && !result.NoCursorExist { + slog.Debug("not enough data need to flush, abort", slog.Int("current", len(result.Data))) return nil } } - err = DoSyncData(ctx, config, latestRecordingTime, trackingData, meta) + err = DoSyncData(ctx, config, result.LatestRecordingTime, result.Data, result.Meta) if err != nil { slog.Error("Failed to send data to server", slog.Any("err", err)) return err @@ -261,8 +205,7 @@ func trySyncLocalToServer( if isDryRun { return nil } - // TODO: update cursor - return updateCursorToFile(ctx, latestRecordingTime) + return store.SetCursor(ctx, result.LatestRecordingTime) } func DoSyncData( @@ -289,22 +232,3 @@ func DoSyncData( // send to socket if the socket is ready return daemon.SendLocalDataToSocket(ctx, socketPath, config, cursor, trackingData, meta) } - -func updateCursorToFile(ctx context.Context, latestRecordingTime time.Time) error { - ctx, span := commandTracer.Start(ctx, "updateCurosr") - defer span.End() - cursorFilePath := model.GetCursorFilePath() - cursorFile, err := os.OpenFile(cursorFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - slog.Error("Failed to open cursor file for writing", slog.Any("err", err)) - return err - } - defer cursorFile.Close() - - _, err = cursorFile.WriteString(fmt.Sprintf("\n%d\n", latestRecordingTime.UnixNano())) - if err != nil { - slog.Error("Failed to write to cursor file", slog.Any("err", err)) - return err - } - return nil -} diff --git a/daemon/base.go b/daemon/base.go index 82787c1..6648217 100644 --- a/daemon/base.go +++ b/daemon/base.go @@ -10,6 +10,17 @@ var stConfig model.ConfigService var version string var startedAt time.Time +// commandStore is the daemon-owned bolt CommandStore, set when the bolt storage +// engine is enabled. It is nil when running with the default file engine. +var commandStore model.CommandStore + +// InitCommandStore registers the daemon-owned command store used by the bolt +// track handlers. The daemon owns the store for its lifetime (bbolt holds an +// exclusive file lock). +func InitCommandStore(store model.CommandStore) { + commandStore = store +} + const ( PubSubTopic = "socket" ) diff --git a/daemon/client.go b/daemon/client.go index 77a146f..2295b02 100644 --- a/daemon/client.go +++ b/daemon/client.go @@ -51,6 +51,39 @@ func SendLocalDataToSocket( return nil } +// SendTrackEvent sends a single raw command event (pre or post) to the daemon +// for persistence in its bolt store. Fire-and-forget, mirroring the latency +// profile of the txt-file append it replaces. +func SendTrackEvent( + ctx context.Context, + socketPath string, + msgType SocketMessageType, + cmd model.Command, + recordingTime time.Time, +) error { + conn, err := net.Dial("unix", socketPath) + if err != nil { + return err + } + defer conn.Close() + + data := SocketMessage{ + Type: msgType, + Payload: TrackEventPayload{ + Command: cmd, + RecordingTimeNano: recordingTime.UnixNano(), + }, + } + + encoded, err := json.Marshal(data) + if err != nil { + return err + } + + _, err = conn.Write(encoded) + return err +} + // SendSessionProject sends a session-to-project mapping to the daemon (fire-and-forget) func SendSessionProject(socketPath string, sessionID, projectPath string) { conn, err := net.DialTimeout("unix", socketPath, 10*time.Millisecond) @@ -70,6 +103,29 @@ func SendSessionProject(socketPath string, sessionID, projectPath string) { json.NewEncoder(conn).Encode(msg) } +// RequestListCommands asks the daemon for the locally buffered commands (used +// by `shelltime ls` in bolt mode, since the CLI can't open the locked DB). +func RequestListCommands(socketPath string, timeout time.Duration) (*ListCommandsResponse, error) { + conn, err := net.DialTimeout("unix", socketPath, timeout) + if err != nil { + return nil, err + } + defer conn.Close() + + conn.SetDeadline(time.Now().Add(timeout)) + + msg := SocketMessage{Type: SocketMessageTypeListCommands} + if err := json.NewEncoder(conn).Encode(msg); err != nil { + return nil, err + } + + var response ListCommandsResponse + if err := json.NewDecoder(conn).Decode(&response); err != nil { + return nil, err + } + return &response, nil +} + // RequestCCInfo requests CC info (cost data and git info) from the daemon func RequestCCInfo(socketPath string, timeRange CCInfoTimeRange, workingDir string, timeout time.Duration) (*CCInfoResponse, error) { conn, err := net.DialTimeout("unix", socketPath, timeout) diff --git a/daemon/handlers.go b/daemon/handlers.go index 080f6fc..9a452e1 100644 --- a/daemon/handlers.go +++ b/daemon/handlers.go @@ -24,6 +24,10 @@ func SocketTopicProcessor(messages <-chan *message.Message) { switch socketMsg.Type { case SocketMessageTypeSync: err = handlePubSubSync(ctx, socketMsg.Payload) + case SocketMessageTypeTrackPre: + err = handlePubSubTrackPre(ctx, socketMsg.Payload) + case SocketMessageTypeTrackPost: + err = handlePubSubTrackPost(ctx, socketMsg.Payload) case SocketMessageTypeHeartbeat: err = handlePubSubHeartbeat(ctx, socketMsg.Payload) default: diff --git a/daemon/handlers.sync.go b/daemon/handlers.sync.go index b3e21cc..d0dbbdb 100644 --- a/daemon/handlers.sync.go +++ b/daemon/handlers.sync.go @@ -34,6 +34,13 @@ func handlePubSubSync(ctx context.Context, socketMsgPayload interface{}) error { return err } + return sendTrackArgsToServer(ctx, syncMsg) +} + +// sendTrackArgsToServer enriches a tracking payload (terminal resolution, daemon +// source, optional encryption) and sends it to the server, recording circuit +// breaker state. It is shared by the sync handler and the bolt track handler. +func sendTrackArgsToServer(ctx context.Context, syncMsg model.PostTrackArgs) error { // Resolve terminal from PPID (use first data item's PPID) if len(syncMsg.Data) > 0 && syncMsg.Data[0].PPID > 0 { terminal, multiplexer := ResolveTerminal(syncMsg.Data[0].PPID) diff --git a/daemon/handlers.track.go b/daemon/handlers.track.go new file mode 100644 index 0000000..9e77c31 --- /dev/null +++ b/daemon/handlers.track.go @@ -0,0 +1,122 @@ +package daemon + +import ( + "context" + "encoding/json" + "log/slog" + "time" + + "github.com/malamtime/cli/model" +) + +// newFallbackStore builds the store used for track events when the bolt engine +// is disabled (commandStore is nil). It is a var so tests can substitute an +// in-memory store without touching the filesystem. +var newFallbackStore = model.NewFileStore + +// trackStore returns the active command store for the daemon. The CLI forwards +// every track event to the daemon and lets it decide where to persist: the +// daemon-owned bolt store when the bolt engine is enabled, otherwise the txt +// file store (both satisfy model.CommandStore). +func trackStore() model.CommandStore { + if commandStore != nil { + return commandStore + } + return newFallbackStore() +} + +func parseTrackEvent(payload interface{}) (model.Command, time.Time, error) { + pb, err := json.Marshal(payload) + if err != nil { + return model.Command{}, time.Time{}, err + } + var ev TrackEventPayload + if err := json.Unmarshal(pb, &ev); err != nil { + return model.Command{}, time.Time{}, err + } + recordingTime := time.Unix(0, ev.RecordingTimeNano) + return ev.Command, recordingTime, nil +} + +// handlePubSubTrackPre persists a pre-execution command to the active bucket, +// unless config marks the command as excluded. +func handlePubSubTrackPre(ctx context.Context, payload interface{}) error { + cmd, recordingTime, err := parseTrackEvent(payload) + if err != nil { + slog.Error("Failed to parse track_pre payload", slog.Any("err", err)) + return err + } + + cfg, err := stConfig.ReadConfigFile(ctx) + if err != nil { + slog.Error("Failed to read config in track_pre handler", slog.Any("err", err)) + return err + } + if model.ShouldExcludeCommand(cmd.Command, cfg.Exclude) { + slog.Debug("Command excluded by pattern", slog.String("command", cmd.Command)) + return nil + } + + return trackStore().SavePre(ctx, cmd, recordingTime) +} + +// handlePubSubTrackPost persists a post-execution command, then runs the +// flush/sync/cursor/prune cycle against the active store — the daemon-side +// equivalent of commands.trySyncLocalToServer. +func handlePubSubTrackPost(ctx context.Context, payload interface{}) error { + cmd, recordingTime, err := parseTrackEvent(payload) + if err != nil { + slog.Error("Failed to parse track_post payload", slog.Any("err", err)) + return err + } + + cfg, err := stConfig.ReadConfigFile(ctx) + if err != nil { + slog.Error("Failed to read config in track_post handler", slog.Any("err", err)) + return err + } + if model.ShouldExcludeCommand(cmd.Command, cfg.Exclude) { + slog.Debug("Command excluded by pattern", slog.String("command", cmd.Command)) + return nil + } + + store := trackStore() + if err := store.SavePost(ctx, cmd, cmd.Result, recordingTime); err != nil { + return err + } + + result, err := model.BuildTrackingData(ctx, store, cfg) + if err != nil { + return err + } + if len(result.Data) == 0 { + return nil + } + + // Respect FlushCount, allowing the very first batch through (no cursor yet). + if !result.NoCursorExist && len(result.Data) < cfg.FlushCount { + slog.Debug("not enough data to flush", slog.Int("current", len(result.Data)), slog.Int("flushCount", cfg.FlushCount)) + return nil + } + + args := model.PostTrackArgs{ + CursorID: result.LatestRecordingTime.UnixNano(), + Data: result.Data, + Meta: result.Meta, + } + + if err := sendTrackArgsToServer(ctx, args); err != nil { + slog.Error("Failed to send tracking data from command store", slog.Any("err", err)) + // Leave the data in the store; a later post will retry. Do not advance cursor. + return err + } + + if err := store.SetCursor(ctx, result.LatestRecordingTime); err != nil { + slog.Error("Failed to advance cursor", slog.Any("err", err)) + return err + } + if err := store.Prune(ctx, result.LatestRecordingTime); err != nil { + slog.Warn("Failed to prune synced commands", slog.Any("err", err)) + } + return nil +} diff --git a/daemon/handlers.track_test.go b/daemon/handlers.track_test.go new file mode 100644 index 0000000..83ff58a --- /dev/null +++ b/daemon/handlers.track_test.go @@ -0,0 +1,262 @@ +package daemon + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/malamtime/cli/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// fakeCommandStore is an in-memory CommandStore for handler tests. +type fakeCommandStore struct { + pre []*model.Command + post []*model.Command + + cursor time.Time + noCursorExist bool + + cursorSetCalls int + pruneCalls int +} + +func (f *fakeCommandStore) SavePre(ctx context.Context, cmd model.Command, rt time.Time) error { + c := cmd + c.RecordingTime = rt + f.pre = append(f.pre, &c) + return nil +} + +func (f *fakeCommandStore) SavePost(ctx context.Context, cmd model.Command, result int, rt time.Time) error { + c := cmd + c.Result = result + c.RecordingTime = rt + f.post = append(f.post, &c) + return nil +} + +func (f *fakeCommandStore) GetPreTree(ctx context.Context) (map[string][]*model.Command, error) { + tree := make(map[string][]*model.Command) + for _, c := range f.pre { + k := c.GetUniqueKey() + tree[k] = append(tree[k], c) + } + return tree, nil +} + +func (f *fakeCommandStore) GetPreCommands(ctx context.Context) ([]*model.Command, error) { + return f.pre, nil +} + +func (f *fakeCommandStore) GetPostCommands(ctx context.Context) ([]*model.Command, error) { + return f.post, nil +} + +func (f *fakeCommandStore) GetLastCursor(ctx context.Context) (time.Time, bool, error) { + return f.cursor, f.noCursorExist, nil +} + +func (f *fakeCommandStore) SetCursor(ctx context.Context, cursor time.Time) error { + f.cursor = cursor + f.cursorSetCalls++ + return nil +} + +func (f *fakeCommandStore) Prune(ctx context.Context, cursor time.Time) error { + f.pruneCalls++ + return nil +} + +func (f *fakeCommandStore) Close() error { return nil } + +// fakeConfigService implements model.ConfigService without mockery. +type fakeConfigService struct { + cfg model.ShellTimeConfig +} + +func (f fakeConfigService) ReadConfigFile(ctx context.Context, opts ...model.ReadConfigOption) (model.ShellTimeConfig, error) { + return f.cfg, nil +} + +type TrackHandlerTestSuite struct { + suite.Suite + prevStore model.CommandStore + prevConfig model.ConfigService + prevFallback func() model.CommandStore +} + +func (s *TrackHandlerTestSuite) SetupTest() { + s.prevStore = commandStore + s.prevConfig = stConfig + s.prevFallback = newFallbackStore + // pre/post handlers now read config (exclude rules); give every test a default. + stConfig = fakeConfigService{} +} + +func (s *TrackHandlerTestSuite) TearDownTest() { + commandStore = s.prevStore + stConfig = s.prevConfig + newFallbackStore = s.prevFallback +} + +func (s *TrackHandlerTestSuite) TestParseTrackEvent() { + now := time.Now() + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", Command: "ls"}, + RecordingTimeNano: now.UnixNano(), + } + cmd, rt, err := parseTrackEvent(payload) + require.NoError(s.T(), err) + assert.Equal(s.T(), "ls", cmd.Command) + assert.Equal(s.T(), now.UnixNano(), rt.UnixNano()) +} + +func (s *TrackHandlerTestSuite) TestTrackPreFallsBackToFileStore() { + // bolt disabled (commandStore nil) => persist via the fallback store. + commandStore = nil + fallback := &fakeCommandStore{} + newFallbackStore = func() model.CommandStore { return fallback } + + now := time.Now() + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u"}, + RecordingTimeNano: now.UnixNano(), + } + require.NoError(s.T(), handlePubSubTrackPre(context.Background(), payload)) + require.Len(s.T(), fallback.pre, 1) + assert.Equal(s.T(), "ls", fallback.pre[0].Command) +} + +func (s *TrackHandlerTestSuite) TestTrackPreExcluded() { + store := &fakeCommandStore{} + commandStore = store + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{Exclude: []string{"secret*"}}} + + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", SessionID: 1, Command: "secret-cmd", Username: "u"}, + RecordingTimeNano: time.Now().UnixNano(), + } + require.NoError(s.T(), handlePubSubTrackPre(context.Background(), payload)) + assert.Empty(s.T(), store.pre, "excluded command must not be persisted") +} + +func (s *TrackHandlerTestSuite) TestTrackPrePersists() { + store := &fakeCommandStore{} + commandStore = store + now := time.Now() + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u"}, + RecordingTimeNano: now.UnixNano(), + } + err := handlePubSubTrackPre(context.Background(), payload) + require.NoError(s.T(), err) + require.Len(s.T(), store.pre, 1) + assert.Equal(s.T(), "ls", store.pre[0].Command) +} + +func (s *TrackHandlerTestSuite) TestTrackPostNotEnoughToFlush() { + store := &fakeCommandStore{ + // cursor exists, so FlushCount gating applies + noCursorExist: false, + cursor: time.Now().Add(-time.Hour), + } + commandStore = store + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{FlushCount: 10}} + + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u", Time: now} + // seed a matching pre so BuildTrackingData yields one data row + require.NoError(s.T(), store.SavePre(context.Background(), cmd, now)) + + payload := TrackEventPayload{Command: cmd, RecordingTimeNano: now.UnixNano()} + err := handlePubSubTrackPost(context.Background(), payload) + require.NoError(s.T(), err) + + // one data row < FlushCount(10): must not send / advance cursor / prune + assert.Len(s.T(), store.post, 1, "post should still be persisted") + assert.Equal(s.T(), 0, store.cursorSetCalls, "cursor must not advance below flush threshold") + assert.Equal(s.T(), 0, store.pruneCalls) +} + +func (s *TrackHandlerTestSuite) TestTrackPostFlushSyncsAndPrunes() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + store := &fakeCommandStore{noCursorExist: true} + commandStore = store + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{Token: "t", APIEndpoint: server.URL, FlushCount: 1}} + + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u", Time: now} + require.NoError(s.T(), store.SavePre(context.Background(), cmd, now)) + + payload := TrackEventPayload{Command: cmd, RecordingTimeNano: now.UnixNano()} + require.NoError(s.T(), handlePubSubTrackPost(context.Background(), payload)) + + // flush threshold met (no cursor yet) => sent, cursor advanced, pruned + assert.Len(s.T(), store.post, 1) + assert.Equal(s.T(), 1, store.cursorSetCalls) + assert.Equal(s.T(), 1, store.pruneCalls) +} + +func (s *TrackHandlerTestSuite) TestTrackPostFallsBackToFileStore() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + // bolt disabled (commandStore nil) => persist + sync via the fallback store. + commandStore = nil + fallback := &fakeCommandStore{noCursorExist: true} + newFallbackStore = func() model.CommandStore { return fallback } + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{Token: "t", APIEndpoint: server.URL, FlushCount: 1}} + + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u", Time: now} + require.NoError(s.T(), fallback.SavePre(context.Background(), cmd, now)) + + payload := TrackEventPayload{Command: cmd, RecordingTimeNano: now.UnixNano()} + require.NoError(s.T(), handlePubSubTrackPost(context.Background(), payload)) + + assert.Len(s.T(), fallback.post, 1) + assert.Equal(s.T(), 1, fallback.cursorSetCalls) + assert.Equal(s.T(), 1, fallback.pruneCalls) +} + +func (s *TrackHandlerTestSuite) TestTrackPostExcluded() { + store := &fakeCommandStore{} + commandStore = store + stConfig = fakeConfigService{cfg: model.ShellTimeConfig{Exclude: []string{"secret*"}}} + + payload := TrackEventPayload{ + Command: model.Command{Shell: "bash", SessionID: 1, Command: "secret-cmd", Username: "u"}, + RecordingTimeNano: time.Now().UnixNano(), + } + require.NoError(s.T(), handlePubSubTrackPost(context.Background(), payload)) + assert.Empty(s.T(), store.post, "excluded command must not be persisted") + assert.Equal(s.T(), 0, store.cursorSetCalls) +} + +func (s *TrackHandlerTestSuite) TestTrackPreParseError() { + commandStore = &fakeCommandStore{} + // a channel cannot be JSON-marshaled, so parseTrackEvent fails + err := handlePubSubTrackPre(context.Background(), make(chan int)) + assert.Error(s.T(), err) +} + +func (s *TrackHandlerTestSuite) TestTrackPostParseError() { + commandStore = &fakeCommandStore{} + err := handlePubSubTrackPost(context.Background(), make(chan int)) + assert.Error(s.T(), err) +} + +func TestTrackHandlerSuite(t *testing.T) { + suite.Run(t, new(TrackHandlerTestSuite)) +} diff --git a/daemon/socket.go b/daemon/socket.go index 132ddb7..e66406e 100644 --- a/daemon/socket.go +++ b/daemon/socket.go @@ -23,8 +23,29 @@ const ( SocketMessageTypeStatus SocketMessageType = "status" SocketMessageTypeCCInfo SocketMessageType = "cc_info" SocketMessageTypeSessionProject SocketMessageType = "session_project" + // SocketMessageTypeTrackPre / TrackPost carry a single raw command event the + // daemon persists to its bolt-backed CommandStore (used when the bolt storage + // engine is enabled). + SocketMessageTypeTrackPre SocketMessageType = "track_pre" + SocketMessageTypeTrackPost SocketMessageType = "track_post" + // SocketMessageTypeListCommands requests the locally buffered commands from + // the daemon (request/response), used by `shelltime ls` when the bolt store + // is enabled and the CLI cannot open the daemon-locked DB. + SocketMessageTypeListCommands SocketMessageType = "list_commands" ) +// ListCommandsResponse is the daemon's reply to a list_commands request. +type ListCommandsResponse struct { + Commands []model.ListedCommand `json:"commands"` +} + +// TrackEventPayload is the payload for track_pre / track_post messages: a single +// command plus the recording time stamped by the CLI. +type TrackEventPayload struct { + Command model.Command `json:"command"` + RecordingTimeNano int64 `json:"recordingTimeNano"` +} + type SessionProjectRequest struct { SessionID string `json:"sessionId"` ProjectPath string `json:"projectPath"` @@ -164,6 +185,16 @@ func (p *SocketHandler) handleConnection(conn net.Conn) { if err := p.channel.Publish(PubSubTopic, chMsg); err != nil { slog.Error("Error to publish topic", slog.Any("err", err)) } + case SocketMessageTypeTrackPre, SocketMessageTypeTrackPost: + buf, err := json.Marshal(msg) + if err != nil { + slog.Error("Error encoding track message", slog.Any("err", err)) + return + } + chMsg := message.NewMessage(watermill.NewUUID(), buf) + if err := p.channel.Publish(PubSubTopic, chMsg); err != nil { + slog.Error("Error publishing track topic", slog.Any("err", err)) + } case SocketMessageTypeHeartbeat: // Only process heartbeat if codeTracking is enabled if p.config.CodeTracking == nil || p.config.CodeTracking.Enabled == nil || !*p.config.CodeTracking.Enabled { @@ -186,6 +217,8 @@ func (p *SocketHandler) handleConnection(conn net.Conn) { // Send acknowledgment to client encoder := json.NewEncoder(conn) encoder.Encode(map[string]string{"status": "ok"}) + case SocketMessageTypeListCommands: + p.handleListCommands(conn) case SocketMessageTypeCCInfo: p.handleCCInfo(conn, msg) case SocketMessageTypeSessionProject: @@ -218,6 +251,23 @@ func (p *SocketHandler) handleStatus(conn net.Conn) { } } +func (p *SocketHandler) handleListCommands(conn net.Conn) { + response := ListCommandsResponse{Commands: []model.ListedCommand{}} + if commandStore != nil { + commands, err := model.BuildListedCommands(context.Background(), commandStore) + if err != nil { + slog.Error("Failed to build listed commands", slog.Any("err", err)) + } else { + response.Commands = commands + } + } + + encoder := json.NewEncoder(conn) + if err := encoder.Encode(response); err != nil { + slog.Error("Error encoding list_commands response", slog.Any("err", err)) + } +} + func (p *SocketHandler) handleCCInfo(conn net.Conn, msg SocketMessage) { slog.Debug("cc_info socket event received") diff --git a/daemon/socket_test.go b/daemon/socket_test.go index 18c9043..d29f8ae 100644 --- a/daemon/socket_test.go +++ b/daemon/socket_test.go @@ -1,6 +1,7 @@ package daemon import ( + "context" "encoding/json" "net" "os" @@ -138,6 +139,51 @@ func TestSocketHandler_StatusRequest(t *testing.T) { } } +func TestSocketHandler_ListCommandsAndTrackEvent(t *testing.T) { + tempDir, err := os.MkdirTemp("", "shelltime-socket-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + socketPath := filepath.Join(tempDir, "test.sock") + config := &model.ShellTimeConfig{SocketPath: socketPath} + ch := NewGoChannel(PubSubConfig{OutputChannelBuffer: 10}, nil) + handler := NewSocketHandler(config, ch) + + // Seed the daemon-owned store with a completed command. + prev := commandStore + defer func() { commandStore = prev }() + store := &fakeCommandStore{} + now := time.Now() + cmd := model.Command{Shell: "bash", SessionID: 1, Command: "ls", Username: "u", Hostname: "h", Time: now} + _ = store.SavePre(context.Background(), cmd, now) + post := cmd + post.Time = now.Add(time.Second) + _ = store.SavePost(context.Background(), post, 0, post.Time) + commandStore = store + + if err := handler.Start(); err != nil { + t.Fatalf("Start failed: %v", err) + } + defer handler.Stop() + time.Sleep(50 * time.Millisecond) + + // list_commands request/response + resp, err := RequestListCommands(socketPath, 2*time.Second) + if err != nil { + t.Fatalf("RequestListCommands failed: %v", err) + } + if len(resp.Commands) != 1 || resp.Commands[0].Command != "ls" { + t.Fatalf("unexpected list response: %+v", resp.Commands) + } + + // track event is fire-and-forget; it should be accepted without error + if err := SendTrackEvent(context.Background(), socketPath, SocketMessageTypeTrackPre, cmd, now); err != nil { + t.Errorf("SendTrackEvent returned error: %v", err) + } +} + func TestSocketMessageType_Constants(t *testing.T) { testCases := []struct { msgType SocketMessageType diff --git a/go.mod b/go.mod index 62f4977..7ac0a90 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/uptrace/uptrace-go v1.39.0 github.com/urfave/cli/v2 v2.27.7 + go.etcd.io/bbolt v1.4.3 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 go.opentelemetry.io/otel v1.39.0 go.opentelemetry.io/otel/trace v1.39.0 diff --git a/go.sum b/go.sum index d48dac6..fb86f4e 100644 --- a/go.sum +++ b/go.sum @@ -181,6 +181,8 @@ github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJu github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg= github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= +go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGNANqpVFCndZvcuyKbl0g+UAVcbBcqGkG28H0Y= @@ -234,6 +236,8 @@ golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/model/list.go b/model/list.go new file mode 100644 index 0000000..cafa1b1 --- /dev/null +++ b/model/list.go @@ -0,0 +1,61 @@ +package model + +import ( + "context" + "time" +) + +// ListedCommand is a paired pre/post command for display by `shelltime ls`. +// JSON tags match the historical `ls -f json` output. +type ListedCommand struct { + Command string `json:"command"` + Shell string `json:"shell"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Result int `json:"result"` + Username string `json:"username"` + Hostname string `json:"hostname"` +} + +// BuildListedCommands pairs each post command with its closest pre command and +// returns the rows for display. Shared by the CLI (file store) and the daemon +// (bolt store, queried over the socket) so both produce identical output. +func BuildListedCommands(ctx context.Context, store CommandStore) ([]ListedCommand, error) { + postCommands, err := store.GetPostCommands(ctx) + if err != nil { + return nil, err + } + preTree, err := store.GetPreTree(ctx) + if err != nil { + return nil, err + } + + commands := make([]ListedCommand, 0, len(postCommands)) + for _, postCommand := range postCommands { + if postCommand == nil { + continue + } + key := postCommand.GetUniqueKey() + preCommands, ok := preTree[key] + if !ok { + continue + } + + closestPreCommand := postCommand.FindClosestCommand(preCommands, false) + startTime := postCommand.Time + if closestPreCommand != nil { + startTime = closestPreCommand.Time + } + + commands = append(commands, ListedCommand{ + Command: postCommand.Command, + Shell: postCommand.Shell, + StartTime: startTime, + EndTime: postCommand.Time, + Result: postCommand.Result, + Username: postCommand.Username, + Hostname: postCommand.Hostname, + }) + } + return commands, nil +} diff --git a/model/list_test.go b/model/list_test.go new file mode 100644 index 0000000..a0e2ea0 --- /dev/null +++ b/model/list_test.go @@ -0,0 +1,38 @@ +package model + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBuildListedCommands(t *testing.T) { + store, err := newBoltStore(filepath.Join(t.TempDir(), "commands.db")) + require.NoError(t, err) + defer store.Close() + + ctx := context.Background() + start := time.Now() + cmd := Command{Shell: "bash", SessionID: 1, Command: "make build", Username: "u", Hostname: "h", Time: start} + + // pre then post for the same command -> one paired row + require.NoError(t, store.SavePre(ctx, cmd, start)) + post := cmd + post.Time = start.Add(3 * time.Second) + require.NoError(t, store.SavePost(ctx, post, 0, post.Time)) + + // a post with no matching pre -> skipped + orphan := Command{Shell: "zsh", SessionID: 2, Command: "orphan", Username: "u", Time: start} + require.NoError(t, store.SavePost(ctx, orphan, 1, start)) + + listed, err := BuildListedCommands(ctx, store) + require.NoError(t, err) + require.Len(t, listed, 1) + require.Equal(t, "make build", listed[0].Command) + require.Equal(t, "bash", listed[0].Shell) + require.Equal(t, start.Unix(), listed[0].StartTime.Unix()) + require.Equal(t, post.Time.Unix(), listed[0].EndTime.Unix()) +} diff --git a/model/path.go b/model/path.go index 0d206c5..639bb84 100644 --- a/model/path.go +++ b/model/path.go @@ -79,6 +79,11 @@ func GetCursorFilePath() string { return GetStoragePath("commands", "cursor.txt") } +// GetBoltDBPath returns the path to the bbolt command database (daemon-owned). +func GetBoltDBPath() string { + return GetStoragePath("commands", "commands.db") +} + // GetHeartbeatLogFilePath returns the path to the heartbeat log file func GetHeartbeatLogFilePath() string { return GetStoragePath("coding-heartbeat.data.log") diff --git a/model/store.go b/model/store.go new file mode 100644 index 0000000..33ea0f4 --- /dev/null +++ b/model/store.go @@ -0,0 +1,110 @@ +package model + +import ( + "context" + "time" +) + +// CommandStore abstracts persistence of tracked commands so the buffering +// backend can be swapped without touching call sites. +// +// Two implementations exist: +// - fileStore: the historical append-only txt files (pre.txt/post.txt/cursor.txt). +// Always available; used as the fallback whenever bolt is disabled or no daemon +// is running. +// - boltStore: a bbolt embedded KV store. bbolt holds an exclusive OS file lock, +// so it is meant to be owned by the single long-lived daemon process. +// +// Pre commands live in the "active" bucket, post commands in "archived"; the +// sync cursor lives in a "meta" bucket. These names mirror the activeBucket / +// archivedBucket constants in command.go. +type CommandStore interface { + // SavePre persists a pre-execution command record. + SavePre(ctx context.Context, cmd Command, recordingTime time.Time) error + // SavePost persists a post-execution command record with its exit code. + SavePost(ctx context.Context, cmd Command, result int, recordingTime time.Time) error + + // GetPreTree returns pre commands indexed by their unique key, used to pair + // post commands with the originating pre command during sync. + GetPreTree(ctx context.Context) (map[string][]*Command, error) + // GetPreCommands returns all pre commands (used by compaction). + GetPreCommands(ctx context.Context) ([]*Command, error) + // GetPostCommands returns all post commands with RecordingTime populated. + GetPostCommands(ctx context.Context) ([]*Command, error) + + // GetLastCursor returns the last synced recording time. noCursorExist is true + // when no cursor has ever been written (first sync). + GetLastCursor(ctx context.Context) (cursorTime time.Time, noCursorExist bool, err error) + // SetCursor advances the sync cursor. + SetCursor(ctx context.Context, cursor time.Time) error + + // Prune removes records that have already been synced (recording time at or + // before the cursor), keeping unfinished pre commands. + Prune(ctx context.Context, cursor time.Time) error + + // Close releases any resources held by the store (no-op for fileStore). + Close() error +} + +// StorageEngineFile is the default, always-available txt-file backend. +const StorageEngineFile = "file" + +// StorageEngineBolt selects the bbolt backend (daemon-owned). +const StorageEngineBolt = "bolt" + +// NewFileStore returns the txt-file backed store. The CLI uses this for the +// fallback path (bolt disabled or no daemon) and must never open the bolt DB +// directly, since the daemon holds its exclusive lock. +func NewFileStore() CommandStore { + return newFileStore() +} + +// NewCommandStore builds the store selected by config. It falls back to the +// file store for any unknown / empty engine so misconfiguration never loses data. +// +// The bolt store opens (and locks) the DB file; callers must Close it. Because +// bbolt takes an exclusive lock, only one process (the daemon) should construct +// a bolt store at a time. +func NewCommandStore(cfg ShellTimeConfig) (CommandStore, error) { + engine := StorageEngineFile + if cfg.Storage != nil && cfg.Storage.Engine != "" { + engine = cfg.Storage.Engine + } + + switch engine { + case StorageEngineBolt: + return newBoltStore(GetBoltDBPath()) + default: + return newFileStore(), nil + } +} + +// preHasSyncedPost reports whether posts contains a synced post command (recording +// time at or before cursor) that completes the given pre command: same unique key +// and running at or after the pre started. Prune uses it to drop finished, synced +// pre rows while keeping unfinished ones. +// +// Note the direction: a post always runs after its pre, so matching must be +// post.Time >= pre.Time. (Command.FindClosestCommand only accepts candidates at or +// before the receiver, so calling it on the pre would never match its later post.) +func preHasSyncedPost(pre *Command, posts []*Command, cursor time.Time) bool { + if pre == nil { + return false + } + key := pre.GetUniqueKey() + for _, p := range posts { + if p == nil { + continue + } + if p.RecordingTime.After(cursor) { + continue // post not yet synced; keep the pre for the next sync + } + if p.GetUniqueKey() != key { + continue + } + if !p.Time.Before(pre.Time) { + return true + } + } + return false +} diff --git a/model/store_bolt.go b/model/store_bolt.go new file mode 100644 index 0000000..ecdbf64 --- /dev/null +++ b/model/store_bolt.go @@ -0,0 +1,249 @@ +package model + +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "time" + + bolt "go.etcd.io/bbolt" +) + +const ( + // metaBucket holds singleton values such as the sync cursor. + metaBucket = "meta" + // cursorKey is the key under metaBucket storing the last synced recording time. + cursorKey = "cursor" + // boltOpenTimeout bounds how long we wait for the exclusive file lock before + // giving up, so a stale lock can't hang the daemon forever. + boltOpenTimeout = 5 * time.Second +) + +// boltStore persists commands in a bbolt database. It holds an exclusive OS +// file lock for its lifetime, so only the daemon should own one. +type boltStore struct { + db *bolt.DB +} + +func newBoltStore(path string) (*boltStore, error) { + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return nil, fmt.Errorf("failed to create bolt db folder: %w", err) + } + + db, err := bolt.Open(path, 0600, &bolt.Options{Timeout: boltOpenTimeout}) + if err != nil { + return nil, fmt.Errorf("failed to open bolt db %s: %w", path, err) + } + + err = db.Update(func(tx *bolt.Tx) error { + for _, name := range []string{activeBucket, archivedBucket, metaBucket} { + if _, err := tx.CreateBucketIfNotExists([]byte(name)); err != nil { + return err + } + } + return nil + }) + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to init bolt buckets: %w", err) + } + + return &boltStore{db: db}, nil +} + +// encodeKey produces a time-ordered, collision-free key: +// 8-byte big-endian UnixNano of the recording time + 8-byte sequence. +func encodeKey(recordingTime time.Time, seq uint64) []byte { + key := make([]byte, 16) + binary.BigEndian.PutUint64(key[0:8], uint64(recordingTime.UnixNano())) + binary.BigEndian.PutUint64(key[8:16], seq) + return key +} + +func decodeKeyNano(key []byte) int64 { + if len(key) < 8 { + return 0 + } + return int64(binary.BigEndian.Uint64(key[0:8])) +} + +func (s *boltStore) put(bucket string, cmd Command, recordingTime time.Time) error { + val, err := json.Marshal(cmd) + if err != nil { + return err + } + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucket)) + if b == nil { + return fmt.Errorf("bucket %s not found", bucket) + } + seq, err := b.NextSequence() + if err != nil { + return err + } + return b.Put(encodeKey(recordingTime, seq), val) + }) +} + +func (s *boltStore) all(bucket string) ([]*Command, error) { + result := make([]*Command, 0) + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(bucket)) + if b == nil { + return fmt.Errorf("bucket %s not found", bucket) + } + return b.ForEach(func(k, v []byte) error { + cmd := new(Command) + if err := json.Unmarshal(v, cmd); err != nil { + slog.Warn("failed to unmarshal command from bolt", slog.Any("err", err)) + return nil + } + cmd.RecordingTime = time.Unix(0, decodeKeyNano(k)) + result = append(result, cmd) + return nil + }) + }) + return result, err +} + +func (s *boltStore) SavePre(ctx context.Context, cmd Command, recordingTime time.Time) error { + cmd.Phase = CommandPhasePre + return s.put(activeBucket, cmd, recordingTime) +} + +func (s *boltStore) SavePost(ctx context.Context, cmd Command, result int, recordingTime time.Time) error { + cmd.Phase = CommandPhasePost + cmd.Result = result + cmd.EndTime = time.Now() + return s.put(archivedBucket, cmd, recordingTime) +} + +func (s *boltStore) GetPreTree(ctx context.Context) (map[string][]*Command, error) { + cmds, err := s.all(activeBucket) + if err != nil { + return nil, err + } + tree := make(map[string][]*Command) + for _, cmd := range cmds { + key := cmd.GetUniqueKey() + tree[key] = append(tree[key], cmd) + } + return tree, nil +} + +func (s *boltStore) GetPreCommands(ctx context.Context) ([]*Command, error) { + return s.all(activeBucket) +} + +func (s *boltStore) GetPostCommands(ctx context.Context) ([]*Command, error) { + return s.all(archivedBucket) +} + +func (s *boltStore) GetLastCursor(ctx context.Context) (cursorTime time.Time, noCursorExist bool, err error) { + err = s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(metaBucket)) + if b == nil { + return fmt.Errorf("bucket %s not found", metaBucket) + } + v := b.Get([]byte(cursorKey)) + if v == nil { + noCursorExist = true + cursorTime = time.Time{} + return nil + } + cursorTime = time.Unix(0, int64(binary.BigEndian.Uint64(v))) + return nil + }) + return +} + +func (s *boltStore) SetCursor(ctx context.Context, cursor time.Time) error { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(cursor.UnixNano())) + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(metaBucket)) + if b == nil { + return fmt.Errorf("bucket %s not found", metaBucket) + } + return b.Put([]byte(cursorKey), buf) + }) +} + +// Prune deletes synced post commands (recording time <= cursor) and the pre +// commands they complete, keeping unfinished pre commands. It runs in a single +// write transaction so the post set is consistent with the deletions. +func (s *boltStore) Prune(ctx context.Context, cursor time.Time) error { + cursorNano := cursor.UnixNano() + + return s.db.Update(func(tx *bolt.Tx) error { + archived := tx.Bucket([]byte(archivedBucket)) + if archived == nil { + return fmt.Errorf("bucket %s not found", archivedBucket) + } + active := tx.Bucket([]byte(activeBucket)) + if active == nil { + return fmt.Errorf("bucket %s not found", activeBucket) + } + + // Single pass over archived: collect all post commands (for matching) and + // the synced keys to delete. + var postCommands []*Command + var delArchived [][]byte + if err := archived.ForEach(func(k, v []byte) error { + cmd := new(Command) + if err := json.Unmarshal(v, cmd); err == nil { + cmd.RecordingTime = time.Unix(0, decodeKeyNano(k)) + postCommands = append(postCommands, cmd) + } + if decodeKeyNano(k) <= cursorNano { + delArchived = append(delArchived, append([]byte(nil), k...)) + } + return nil + }); err != nil { + return err + } + + // Drop pre rows at/before the cursor that a synced post completes. + var delActive [][]byte + if err := active.ForEach(func(k, v []byte) error { + nano := decodeKeyNano(k) + if nano > cursorNano { + return nil // keep anything newer than the cursor + } + pre := new(Command) + if err := json.Unmarshal(v, pre); err != nil { + return nil + } + pre.RecordingTime = time.Unix(0, nano) + if preHasSyncedPost(pre, postCommands, cursor) { + delActive = append(delActive, append([]byte(nil), k...)) + } + return nil + }); err != nil { + return err + } + + for _, k := range delArchived { + if err := archived.Delete(k); err != nil { + return err + } + } + for _, k := range delActive { + if err := active.Delete(k); err != nil { + return err + } + } + return nil + }) +} + +func (s *boltStore) Close() error { + if s.db == nil { + return nil + } + return s.db.Close() +} diff --git a/model/store_bolt_test.go b/model/store_bolt_test.go new file mode 100644 index 0000000..2aaa79a --- /dev/null +++ b/model/store_bolt_test.go @@ -0,0 +1,180 @@ +package model + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +type BoltStoreTestSuite struct { + suite.Suite + store *boltStore + ctx context.Context +} + +func (s *BoltStoreTestSuite) SetupTest() { + dbPath := filepath.Join(s.T().TempDir(), "commands.db") + st, err := newBoltStore(dbPath) + s.Require().NoError(err) + s.store = st + s.ctx = context.Background() +} + +func (s *BoltStoreTestSuite) TearDownTest() { + s.Require().NoError(s.store.Close()) +} + +func (s *BoltStoreTestSuite) newCmd(cmd string, t time.Time) Command { + return Command{ + Shell: "bash", + SessionID: 1, + Command: cmd, + Username: "tester", + Time: t, + } +} + +func (s *BoltStoreTestSuite) TestSavePreAndRead() { + now := time.Now() + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("ls", now), now)) + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("pwd", now.Add(time.Second)), now.Add(time.Second))) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(pre, 2) + s.EqualValues(CommandPhasePre, pre[0].Phase) + // RecordingTime recovered from the key + s.Equal(now.UnixNano(), pre[0].RecordingTime.UnixNano()) +} + +func (s *BoltStoreTestSuite) TestTimeOrdering() { + base := time.Now() + // insert out of order + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("c", base.Add(2*time.Second)), base.Add(2*time.Second))) + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("a", base), base)) + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("b", base.Add(time.Second)), base.Add(time.Second))) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(pre, 3) + s.Equal("a", pre[0].Command) + s.Equal("b", pre[1].Command) + s.Equal("c", pre[2].Command) +} + +func (s *BoltStoreTestSuite) TestSameNanoNoCollision() { + now := time.Now() + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("first", now), now)) + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("second", now), now)) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Len(pre, 2, "identical recording times must not overwrite each other") +} + +func (s *BoltStoreTestSuite) TestPreTree() { + now := time.Now() + c := s.newCmd("git status", now) + s.Require().NoError(s.store.SavePre(s.ctx, c, now)) + + tree, err := s.store.GetPreTree(s.ctx) + s.Require().NoError(err) + s.Require().Len(tree[c.GetUniqueKey()], 1) +} + +func (s *BoltStoreTestSuite) TestSavePost() { + now := time.Now() + s.Require().NoError(s.store.SavePost(s.ctx, s.newCmd("make", now), 2, now)) + + post, err := s.store.GetPostCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(post, 1) + s.EqualValues(CommandPhasePost, post[0].Phase) + s.Equal(2, post[0].Result) +} + +func (s *BoltStoreTestSuite) TestCursor() { + _, noCursor, err := s.store.GetLastCursor(s.ctx) + s.Require().NoError(err) + s.True(noCursor, "fresh db has no cursor") + + c := time.Unix(0, 1700000000123456789) + s.Require().NoError(s.store.SetCursor(s.ctx, c)) + + got, noCursor, err := s.store.GetLastCursor(s.ctx) + s.Require().NoError(err) + s.False(noCursor) + s.Equal(c.UnixNano(), got.UnixNano()) +} + +func (s *BoltStoreTestSuite) TestPrune() { + base := time.Now() + synced := base + unsyncedPost := base.Add(10 * time.Second) + + // a finished command (pre+post at the same time) before the cursor -> both pruned + finished := s.newCmd("finished", synced) + s.Require().NoError(s.store.SavePre(s.ctx, finished, synced)) + s.Require().NoError(s.store.SavePost(s.ctx, finished, 0, synced)) + + // an unfinished pre before the cursor -> kept + s.Require().NoError(s.store.SavePre(s.ctx, s.newCmd("running-server", synced), synced)) + + // a post after the cursor -> kept + s.Require().NoError(s.store.SavePost(s.ctx, s.newCmd("later", unsyncedPost), 0, unsyncedPost)) + + s.Require().NoError(s.store.Prune(s.ctx, synced)) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(pre, 1) + s.Equal("running-server", pre[0].Command) + + post, err := s.store.GetPostCommands(s.ctx) + s.Require().NoError(err) + s.Require().Len(post, 1) + s.Equal("later", post[0].Command) +} + +func (s *BoltStoreTestSuite) TestPruneRealisticTiming() { + // A normal command: post runs AFTER pre. Once synced (<= cursor) both must be + // pruned, otherwise the active bucket grows unbounded. + start := time.Now() + end := start.Add(2 * time.Second) + cmd := s.newCmd("npm test", start) + + s.Require().NoError(s.store.SavePre(s.ctx, cmd, start)) + post := cmd + post.Time = end + s.Require().NoError(s.store.SavePost(s.ctx, post, 0, end)) + + // cursor at/after the post recording time => both are synced + s.Require().NoError(s.store.Prune(s.ctx, end)) + + pre, err := s.store.GetPreCommands(s.ctx) + s.Require().NoError(err) + s.Empty(pre, "synced pre command must be pruned even though post.Time > pre.Time") + + postCmds, err := s.store.GetPostCommands(s.ctx) + s.Require().NoError(err) + s.Empty(postCmds) +} + +func TestBoltStoreSuite(t *testing.T) { + suite.Run(t, new(BoltStoreTestSuite)) +} + +func TestNewCommandStoreSelectsEngine(t *testing.T) { + fileCfg := ShellTimeConfig{} + st, err := NewCommandStore(fileCfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, ok := st.(*fileStore); !ok { + t.Fatalf("expected fileStore by default, got %T", st) + } + _ = st.Close() +} diff --git a/model/store_file.go b/model/store_file.go new file mode 100644 index 0000000..5e7bbab --- /dev/null +++ b/model/store_file.go @@ -0,0 +1,158 @@ +package model + +import ( + "bytes" + "context" + "fmt" + "log/slog" + "os" + "sort" + "time" +) + +// fileStore is the historical append-only txt backend. It delegates to the +// existing package-level reader functions so behavior is unchanged. +type fileStore struct{} + +func newFileStore() *fileStore { return &fileStore{} } + +func (s *fileStore) appendLine(path string, cmd Command, recordingTime time.Time) error { + if err := ensureStorageFolder(); err != nil { + return err + } + buf, err := cmd.ToLine(recordingTime) + if err != nil { + return err + } + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return fmt.Errorf("failed to open command storage file %s: %w", path, err) + } + defer f.Close() + if _, err := f.Write(buf); err != nil { + return fmt.Errorf("failed to write command storage file %s: %w", path, err) + } + return nil +} + +func (s *fileStore) SavePre(ctx context.Context, cmd Command, recordingTime time.Time) error { + cmd.Phase = CommandPhasePre + return s.appendLine(GetPreCommandFilePath(), cmd, recordingTime) +} + +func (s *fileStore) SavePost(ctx context.Context, cmd Command, result int, recordingTime time.Time) error { + cmd.Phase = CommandPhasePost + cmd.Result = result + cmd.EndTime = time.Now() + return s.appendLine(GetPostCommandFilePath(), cmd, recordingTime) +} + +func (s *fileStore) GetPreTree(ctx context.Context) (map[string][]*Command, error) { + return GetPreCommandsTree(ctx) +} + +func (s *fileStore) GetPreCommands(ctx context.Context) ([]*Command, error) { + return GetPreCommands(ctx) +} + +func (s *fileStore) GetPostCommands(ctx context.Context) ([]*Command, error) { + raw, _, err := GetPostCommands(ctx) + if err != nil { + return nil, err + } + result := make([]*Command, 0, len(raw)) + for _, line := range raw { + cmd := new(Command) + if _, err := cmd.FromLineBytes(line); err != nil { + slog.Warn("failed to parse post command line", slog.Any("err", err)) + continue + } + result = append(result, cmd) + } + return result, nil +} + +func (s *fileStore) GetLastCursor(ctx context.Context) (time.Time, bool, error) { + return GetLastCursor(ctx) +} + +func (s *fileStore) SetCursor(ctx context.Context, cursor time.Time) error { + if err := ensureStorageFolder(); err != nil { + return err + } + cursorFile, err := os.OpenFile(GetCursorFilePath(), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return err + } + defer cursorFile.Close() + _, err = cursorFile.WriteString(fmt.Sprintf("\n%d\n", cursor.UnixNano())) + return err +} + +// Prune compacts the txt files, dropping synced records and keeping unfinished +// pre commands. Mirrors the historical gc cleanCommandFiles behavior. +func (s *fileStore) Prune(ctx context.Context, cursor time.Time) error { + postCommands, err := s.GetPostCommands(ctx) + if err != nil { + return err + } + if len(postCommands) == 0 { + return nil + } + preCommands, err := s.GetPreCommands(ctx) + if err != nil { + return err + } + + newPost := make([]*Command, 0) + for _, cmd := range postCommands { + if cmd != nil && cmd.RecordingTime.After(cursor) { + newPost = append(newPost, cmd) + } + } + + newPre := make([]*Command, 0) + for _, row := range preCommands { + if row == nil { + continue + } + if row.RecordingTime.After(cursor) { + newPre = append(newPre, row) + continue + } + // keep pre commands that no synced post completes (unfinished) + if !preHasSyncedPost(row, postCommands, cursor) { + newPre = append(newPre, row) + } + } + + sort.Slice(newPre, func(i, j int) bool { return newPre[i].RecordingTime.Before(newPre[j].RecordingTime) }) + sort.Slice(newPost, func(i, j int) bool { return newPost[i].RecordingTime.Before(newPost[j].RecordingTime) }) + + preBuf := bytes.Buffer{} + for _, cmd := range newPre { + line, err := cmd.ToLine(cmd.RecordingTime) + if err != nil { + return err + } + preBuf.Write(line) + } + postBuf := bytes.Buffer{} + for _, cmd := range newPost { + line, err := cmd.ToLine(cmd.RecordingTime) + if err != nil { + return err + } + postBuf.Write(line) + } + + if err := os.WriteFile(GetPreCommandFilePath(), preBuf.Bytes(), 0644); err != nil { + return err + } + if err := os.WriteFile(GetPostCommandFilePath(), postBuf.Bytes(), 0644); err != nil { + return err + } + return os.WriteFile(GetCursorFilePath(), []byte(fmt.Sprintf("%d", cursor.UnixNano())), 0644) +} + +func (s *fileStore) Close() error { return nil } diff --git a/model/store_file_test.go b/model/store_file_test.go new file mode 100644 index 0000000..47ddffb --- /dev/null +++ b/model/store_file_test.go @@ -0,0 +1,59 @@ +package model + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestFileStoreRoundTrip(t *testing.T) { + t.Setenv("HOME", t.TempDir()) + InitFolder("") // reset globals to the default .shelltime under the temp HOME + + store := NewFileStore() + ctx := context.Background() + start := time.Now() + cmd := Command{Shell: "bash", SessionID: 7, Command: "go build", Username: "u", Hostname: "h", Time: start} + + require.NoError(t, store.SavePre(ctx, cmd, start)) + + _, noCursor, err := store.GetLastCursor(ctx) + require.NoError(t, err) + require.True(t, noCursor, "fresh store has no cursor") + + post := cmd + post.Time = start.Add(time.Second) + require.NoError(t, store.SavePost(ctx, post, 0, post.Time)) + + tree, err := store.GetPreTree(ctx) + require.NoError(t, err) + require.Len(t, tree[cmd.GetUniqueKey()], 1) + + posts, err := store.GetPostCommands(ctx) + require.NoError(t, err) + require.Len(t, posts, 1) + require.Equal(t, CommandPhasePost, int(posts[0].Phase)) + + pres, err := store.GetPreCommands(ctx) + require.NoError(t, err) + require.Len(t, pres, 1) + + require.NoError(t, store.SetCursor(ctx, post.Time)) + c, noCursor, err := store.GetLastCursor(ctx) + require.NoError(t, err) + require.False(t, noCursor) + require.Equal(t, post.Time.UnixNano(), c.UnixNano()) + + // finished + synced => pruned from both files + require.NoError(t, store.Prune(ctx, post.Time)) + pres, err = store.GetPreCommands(ctx) + require.NoError(t, err) + require.Empty(t, pres) + posts, err = store.GetPostCommands(ctx) + require.NoError(t, err) + require.Empty(t, posts) + + require.NoError(t, store.Close()) +} diff --git a/model/tracking_build.go b/model/tracking_build.go new file mode 100644 index 0000000..859be79 --- /dev/null +++ b/model/tracking_build.go @@ -0,0 +1,124 @@ +package model + +import ( + "context" + "time" +) + +// TrackingBuildResult is the outcome of assembling pending post commands into a +// server-ready tracking payload. +type TrackingBuildResult struct { + Data []TrackingData + Meta TrackingMetaData + LatestRecordingTime time.Time + Cursor time.Time + NoCursorExist bool +} + +// BuildTrackingData assembles the tracking payload for all post commands newer +// than the store's cursor, pairing each with its closest pre command. It is the +// shared assembly used by both the CLI fallback path and the daemon's bolt path +// (previously inlined in commands.trySyncLocalToServer). +// +// It performs no network IO and does not advance the cursor; callers decide +// whether to flush (FlushCount), send, then SetCursor + Prune. +func BuildTrackingData(ctx context.Context, store CommandStore, config ShellTimeConfig) (TrackingBuildResult, error) { + ctx, span := modelTracer.Start(ctx, "buildTrackingData") + defer span.End() + + var res TrackingBuildResult + + postCommands, err := store.GetPostCommands(ctx) + if err != nil { + return res, err + } + if len(postCommands) == 0 { + return res, nil + } + + cursor, noCursorExist, err := store.GetLastCursor(ctx) + if err != nil { + return res, err + } + res.Cursor = cursor + res.NoCursorExist = noCursorExist + + preTree, err := store.GetPreTree(ctx) + if err != nil { + return res, err + } + + sysInfo, err := GetOSAndVersion() + if err != nil { + sysInfo = &SysInfo{Os: "unknown", Version: "unknown"} + } + + meta := TrackingMetaData{ + OS: sysInfo.Os, + OSVersion: sysInfo.Version, + } + + trackingData := make([]TrackingData, 0) + latest := cursor + + for _, postCommand := range postCommands { + if postCommand == nil { + continue + } + + recordingTime := postCommand.RecordingTime + if recordingTime.Before(cursor) { + continue + } + if recordingTime.After(latest) { + latest = recordingTime + } + + key := postCommand.GetUniqueKey() + preCommands, ok := preTree[key] + if !ok { + continue + } + + if meta.Hostname == "" { + meta.Hostname = postCommand.Hostname + } + if meta.Shell == "" { + meta.Shell = postCommand.Shell + } + if meta.Username == "" { + meta.Username = postCommand.Username + } + + if ShouldExcludeCommand(postCommand.Command, config.Exclude) { + continue + } + + closestPreCommand := postCommand.FindClosestCommand(preCommands, false) + + td := TrackingData{ + SessionID: postCommand.SessionID, + Command: postCommand.Command, + EndTime: postCommand.Time.Unix(), + EndTimeNano: postCommand.Time.UnixNano(), + Result: postCommand.Result, + PPID: postCommand.PPID, + } + + if config.DataMasking != nil && *config.DataMasking { + td.Command = MaskSensitiveTokens(td.Command) + } + + if closestPreCommand != nil { + td.StartTime = closestPreCommand.Time.Unix() + td.StartTimeNano = closestPreCommand.Time.UnixNano() + } + + trackingData = append(trackingData, td) + } + + res.Data = trackingData + res.Meta = meta + res.LatestRecordingTime = latest + return res, nil +} diff --git a/model/tracking_build_test.go b/model/tracking_build_test.go new file mode 100644 index 0000000..3a04102 --- /dev/null +++ b/model/tracking_build_test.go @@ -0,0 +1,63 @@ +package model + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBuildTrackingData(t *testing.T) { + store, err := newBoltStore(filepath.Join(t.TempDir(), "commands.db")) + require.NoError(t, err) + defer store.Close() + + ctx := context.Background() + start := time.Now() + cmd := Command{Shell: "bash", SessionID: 1, Command: "deploy prod", Username: "u", Hostname: "h", Time: start, PPID: 42} + require.NoError(t, store.SavePre(ctx, cmd, start)) + post := cmd + post.Time = start.Add(2 * time.Second) + require.NoError(t, store.SavePost(ctx, post, 0, post.Time)) + + res, err := BuildTrackingData(ctx, store, ShellTimeConfig{}) + require.NoError(t, err) + require.True(t, res.NoCursorExist) + require.Len(t, res.Data, 1) + require.Equal(t, "deploy prod", res.Data[0].Command) + require.Equal(t, start.Unix(), res.Data[0].StartTime) + require.Equal(t, post.Time.Unix(), res.Data[0].EndTime) + require.Equal(t, 42, res.Data[0].PPID) + require.Equal(t, "h", res.Meta.Hostname) + require.Equal(t, "bash", res.Meta.Shell) +} + +func TestBuildTrackingDataExcludes(t *testing.T) { + store, err := newBoltStore(filepath.Join(t.TempDir(), "commands.db")) + require.NoError(t, err) + defer store.Close() + + ctx := context.Background() + now := time.Now() + cmd := Command{Shell: "bash", SessionID: 1, Command: "secret-token-cmd", Username: "u", Time: now} + require.NoError(t, store.SavePre(ctx, cmd, now)) + post := cmd + post.Time = now.Add(time.Second) + require.NoError(t, store.SavePost(ctx, post, 0, post.Time)) + + res, err := BuildTrackingData(ctx, store, ShellTimeConfig{Exclude: []string{"^secret"}}) + require.NoError(t, err) + require.Empty(t, res.Data, "excluded command must not be synced") +} + +func TestBuildTrackingDataEmpty(t *testing.T) { + store, err := newBoltStore(filepath.Join(t.TempDir(), "commands.db")) + require.NoError(t, err) + defer store.Close() + + res, err := BuildTrackingData(context.Background(), store, ShellTimeConfig{}) + require.NoError(t, err) + require.Empty(t, res.Data) +} diff --git a/model/types.go b/model/types.go index dc8e208..8d6ff66 100644 --- a/model/types.go +++ b/model/types.go @@ -97,11 +97,23 @@ type ShellTimeConfig struct { // LogCleanup configuration for automatic log file cleanup in daemon LogCleanup *LogCleanup `toml:"logCleanup" yaml:"logCleanup" json:"logCleanup"` + // Storage selects the local command buffering backend. When unset the + // always-available txt file store is used. + Storage *StorageConfig `toml:"storage" yaml:"storage,omitempty" json:"storage,omitempty"` + // SocketPath is the path to the Unix domain socket used for communication // between the CLI and the daemon. SocketPath string `toml:"socketPath" yaml:"socketPath" json:"socketPath"` } +// StorageConfig selects which CommandStore backend buffers tracked commands +// before they sync to the server. +type StorageConfig struct { + // Engine is "file" (default) or "bolt". The bolt engine is daemon-owned; + // see CommandStore for details. + Engine string `toml:"engine" yaml:"engine" json:"engine"` +} + var DefaultAIConfig = &AIConfig{ Agent: AIAgentConfig{ View: false,