diff --git a/Dockerfile.data-service b/Dockerfile.data-service new file mode 100644 index 0000000..0e9732c --- /dev/null +++ b/Dockerfile.data-service @@ -0,0 +1,51 @@ +# Stage 1: Build Go application +FROM golang:1.25.1-alpine AS go-builder + +# Install build dependencies +RUN apk add --no-cache git ca-certificates tzdata + +# Set working directory +WORKDIR /build + +# Copy the main module files first (needed as dependency) +COPY go.mod go.sum ./ + +# Download dependencies +RUN go mod download + +# Copy source code +COPY . . + +# Build the data-service binary +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-w -s" -o data-service ./data-service + +# Stage 2: Final runtime image +FROM alpine:latest + +# Install runtime dependencies +RUN apk add --no-cache ca-certificates tzdata + +ENV TZ=Europe/Riga + +# Create non-root user +RUN addgroup -g 1001 data-service && \ + adduser -D -u 1001 -G data-service data-service + +# Set working directory +WORKDIR /app + +# Copy binary from builder stage +COPY --from=go-builder /build/data-service/data-service . + +# Switch to non-root user +USER data-service + +# Expose application port +EXPOSE 8081 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD wget --no-verbose --tries=1 --spider http://localhost:8081/mpc/get || exit 1 + +# Default command +ENTRYPOINT ["./data-service"] diff --git a/Makefile b/Makefile index 82287f6..cd4f58e 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,20 @@ dev-watch: ## Run with file watching (requires entr: brew install entr) @which entr > /dev/null || (echo "entr not installed for file watching" && exit 1) find . -name '*.go' | entr -r go run . -price-limit=50.0 -network=192.168.1.0/24 +# Data-service Docker targets +DATA_SERVICE_IMAGE=data-service +DATA_SERVICE_DOCKERFILE=Dockerfile.data-service + +docker-data-service-multi: ## Build Docker image for data-service for multiple platforms + docker buildx build --platform $(PLATFORMS) -t $(DATA_SERVICE_IMAGE):$(DOCKER_TAG) -f $(DATA_SERVICE_DOCKERFILE) . + +docker-data-service: ## Build Docker image for data-service for ARM7 (Raspberry Pi) + rm -f ems-data-service-working.tar + docker buildx build --platform linux/arm/v7 --no-cache --output=type=docker -t $(DATA_SERVICE_IMAGE):latest-arm7 -f $(DATA_SERVICE_DOCKERFILE) . + docker save $(DATA_SERVICE_IMAGE):latest-arm7 > data-service.tar + skopeo copy docker-archive:data-service.tar docker-archive:ems-data-service-working.tar + cp ems-data-service-working.tar ~/Downloads/ems-data-service-working.tar + # Docker targets docker: ## Build Docker image for ARM7 (Raspberry Pi) rm -f ems-working.tar diff --git a/data-service/data-service b/data-service/data-service new file mode 100755 index 0000000..0b565cd Binary files /dev/null and b/data-service/data-service differ diff --git a/data-service/main.go b/data-service/main.go new file mode 100644 index 0000000..e35e84e --- /dev/null +++ b/data-service/main.go @@ -0,0 +1,101 @@ +// Package main implements the data-service HTTP server for storing and retrieving MPC control decisions. +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/devskill-org/ems/mpc" +) + +var ( + decisions []mpc.ControlDecision + decisionsMu sync.RWMutex +) + +func main() { + port := os.Getenv("PORT") + if port == "" { + port = "8081" + } + + mux := http.NewServeMux() + mux.HandleFunc("/mpc/save", handleMPCSave) + mux.HandleFunc("/mpc/get", handleMPCGet) + + server := &http.Server{ + Addr: ":" + port, + Handler: mux, + ReadHeaderTimeout: 1 * time.Second, + } + + go func() { + log.Printf("data-service HTTP server listening on :%s", port) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("HTTP server error: %v", err) + } + }() + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + + log.Println("Shutting down data-service...") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + log.Fatalf("Server forced to shutdown: %v", err) + } + log.Println("data-service stopped") +} + +func handleMPCSave(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + var saved []mpc.ControlDecision + if err := json.NewDecoder(r.Body).Decode(&saved); err != nil { + http.Error(w, fmt.Sprintf("invalid JSON: %v", err), http.StatusBadRequest) + return + } + + decisionsMu.Lock() + decisions = saved + decisionsMu.Unlock() + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(map[string]any{ + "status": "ok", + "decisions_saved": len(saved), + }); err != nil { + log.Printf("failed to encode response: %v", err) + } +} + +func handleMPCGet(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + decisionsMu.RLock() + snapshot := make([]mpc.ControlDecision, len(decisions)) + copy(snapshot, decisions) + decisionsMu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(snapshot); err != nil { + log.Printf("failed to encode response: %v", err) + } +} diff --git a/scheduler/config.go b/scheduler/config.go index 271a4a5..14c509e 100644 --- a/scheduler/config.go +++ b/scheduler/config.go @@ -60,6 +60,7 @@ type Config struct { PVPollInterval time.Duration `json:"pv_poll_interval"` // Poll interval for PV power (duration) PVIntegrationPeriod time.Duration `json:"pv_integration_period"` // Integration period for PV power (duration) PostgresConnString string `json:"postgres_conn_string"` // PostgreSQL connection string + DataServiceURL string `json:"data_service_url"` // URL of the data-service HTTP server for MPC decisions // Weather API settings WeatherUpdateInterval time.Duration `json:"weather_update_interval"` // How often to update weather diff --git a/scheduler/mpc_persistence.go b/scheduler/mpc_persistence.go index b96c040..2f121d6 100644 --- a/scheduler/mpc_persistence.go +++ b/scheduler/mpc_persistence.go @@ -1,250 +1,122 @@ package scheduler import ( + "bytes" "context" - "database/sql" + "encoding/json" "fmt" + "io" + "net/http" "time" "github.com/devskill-org/ems/mpc" ) -// saveMPCDecisions persists MPC decisions to the database -func (s *MinerScheduler) saveMPCDecisions(ctx context.Context, decisions []mpc.ControlDecision) error { - if s.db == nil { - return fmt.Errorf("database connection not available") +// dataServiceClient handles communication with the data-service HTTP server +// for storing and retrieving MPC decisions. +type dataServiceClient struct { + baseURL string + client *http.Client +} + +// newDataServiceClient creates a new data-service client. +// Returns nil if baseURL is empty. +func newDataServiceClient(config *Config) *dataServiceClient { + if config.DataServiceURL == "" { + return nil + } + return &dataServiceClient{ + baseURL: config.DataServiceURL, + client: &http.Client{ + Timeout: 2 * time.Second, + }, } +} +// saveMPCDecisions sends decisions to the data-service via its POST /mpc/save endpoint. +// The data-service replaces the full in-memory list with the provided decisions. +func (s *MinerScheduler) saveMPCDecisions(ctx context.Context, decisions []mpc.ControlDecision) error { + if s.dataServiceClient == nil { + return nil // data-service not configured, skip storage + } if len(decisions) == 0 { return nil } - // Use first decision timestamp as minimum - // Decisions are ordered by timestamp because: - // 1. MPC forecast is built from a map and explicitly sorted by hour - // 2. MPC controller reconstructs path in the same order as forecast - // 3. Timestamp increases monotonically with hour - minTimestamp := decisions[0].Timestamp - - // Begin transaction - tx, err := s.db.BeginTx(ctx, nil) + body, err := json.Marshal(decisions) if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) + return fmt.Errorf("failed to marshal decisions: %w", err) } - defer tx.Rollback() - // Delete existing decisions with timestamp >= minTimestamp - _, err = tx.ExecContext(ctx, `DELETE FROM mpc_decisions WHERE timestamp >= $1`, minTimestamp) + url := s.dataServiceClient.baseURL + "/mpc/save" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") if err != nil { - return fmt.Errorf("failed to delete existing decisions: %w", err) + return fmt.Errorf("failed to create request: %w", err) } - // Prepare upsert statement - stmt, err := tx.PrepareContext(ctx, ` - INSERT INTO mpc_decisions ( - timestamp, - hour, - battery_charge, - battery_charge_from_pv, - battery_charge_from_grid, - battery_discharge, - grid_import, - grid_export, - battery_soc, - profit, - import_price, - export_price, - solar_forecast, - load_forecast, - cloud_coverage, - weather_symbol, - battery_avg_cell_temp, - air_temperature, - battery_preheat_active - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) - ON CONFLICT (timestamp) DO UPDATE SET - hour = EXCLUDED.hour, - battery_charge = EXCLUDED.battery_charge, - battery_charge_from_pv = EXCLUDED.battery_charge_from_pv, - battery_charge_from_grid = EXCLUDED.battery_charge_from_grid, - battery_discharge = EXCLUDED.battery_discharge, - grid_import = EXCLUDED.grid_import, - grid_export = EXCLUDED.grid_export, - battery_soc = EXCLUDED.battery_soc, - profit = EXCLUDED.profit, - import_price = EXCLUDED.import_price, - export_price = EXCLUDED.export_price, - solar_forecast = EXCLUDED.solar_forecast, - load_forecast = EXCLUDED.load_forecast, - cloud_coverage = EXCLUDED.cloud_coverage, - weather_symbol = EXCLUDED.weather_symbol, - battery_avg_cell_temp = EXCLUDED.battery_avg_cell_temp, - air_temperature = EXCLUDED.air_temperature, - battery_preheat_active = EXCLUDED.battery_preheat_active - `) + resp, err := s.dataServiceClient.client.Do(req) if err != nil { - return fmt.Errorf("failed to prepare statement: %w", err) - } - defer stmt.Close() - - // Insert all decisions - for _, decision := range decisions { - _, err := stmt.ExecContext(ctx, - decision.Timestamp, - decision.Hour, - decision.BatteryCharge, - decision.BatteryChargeFromPV, - decision.BatteryChargeFromGrid, - decision.BatteryDischarge, - decision.GridImport, - decision.GridExport, - decision.BatterySOC, - decision.Profit, - decision.ImportPrice, - decision.ExportPrice, - decision.SolarForecast, - decision.LoadForecast, - decision.CloudCoverage, - decision.WeatherSymbol, - decision.BatteryAvgCellTemp, - decision.AirTemperature, - decision.BatteryPreHeatActive, - ) - if err != nil { - return fmt.Errorf("failed to insert decision for hour %d: %w", decision.Hour, err) - } + return fmt.Errorf("failed to POST to data-service: %w", err) } + defer resp.Body.Close() - // Commit transaction - if err := tx.Commit(); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("data-service POST /mpc/save returned %d: %s", resp.StatusCode, string(respBody)) } - s.logger.Printf("Saved %d MPC decisions to database", len(decisions)) + _, _ = io.Copy(io.Discard, resp.Body) // drain body to reuse connection + s.logger.Printf("Saved %d MPC decisions to data-service", len(decisions)) return nil } -// loadLatestMPCDecisions loads MPC decisions from the database with timestamp >= now +// loadLatestMPCDecisions fetches the current list of MPC decisions from the +// data-service via its GET /mpc/get endpoint. Unlike the previous DB variant +// this does NOT filter by timestamp — the data-service holds decisions for all +// hours; the scheduler can filter client-side if needed. func (s *MinerScheduler) loadLatestMPCDecisions(ctx context.Context) ([]mpc.ControlDecision, error) { - if s.db == nil { - return nil, fmt.Errorf("database connection not available") + if s.dataServiceClient == nil { + return nil, nil // data-service not configured } - - config := s.GetConfig() - - // Get current Unix timestamp - now := ctx.Value("now") - var nowTimestamp int64 - if now != nil { - if t, ok := now.(int64); ok { - nowTimestamp = t - } - } - if nowTimestamp == 0 { - nowTimestamp = s.getCurrentTimestamp() + url := s.dataServiceClient.baseURL + "/mpc/get" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) } - ts := nowTimestamp - int64(config.CheckPriceInterval.Seconds()) - - // Load decisions with timestamp >= now, ordered by timestamp - rows, err := s.db.QueryContext(ctx, ` - SELECT - timestamp, - hour, - battery_charge, - battery_charge_from_pv, - battery_charge_from_grid, - battery_discharge, - grid_import, - grid_export, - battery_soc, - profit, - import_price, - export_price, - solar_forecast, - load_forecast, - cloud_coverage, - weather_symbol, - battery_avg_cell_temp, - air_temperature, - battery_preheat_active - FROM mpc_decisions - WHERE timestamp >= $1 - ORDER BY timestamp ASC - `, ts) + resp, err := s.dataServiceClient.client.Do(req) if err != nil { - return nil, fmt.Errorf("failed to query decisions: %w", err) + return nil, fmt.Errorf("failed to GET from data-service: %w", err) } - defer rows.Close() + defer resp.Body.Close() - var decisions []mpc.ControlDecision - for rows.Next() { - var decision mpc.ControlDecision - var cloudCoverage sql.NullFloat64 - var weatherSymbol sql.NullString - var batteryAvgCellTemp sql.NullFloat64 - var airTemperature sql.NullFloat64 - var batteryPreHeatActive sql.NullBool - - err := rows.Scan( - &decision.Timestamp, - &decision.Hour, - &decision.BatteryCharge, - &decision.BatteryChargeFromPV, - &decision.BatteryChargeFromGrid, - &decision.BatteryDischarge, - &decision.GridImport, - &decision.GridExport, - &decision.BatterySOC, - &decision.Profit, - &decision.ImportPrice, - &decision.ExportPrice, - &decision.SolarForecast, - &decision.LoadForecast, - &cloudCoverage, - &weatherSymbol, - &batteryAvgCellTemp, - &airTemperature, - &batteryPreHeatActive, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan decision: %w", err) - } - - if cloudCoverage.Valid { - decision.CloudCoverage = cloudCoverage.Float64 - } - if weatherSymbol.Valid { - decision.WeatherSymbol = weatherSymbol.String - } - if batteryAvgCellTemp.Valid { - decision.BatteryAvgCellTemp = batteryAvgCellTemp.Float64 - } - if airTemperature.Valid { - decision.AirTemperature = airTemperature.Float64 - } - if batteryPreHeatActive.Valid { - decision.BatteryPreHeatActive = batteryPreHeatActive.Bool - } - - decisions = append(decisions, decision) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("data-service GET /mpc/get returned %d: %s", resp.StatusCode, string(respBody)) } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("error iterating decisions: %w", err) + // Load decisions + decisions, err := loadDecisionsFromReader(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to decode MPC decisions: %w", err) } if len(decisions) == 0 { - s.logger.Printf("No future MPC decisions found in database") + s.logger.Printf("No MPC decisions found in data-service") return nil, nil } - s.logger.Printf("Loaded %d MPC decisions from database (starting from timestamp %d)", len(decisions), ts) - + s.logger.Printf("Loaded %d MPC decisions from data-service", len(decisions)) return decisions, nil } -// getCurrentTimestamp returns the current Unix timestamp -func (s *MinerScheduler) getCurrentTimestamp() int64 { - return time.Now().Unix() +// loadDecisionsFromReader decodes MPC decisions from an io.Reader. +func loadDecisionsFromReader(r io.Reader) ([]mpc.ControlDecision, error) { + var decisions []mpc.ControlDecision + err := json.NewDecoder(r).Decode(&decisions) + if err != nil { + return nil, err + } + return decisions, nil } diff --git a/scheduler/mpc_persistence_test.go b/scheduler/mpc_persistence_test.go index 1e98aab..d1e057e 100644 --- a/scheduler/mpc_persistence_test.go +++ b/scheduler/mpc_persistence_test.go @@ -2,45 +2,73 @@ package scheduler import ( "context" - "database/sql" + "encoding/json" "log" + "net/http" + "net/http/httptest" "os" "testing" "time" "github.com/devskill-org/ems/mpc" - _ "github.com/lib/pq" ) -// TestMPCPersistence_SaveAndLoad tests the save and load cycle -func TestMPCPersistence_SaveAndLoad(t *testing.T) { - // Skip if no database connection available - connString := os.Getenv("TEST_POSTGRES_CONN") - if connString == "" { - t.Skip("Skipping test: TEST_POSTGRES_CONN not set") - } +func makeTestDataService(t *testing.T) (string, *httptest.Server) { + t.Helper() + + var decisions []mpc.ControlDecision + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/mpc/save": + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + var saved []mpc.ControlDecision + if err := json.NewDecoder(r.Body).Decode(&saved); err != nil { + http.Error(w, "invalid JSON", http.StatusBadRequest) + return + } + decisions = saved + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "status": "ok", + "decisions_saved": len(saved), + }) + case "/mpc/get": + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + if len(decisions) == 0 { + json.NewEncoder(w).Encode([]mpc.ControlDecision{}) + return + } + json.NewEncoder(w).Encode(decisions) + default: + http.NotFound(w, r) + } + })) - db, err := sql.Open("postgres", connString) - if err != nil { - t.Fatalf("Failed to connect to database: %v", err) - } - defer db.Close() + return server.URL, server +} - // Clean up table before test - _, err = db.Exec("DELETE FROM mpc_decisions") - if err != nil { - t.Fatalf("Failed to clean up table: %v", err) - } +func TestMPCDataSaveAndLoad(t *testing.T) { + url, server := makeTestDataService(t) + defer server.Close() - // Create scheduler with database - config := &Config{} + config := &Config{ + DataServiceURL: url, + } scheduler := &MinerScheduler{ - config: config, - db: db, - logger: log.New(os.Stdout, "TEST: ", log.LstdFlags), + config: config, + dataServiceClient: newDataServiceClient(config), + logger: log.New(os.Stdout, "TEST: ", log.LstdFlags), } - // Create test decisions with timestamps in the future now := time.Now().Unix() decisions := []mpc.ControlDecision{ { @@ -48,7 +76,7 @@ func TestMPCPersistence_SaveAndLoad(t *testing.T) { Timestamp: now + 3600, BatteryCharge: 10.5, BatteryChargeFromPV: 10.5, - BatteryChargeFromGrid: 5.0, + BatteryChargeFromGrid: 5.0, BatteryDischarge: 0, GridImport: 5.0, GridExport: 0, @@ -69,7 +97,7 @@ func TestMPCPersistence_SaveAndLoad(t *testing.T) { Timestamp: now + 7200, BatteryCharge: 0, BatteryChargeFromPV: 0, - BatteryChargeFromGrid: 0, + BatteryChargeFromGrid: 0, BatteryDischarge: 8.0, GridImport: 0, GridExport: 3.0, @@ -90,7 +118,7 @@ func TestMPCPersistence_SaveAndLoad(t *testing.T) { ctx := context.Background() // Save decisions - err = scheduler.saveMPCDecisions(ctx, decisions) + err := scheduler.saveMPCDecisions(ctx, decisions) if err != nil { t.Fatalf("Failed to save decisions: %v", err) } @@ -134,235 +162,124 @@ func TestMPCPersistence_SaveAndLoad(t *testing.T) { } } -// TestMPCPersistence_DeleteOldDecisions tests that old decisions are replaced -func TestMPCPersistence_DeleteOldDecisions(t *testing.T) { - // Skip if no database connection available - connString := os.Getenv("TEST_POSTGRES_CONN") - if connString == "" { - t.Skip("Skipping test: TEST_POSTGRES_CONN not set") - } +func TestMPCDataReplaceDecisions(t *testing.T) { + url, server := makeTestDataService(t) + defer server.Close() - db, err := sql.Open("postgres", connString) - if err != nil { - t.Fatalf("Failed to connect to database: %v", err) - } - defer db.Close() - - // Clean up table before test - _, err = db.Exec("DELETE FROM mpc_decisions") - if err != nil { - t.Fatalf("Failed to clean up table: %v", err) + config := &Config{ + DataServiceURL: url, } - - // Create scheduler with database - config := &Config{} scheduler := &MinerScheduler{ - config: config, - db: db, - logger: log.New(os.Stdout, "TEST: ", log.LstdFlags), + config: config, + dataServiceClient: newDataServiceClient(config), + logger: log.New(os.Stdout, "TEST: ", log.LstdFlags), } now := time.Now().Unix() ctx := context.Background() - // First, save decisions for hours 0-2 + // First save: hours 0-2 firstDecisions := []mpc.ControlDecision{ {Hour: 0, Timestamp: now + 3600, Profit: 1.0}, {Hour: 1, Timestamp: now + 7200, Profit: 2.0}, {Hour: 2, Timestamp: now + 10800, Profit: 3.0}, } - err = scheduler.saveMPCDecisions(ctx, firstDecisions) + err := scheduler.saveMPCDecisions(ctx, firstDecisions) if err != nil { t.Fatalf("Failed to save first decisions: %v", err) } - // Then, save new decisions starting from hour 1 (should replace hours 1-2) + // Verify + loaded, err := scheduler.loadLatestMPCDecisions(ctx) + if err != nil { + t.Fatalf("Failed to load decisions: %v", err) + } + if len(loaded) != 3 { + t.Fatalf("Expected 3 decisions after first save, got %d", len(loaded)) + } + // Data-service replaces the entire list, so second save replaces all. + // Second save: hours 3-5 (replaces first 3) secondDecisions := []mpc.ControlDecision{ - {Hour: 1, Timestamp: now + 7200, Profit: 20.0}, // Updated - {Hour: 2, Timestamp: now + 10800, Profit: 30.0}, // Updated - {Hour: 3, Timestamp: now + 14400, Profit: 40.0}, // New + {Hour: 3, Timestamp: now + 14400, Profit: 40.0}, + {Hour: 4, Timestamp: now + 18000, Profit: 50.0}, + {Hour: 5, Timestamp: now + 21600, Profit: 60.0}, } err = scheduler.saveMPCDecisions(ctx, secondDecisions) if err != nil { t.Fatalf("Failed to save second decisions: %v", err) } - // Load all decisions (including past) - var allDecisions []mpc.ControlDecision - rows, err := db.Query("SELECT timestamp, profit FROM mpc_decisions ORDER BY timestamp") + // Verify: only the new 3 decisions should remain + loaded, err = scheduler.loadLatestMPCDecisions(ctx) if err != nil { - t.Fatalf("Failed to query decisions: %v", err) - } - defer rows.Close() - - for rows.Next() { - var d mpc.ControlDecision - err := rows.Scan(&d.Timestamp, &d.Profit) - if err != nil { - t.Fatalf("Failed to scan decision: %v", err) - } - allDecisions = append(allDecisions, d) + t.Fatalf("Failed to load decisions: %v", err) } - - // Should have 4 decisions: hour 0 (unchanged), hours 1-3 (new/updated) - if len(allDecisions) != 4 { - t.Errorf("Expected 4 decisions, got %d", len(allDecisions)) + if len(loaded) != 3 { + t.Fatalf("Expected 3 decisions after second save (replace), got %d", len(loaded)) } - - // Verify hour 0 is unchanged - if allDecisions[0].Timestamp == now+3600 && allDecisions[0].Profit != 1.0 { - t.Errorf("Hour 0 should be unchanged with profit 1.0, got %.2f", allDecisions[0].Profit) + if loaded[0].Hour != 3 || loaded[0].Profit != 40.0 { + t.Errorf("Expected first decision to be hour 3 profit 40.0, got hour %d profit %.2f", loaded[0].Hour, loaded[0].Profit) } - - // Verify hour 1 is updated - if allDecisions[1].Timestamp == now+7200 && allDecisions[1].Profit != 20.0 { - t.Errorf("Hour 1 should be updated with profit 20.0, got %.2f", allDecisions[1].Profit) + if loaded[2].Hour != 5 || loaded[2].Profit != 60.0 { + t.Errorf("Expected last decision to be hour 5 profit 60.0, got hour %d profit %.2f", loaded[2].Hour, loaded[2].Profit) } } -// TestMPCPersistence_LoadOnlyFutureDecisions tests that only future decisions are loaded -func TestMPCPersistence_LoadOnlyFutureDecisions(t *testing.T) { - // Skip if no database connection available - connString := os.Getenv("TEST_POSTGRES_CONN") - if connString == "" { - t.Skip("Skipping test: TEST_POSTGRES_CONN not set") - } - - db, err := sql.Open("postgres", connString) - if err != nil { - t.Fatalf("Failed to connect to database: %v", err) - } - defer db.Close() - - // Clean up table before test - _, err = db.Exec("DELETE FROM mpc_decisions") - if err != nil { - t.Fatalf("Failed to clean up table: %v", err) +func TestMPCDataNoDataService(t *testing.T) { + config := &Config{ + DataServiceURL: "", } - - // Create scheduler with database - config := &Config{} scheduler := &MinerScheduler{ - config: config, - db: db, - logger: log.New(os.Stdout, "TEST: ", log.LstdFlags), + config: config, + dataServiceClient: newDataServiceClient(config), + logger: log.New(os.Stdout, "TEST: ", log.LstdFlags), } - now := time.Now().Unix() ctx := context.Background() - // Save decisions: some in the past, some in the future - decisions := []mpc.ControlDecision{ - {Hour: 0, Timestamp: now - 3600, Profit: 1.0}, // Past - {Hour: 1, Timestamp: now - 1800, Profit: 2.0}, // Past - {Hour: 2, Timestamp: now + 1800, Profit: 3.0}, // Future - {Hour: 3, Timestamp: now + 3600, Profit: 4.0}, // Future - {Hour: 4, Timestamp: now + 7200, Profit: 5.0}, // Future - } - - // Insert directly to test load filtering - for _, d := range decisions { - _, err := db.Exec(` - INSERT INTO mpc_decisions (timestamp, hour, battery_charge, battery_charge_from_pv, - battery_charge_from_grid, battery_discharge, grid_import, grid_export, battery_soc, - profit, import_price, export_price, solar_forecast, load_forecast) - VALUES ($1, $2, 0, 0, 0, 0, 0, 0, 0.5, $3, 0.1, 0.05, 10, 5) - `, d.Timestamp, d.Hour, d.Profit) - if err != nil { - t.Fatalf("Failed to insert decision: %v", err) - } + // Saving with no data-service should be a no-op (nil error) + err := scheduler.saveMPCDecisions(ctx, nil) + if err != nil { + t.Fatalf("save with no service should not error: %v", err) } - // Load decisions (should only get future ones) + // Loading with no data-service should return empty (nil error) loaded, err := scheduler.loadLatestMPCDecisions(ctx) if err != nil { - t.Fatalf("Failed to load decisions: %v", err) + t.Fatalf("load with no service should not error: %v", err) } - - // Should only load 3 future decisions - if len(loaded) != 3 { - t.Errorf("Expected 3 future decisions, got %d", len(loaded)) - } - - // Verify all loaded decisions are in the future - for i, decision := range loaded { - if decision.Timestamp < now { - t.Errorf("Decision %d has past timestamp %d (now: %d)", i, decision.Timestamp, now) - } - } - - // Verify they are ordered by timestamp - for i := 1; i < len(loaded); i++ { - if loaded[i].Timestamp <= loaded[i-1].Timestamp { - t.Errorf("Decisions not properly ordered by timestamp") - } + if len(loaded) != 0 { + t.Errorf("Expected empty list, got %d decisions", len(loaded)) } } -// TestMPCPersistence_UniqueTimestamp tests that timestamp PRIMARY KEY prevents duplicates -func TestMPCPersistence_UniqueTimestamp(t *testing.T) { - // Skip if no database connection available - connString := os.Getenv("TEST_POSTGRES_CONN") - if connString == "" { - t.Skip("Skipping test: TEST_POSTGRES_CONN not set") - } +func TestMPCDataEmptyList(t *testing.T) { + url, server := makeTestDataService(t) + defer server.Close() - db, err := sql.Open("postgres", connString) - if err != nil { - t.Fatalf("Failed to connect to database: %v", err) + config := &Config{ + DataServiceURL: url, } - defer db.Close() - - // Clean up table before test - _, err = db.Exec("DELETE FROM mpc_decisions") - if err != nil { - t.Fatalf("Failed to clean up table: %v", err) + scheduler := &MinerScheduler{ + config: config, + dataServiceClient: newDataServiceClient(config), + logger: log.New(os.Stdout, "TEST: ", log.LstdFlags), } - now := time.Now().Unix() - timestamp := now + 3600 - - // Insert first decision - _, err = db.Exec(` - INSERT INTO mpc_decisions (timestamp, hour, battery_charge, battery_charge_from_pv, - battery_charge_from_grid, battery_discharge, grid_import, grid_export, battery_soc, - profit, import_price, export_price, solar_forecast, load_forecast) - VALUES ($1, 0, 10, 10, 0, 0, 5, 0, 0.6, 2.5, 0.1, 0.05, 15, 10) - `, timestamp) - if err != nil { - t.Fatalf("Failed to insert first decision: %v", err) - } + ctx := context.Background() - // Try to insert duplicate timestamp (should be handled by UPSERT in saveMPCDecisions) - _, err = db.Exec(` - INSERT INTO mpc_decisions (timestamp, hour, battery_charge, battery_charge_from_pv, - battery_charge_from_grid, battery_discharge, grid_import, grid_export, battery_soc, - profit, import_price, export_price, solar_forecast, load_forecast) - VALUES ($1, 1, 20, 20, 0, 0, 10, 0, 0.7, 5.0, 0.12, 0.06, 20, 12) - ON CONFLICT (timestamp) DO UPDATE SET - hour = EXCLUDED.hour, - profit = EXCLUDED.profit - `, timestamp) + // Save empty list + err := scheduler.saveMPCDecisions(ctx, []mpc.ControlDecision{}) if err != nil { - t.Fatalf("UPSERT failed: %v", err) + t.Fatalf("Failed to save empty decisions: %v", err) } - // Verify only one row exists and it's updated - var count int - var profit float64 - var hour int - err = db.QueryRow("SELECT COUNT(*), MAX(hour), MAX(profit) FROM mpc_decisions WHERE timestamp = $1", timestamp).Scan(&count, &hour, &profit) + // Loading should return empty + loaded, err := scheduler.loadLatestMPCDecisions(ctx) if err != nil { - t.Fatalf("Failed to query: %v", err) - } - - if count != 1 { - t.Errorf("Expected 1 row, got %d", count) - } - if hour != 1 { - t.Errorf("Expected hour to be updated to 1, got %d", hour) + t.Fatalf("Failed to load decisions: %v", err) } - if profit != 5.0 { - t.Errorf("Expected profit to be updated to 5.0, got %.2f", profit) + if len(loaded) != 0 { + t.Errorf("Expected empty list, got %d decisions", len(loaded)) } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index da53684..af6dd01 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -131,9 +131,12 @@ type MinerScheduler struct { // Web server webServer *WebServer - // Database connection + // Database connection (kept for metrics, not used for MPC decisions) db *sql.DB + // Data-service HTTP client for MPC decisions + dataServiceClient *dataServiceClient + // Logging logger *log.Logger @@ -148,10 +151,11 @@ func NewMinerScheduler(config *Config, logger *log.Logger) *MinerScheduler { } scheduler := &MinerScheduler{ - config: config, - stopChan: make(chan struct{}), - logger: logger, - xmlCache: entsoe.NewXMLDocumentCache(), + config: config, + stopChan: make(chan struct{}), + logger: logger, + xmlCache: entsoe.NewXMLDocumentCache(), + dataServiceClient: newDataServiceClient(config), weatherCache: WeatherForecastCache{ cacheDuration: 2 * time.Hour, }, @@ -247,19 +251,19 @@ func (s *MinerScheduler) Start(ctx context.Context, serverOnly bool) error { dataDB = nil } else { s.db = dataDB - - // Load latest MPC decisions from database - if decisions, err := s.loadLatestMPCDecisions(ctx); err != nil { - s.logger.Printf("Warning: Failed to load MPC decisions from database: %v", err) - } else if len(decisions) > 0 { - s.mu.Lock() - s.mpcDecisions = decisions - s.mu.Unlock() - s.logger.Printf("Loaded %d MPC decisions from database on startup", len(decisions)) - } } } + // Load latest MPC decisions from data-service on startup + if decisions, err := s.loadLatestMPCDecisions(ctx); err != nil { + s.logger.Printf("Warning: Failed to load MPC decisions from data-service: %v", err) + } else if len(decisions) > 0 { + s.mu.Lock() + s.mpcDecisions = decisions + s.mu.Unlock() + s.logger.Printf("Loaded %d MPC decisions from data-service on startup", len(decisions)) + } + // Calculate initial delays now := time.Now() minersControlInitialDelay := s.getInitialDelay(now, config.CheckPriceInterval) + time.Second