diff --git a/README.md b/README.md index 9200b97..b7db400 100644 --- a/README.md +++ b/README.md @@ -216,8 +216,8 @@ go run ./cmd/sniffer.go -i can0 | jq . - Cross-field validation is not yet implemented (stubs exist for future work). - One physical bus per client. -- Real bus (CAN/USB) write support is in progress; use `Replay` for testing writes. -- Transport Protocol (BAM and RTS/CTS) writes work in replay mode; real bus transmit is pending. +- Address claiming uses a 1500ms default timeout; on heavily contested buses, increase via `WithClaimTimeout`. +- Transport Protocol receive through the `Client` read API delivers only the first 8 bytes of reassembled payloads (TP send works fully). ## License diff --git a/client.go b/client.go index 2b89488..a8c8854 100644 --- a/client.go +++ b/client.go @@ -2,6 +2,7 @@ package n2k import ( "context" + "encoding/binary" "errors" "fmt" "iter" @@ -9,13 +10,22 @@ import ( "reflect" "sync" "sync/atomic" + "time" "github.com/brutella/can" + "github.com/open-ships/n2k/internal/adapter" + "github.com/open-ships/n2k/internal/canbus" + "github.com/open-ships/n2k/internal/claiming" + "github.com/open-ships/n2k/internal/decoder" "github.com/open-ships/n2k/internal/framer" "github.com/open-ships/n2k/internal/transport" "github.com/open-ships/n2k/pgn" ) +// defaultClaimTimeout is the maximum time NewClient blocks waiting for address +// claiming to complete. +const defaultClaimTimeout = 1500 * time.Millisecond + // Client is the central integration point for NMEA 2000 communication. It // composes address claiming, transport protocol, encoding, and framing into a // single type that can both read and write PGN messages. @@ -32,7 +42,7 @@ type Client struct { deviceName uint64 // writeFrame sends a single CAN frame. For replay sources this records the - // frame; for real buses it writes to the hardware. + // frame; for bus clients it writes to the hardware. writeFrame func(can.Frame) error // tp is the transport protocol manager used for multi-frame messages that @@ -51,10 +61,29 @@ type Client struct { // reconstruct the same configuration. opts []Option - // mu guards writtenFrames and closed state. + // mu guards writtenFrames, closed state, and sourceAddr. mu sync.Mutex writtenFrames []can.Frame // captured frames (replay/testing) closed bool + + bus canbus.Interface + claimer *claiming.Claimer + addrErr error // set by OnFatalError during address claiming + + // addrReady is closed once address claiming completes (or immediately for replay). + addrReady chan struct{} + + // msgCh delivers decoded messages from the internal read loop to the read API. + // nil for replay clients. + msgCh chan pgn.Message + + // readAdapter and readDecoder are the persistent decode pipeline for the + // internal read loop. Only used for bus clients. + readAdapter *adapter.CANAdapter + readDecoder *decoder.Decoder + + // readFilter holds the compiled CEL filter for the read API (nil if no filter). + readFilter *filter } type writeJob struct { @@ -63,7 +92,7 @@ type writeJob struct { } // NewClient creates a Client that can read and write NMEA 2000 messages. -// At least one source option (CAN, USB, or Replay) must be provided. +// At least one source option (CAN, USB, Replay, or Bus) must be provided. func NewClient(ctx context.Context, opts ...Option) (*Client, error) { cfg := config{} for _, o := range opts { @@ -86,21 +115,37 @@ func NewClient(ctx context.Context, opts ...Option) (*Client, error) { deviceName = DefaultDeviceName().Pack(true) } - // Determine source address. - var sourceAddr uint8 - if cfg.sourceAddress != nil { - sourceAddr = *cfg.sourceAddress - } else { - // Default to 0 for replay, 253 for real buses. - sourceAddr = 0 + // Determine if we have a hardware bus or replay-only. + hasBus := cfg.bus != nil + if !hasBus { for _, src := range cfg.sources { if _, ok := src.(*replaySource); !ok { - sourceAddr = 253 + hasBus = true break } } } + // Determine source address. + var sourceAddr uint8 + if cfg.sourceAddress != nil { + sourceAddr = *cfg.sourceAddress + } else if hasBus { + sourceAddr = 253 + } + // else: replay default is 0 + + // Compile CEL filter early if configured. + var readFilter *filter + if cfg.filterExpr != "" { + var err error + readFilter, err = compileFilter(cfg.filterExpr) + if err != nil { + cancel() + return nil, fmt.Errorf("n2k: compiling filter: %w", err) + } + } + c := &Client{ cfg: cfg, ctx: ctx, @@ -109,48 +154,260 @@ func NewClient(ctx context.Context, opts ...Option) (*Client, error) { sourceAddr: sourceAddr, deviceName: deviceName, opts: opts, + addrReady: make(chan struct{}), + readFilter: readFilter, } - // Determine if we have a real bus or replay-only. - hasRealBus := false - for _, src := range cfg.sources { - if _, ok := src.(*replaySource); !ok { - hasRealBus = true - break + if hasBus { + if err := c.initBus(cfg); err != nil { + cancel() + return nil, err + } + } else { + // Replay path: capture frames in memory. + close(c.addrReady) + + c.writeFrame = func(f can.Frame) error { + c.mu.Lock() + defer c.mu.Unlock() + if c.closed { + return errors.New("n2k: client closed") + } + c.writtenFrames = append(c.writtenFrames, f) + return nil } + + // Create transport manager for BAM/RTS-CTS sends. + c.tp = transport.NewManager(transport.ManagerConfig{ + WriteFrame: c.writeFrame, + Logger: c.log, + }) } - if hasRealBus { - // TODO: Real bus integration — create canbus.Interface, run it, set - // up address claiming. For now, return an error indicating this path - // is not yet implemented. - cancel() - return nil, errors.New("n2k: real bus (CAN/USB) client not yet implemented; use Replay for testing") + // Start the single writer goroutine for FIFO ordering. + c.writeCh = make(chan writeJob, 64) + c.writeWg.Add(1) + go c.writeLoop() + + return c, nil +} + +// initBus sets up the CAN bus integration: bus interface, address claiming, +// transport protocol, and the internal read/decode pipeline. +func (c *Client) initBus(cfg config) error { + // Get or construct the bus interface. + if cfg.bus != nil { + c.bus = cfg.bus + } else { + // Construct from first non-replay source. + for _, src := range cfg.sources { + switch s := src.(type) { + case *socketCANSource: + c.bus = canbus.NewSocketCAN(c.log, s.iface, c.handleBusFrame) + case *usbCANSource: + c.bus = canbus.NewUSBCAN(c.log, s.port, c.handleBusFrame) + } + if c.bus != nil { + break + } + } + } + if c.bus == nil { + return errors.New("n2k: could not construct bus from sources") + } + + // If the bus supports setting the handler after construction, set it now. + if hs, ok := c.bus.(canbus.HandlerSettable); ok { + hs.SetHandler(c.handleBusFrame) } - // Replay path: no real bus, capture frames in memory. + // Set writeFrame to delegate to the bus. c.writeFrame = func(f can.Frame) error { c.mu.Lock() - defer c.mu.Unlock() - if c.closed { + closed := c.closed + c.mu.Unlock() + if closed { return errors.New("n2k: client closed") } - c.writtenFrames = append(c.writtenFrames, f) - return nil + return c.bus.WriteFrame(f) } - // Create transport manager for BAM/RTS-CTS sends. + // Create transport manager with OnComplete that feeds decoded TP messages + // through the read pipeline. c.tp = transport.NewManager(transport.ManagerConfig{ WriteFrame: c.writeFrame, - Logger: c.log, + OnComplete: func(tpPGN uint32, source uint8, destination uint8, data []byte) { + // Build a synthetic CAN frame for the reassembled TP message + // and feed it through the decode pipeline. + canID := framer.BuildCANID(tpPGN, 6, source, destination) + f := can.Frame{ + ID: canID, + Length: uint8(min(len(data), 8)), + } + copy(f.Data[:], data) + // The adapter + decoder pipeline handles the rest. + c.readAdapter.HandleMessage(&f) + }, + Logger: c.log, + }) + + // Set up the persistent decode pipeline. + c.readAdapter = adapter.NewCANAdapter() + c.readDecoder = decoder.New() + c.readDecoder.SetOutput(&clientDecoderHandler{client: c}) + c.readAdapter.SetOutput(c.readDecoder) + + // Create the message channel for the read API. + c.msgCh = make(chan pgn.Message, 64) + + // Determine claiming mode. + mode := claiming.ModeAuto + if cfg.sourceAddress != nil { + mode = claiming.ModeExplicit + } + + // Create the claimer. + c.claimer = claiming.New(claiming.Config{ + Mode: mode, + Address: c.sourceAddr, + Name: c.deviceName, + WriteFrame: c.writeFrame, + OnAddressChange: func(newAddr uint8) { + c.mu.Lock() + c.sourceAddr = newAddr + c.mu.Unlock() + }, + OnFatalError: func(err error) { + c.log.Error("address claiming fatal error", "error", err) + c.mu.Lock() + c.addrErr = err + c.mu.Unlock() + }, + Logger: c.log, }) - // Start the single writer goroutine for FIFO ordering. - c.writeCh = make(chan writeJob, 64) - c.writeWg.Add(1) - go c.writeLoop() + // Start the bus read loop goroutine. + go c.busReadLoop() - return c, nil + // Send the initial address claim. + if err := c.claimer.Start(); err != nil { + return fmt.Errorf("n2k: starting address claim: %w", err) + } + + // Wait for the claim timeout to allow the network to respond. + claimTimeout := defaultClaimTimeout + if cfg.claimTimeout != nil { + claimTimeout = *cfg.claimTimeout + } + + timer := time.NewTimer(claimTimeout) + defer timer.Stop() + + select { + case <-timer.C: + case <-c.ctx.Done(): + return c.ctx.Err() + } + + // Check if a fatal error occurred during the claim window. + c.mu.Lock() + if c.addrErr != nil { + err := c.addrErr + c.mu.Unlock() + return fmt.Errorf("n2k: address claiming failed: %w", err) + } + c.mu.Unlock() + + // Check if we got a valid address. + addr := c.claimer.Address() + if addr == 254 { + return errors.New("n2k: address claiming failed — all addresses exhausted or contention in explicit mode") + } + + // Update sourceAddr with the final claimed address. + c.mu.Lock() + c.sourceAddr = addr + c.mu.Unlock() + + close(c.addrReady) + return nil +} + +// handleBusFrame is the central frame router called for every incoming CAN frame. +func (c *Client) handleBusFrame(frame can.Frame) { + info := adapter.NewPacketInfo(&frame) + + // Route address claim frames (PGN 60928) to the claimer. + if info.PGN == 60928 && frame.Length == 8 { + name := binary.LittleEndian.Uint64(frame.Data[:]) + c.claimer.HandleAddressClaim(info.SourceId, name) + } + + // Route ISO requests (PGN 59904) for address claim to the claimer. + if info.PGN == 59904 && frame.Length >= 3 { + requestedPGN := uint32(frame.Data[0]) | uint32(frame.Data[1])<<8 | uint32(frame.Data[2])<<16 + if requestedPGN == 60928 { + c.claimer.HandleISORequest() + } + } + + // Route transport protocol frames to the TP manager. + if info.PGN == 60416 || info.PGN == 60160 { + c.tp.HandleFrame(frame) + } + + // Pre-filter: skip decoding if metadata doesn't match. + if c.readFilter != nil && !c.readFilter.evalPre(info) { + return + } + + // Decode for the read API using the persistent pipeline. + c.readAdapter.HandleMessage(&frame) +} + +// busReadLoop runs the bus and closes the message channel when done. +func (c *Client) busReadLoop() { + defer close(c.msgCh) + if err := c.bus.Run(c.ctx); err != nil && c.ctx.Err() == nil { + c.log.Error("bus read loop error", "error", err) + } +} + +// clientDecoderHandler receives decoded messages from the decoder pipeline and +// delivers them to the client's msgCh. +type clientDecoderHandler struct { + client *Client +} + +func (h *clientDecoderHandler) HandleStruct(msg pgn.Message) { + if msg == nil { + return + } + + if u, ok := msg.(*pgn.UnknownPGN); ok { + if !h.client.cfg.includeUnknown { + h.client.cfg.logger.Debug("dropping unknown PGN", + "pgn", u.Info.PGN, "reason", u.Reason) + return + } + } + + if h.client.readFilter != nil && h.client.readFilter.hasPost { + fields := structToFilterMap(msg) + rv := reflect.ValueOf(msg) + if rv.Kind() == reflect.Pointer { + rv = rv.Elem() + } + info := rv.FieldByName("Info").Interface().(pgn.MessageInfo) + if !h.client.readFilter.evalPostWithInfo(info, fields) { + return + } + } + + select { + case h.client.msgCh <- msg: + case <-h.client.ctx.Done(): + } } // Write asynchronously encodes and transmits a PGN message. The message must @@ -181,6 +438,13 @@ func (c *Client) writeLoop() { // doWrite performs the synchronous work of encoding and framing a PGN message. func (c *Client) doWrite(msg pgn.Message) error { + // Wait for address readiness before sending. + select { + case <-c.addrReady: + case <-c.ctx.Done(): + return c.ctx.Err() + } + pgnNum := msg.PGNNumber() encoder, ok := pgn.EncoderLookup[pgnNum] @@ -206,7 +470,12 @@ func (c *Client) doWrite(msg pgn.Message) error { destination = *info.TargetId } - canID := framer.BuildCANID(pgnNum, priority, c.sourceAddr, destination) + // Read sourceAddr under lock. + c.mu.Lock() + srcAddr := c.sourceAddr + c.mu.Unlock() + + canID := framer.BuildCANID(pgnNum, priority, srcAddr, destination) if len(payload) <= 8 { frame := framer.FrameSingle(canID, payload) @@ -229,19 +498,38 @@ func (c *Client) doWrite(msg pgn.Message) error { return nil } - return c.tp.SendBAM(pgnNum, c.sourceAddr, payload) + return c.tp.SendBAM(pgnNum, srcAddr, payload) } -// Receive returns an iterator of decoded NMEA 2000 messages from the -// configured sources. It delegates to the top-level n2k.Receive function -// with the same options used to create this Client. +// Receive returns an iterator of decoded NMEA 2000 messages. For bus clients +// it reads from the internal message channel; for replay clients it delegates +// to the top-level Receive function. func (c *Client) Receive() iter.Seq2[pgn.Message, error] { + if c.msgCh != nil { + return func(yield func(pgn.Message, error) bool) { + for msg := range c.msgCh { + if !yield(msg, nil) { + return + } + } + } + } return Receive(c.ctx, c.opts...) } -// Scanner creates a new Scanner that reads from the configured sources. It -// delegates to n2k.NewScanner with the same options used to create this Client. +// Scanner creates a new Scanner that reads from this client. For bus clients +// it reads from the internal message channel; for replay clients it delegates +// to NewScanner. func (c *Client) Scanner() *Scanner { + if c.msgCh != nil { + s := &Scanner{ + ctx: c.ctx, + cfg: c.cfg, + ch: c.msgCh, + } + s.once.Do(func() {}) // prevent lazy start since ch is already live + return s + } return NewScanner(c.ctx, c.opts...) } @@ -272,5 +560,8 @@ func (c *Client) Close() error { if c.tp != nil { c.tp.Close() } + if c.bus != nil { + return c.bus.Close() + } return nil } diff --git a/client_test.go b/client_test.go index c0f3c10..bdd0e7b 100644 --- a/client_test.go +++ b/client_test.go @@ -3,7 +3,9 @@ package n2k import ( "context" "strings" + "sync" "testing" + "time" "github.com/brutella/can" "github.com/open-ships/n2k/internal/framer" @@ -145,7 +147,7 @@ func TestClient_Write_AfterClose(t *testing.T) { assert.Contains(t, err.Error(), "closed") } -func TestClient_Receive(t *testing.T) { +func TestClient_Replay_Receive(t *testing.T) { // Build a pre-encoded VesselHeading frame for replay. heading := float32(1.5) msg := &pgn.VesselHeading{ @@ -284,6 +286,19 @@ func TestClient_Write_FastPacket(t *testing.T) { } } +func TestClient_CANSource_NoLongerStubbed(t *testing.T) { + // CAN("can0") should attempt to construct a bus client, not error with + // "not yet implemented". On a machine without can0, it will fail with a + // hardware error — but NOT "not yet implemented". + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + _, err := NewClient(ctx, CAN("can_nonexistent_99")) + require.Error(t, err) + assert.NotContains(t, err.Error(), "not yet implemented", + "should no longer return the old stub error") +} + // --- Pointer helpers for building test PGN structs --- func ptrFloat32(v float32) *float32 { return &v } @@ -448,3 +463,227 @@ func TestClient_Write_FIFO_Ordering(t *testing.T) { "message %d Heading should match write order", i) } } + +// --- Mock bus for bus path tests --- + +type mockBus struct { + inbound chan can.Frame + written []can.Frame + mu sync.Mutex + handler func(can.Frame) +} + +func newMockBus() *mockBus { + return &mockBus{inbound: make(chan can.Frame, 64)} +} + +func (m *mockBus) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case f, ok := <-m.inbound: + if !ok { + return nil + } + if m.handler != nil { + m.handler(f) + } + } + } +} + +func (m *mockBus) Close() error { return nil } + +func (m *mockBus) WriteFrame(frame can.Frame) error { + m.mu.Lock() + defer m.mu.Unlock() + m.written = append(m.written, frame) + return nil +} + +func (m *mockBus) SetHandler(h func(can.Frame)) { + m.handler = h +} + +func (m *mockBus) getWritten() []can.Frame { + m.mu.Lock() + defer m.mu.Unlock() + out := make([]can.Frame, len(m.written)) + copy(out, m.written) + return out +} + +func TestClient_AddressClaim(t *testing.T) { + mb := newMockBus() + + c, err := NewClient(context.Background(), + WithBus(mb), + WithClaimTimeout(250*time.Millisecond), + ) + require.NoError(t, err) + defer func() { _ = c.Close() }() + + // The first written frame should be an address claim (PGN 60928). + written := mb.getWritten() + require.NotEmpty(t, written, "client should have written at least one frame (address claim)") + + // Parse the CAN ID to extract the PGN properly (accounting for addressed PGNs). + firstFrame := written[0] + rawPGN := (firstFrame.ID & 0x3FFFF00) >> 8 + pduFormat := uint8((rawPGN >> 8) & 0xFF) + if pduFormat < 240 { + rawPGN &= 0xFFF00 // mask off destination byte for addressed PGNs + } + assert.Equal(t, uint32(60928), rawPGN, "first frame should be address claim PGN 60928") +} + +func TestClient_Write(t *testing.T) { + mb := newMockBus() + + c, err := NewClient(context.Background(), + WithBus(mb), + WithClaimTimeout(250*time.Millisecond), + ) + require.NoError(t, err) + defer func() { _ = c.Close() }() + + heading := float32(1.5) + msg := &pgn.VesselHeading{ + Heading: &heading, + } + msg.Info.Priority = ptrUint8(2) + + wr := c.Write(msg) + require.NoError(t, wr.Wait()) + + // Find the VesselHeading frame in the written frames (skip address claim frames). + written := mb.getWritten() + var found bool + for _, f := range written { + framePGN := (f.ID & 0x3FFFF00) >> 8 + if framePGN == 127250 { + found = true + break + } + } + assert.True(t, found, "should find a VesselHeading (PGN 127250) frame in written frames") +} + +func TestClient_Receive(t *testing.T) { + mb := newMockBus() + + c, err := NewClient(context.Background(), + WithBus(mb), + WithClaimTimeout(250*time.Millisecond), + ) + require.NoError(t, err) + + // Build and inject a VesselHeading frame into the mock bus. + heading := float32(1.5) + msg := &pgn.VesselHeading{ + Heading: &heading, + } + encoder := pgn.EncoderLookup[127250] + require.NotNil(t, encoder) + payload, err := encoder(msg) + require.NoError(t, err) + + canID := framer.BuildCANID(127250, 2, 42, 255) + frame := framer.FrameSingle(canID, payload) + + // Inject the frame into the mock bus inbound channel. + mb.inbound <- frame + + // Read messages via msgCh until we get a VesselHeading (skip address claim + // echoes that may arrive first from the claimer's own WriteFrame calls). + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + var received *pgn.VesselHeading + done := make(chan struct{}) + go func() { + defer close(done) + for m, err := range c.Receive() { + if err != nil { + return + } + if vh, ok := m.(*pgn.VesselHeading); ok { + received = vh + return + } + } + }() + + select { + case <-done: + case <-ctx.Done(): + t.Fatal("timed out waiting for VesselHeading message") + } + + require.NotNil(t, received, "should have received a decoded VesselHeading") + require.NotNil(t, received.Heading) + assert.InDelta(t, 1.5, float64(*received.Heading), 0.001) + + _ = c.Close() +} + +func TestClient_FilterPGN(t *testing.T) { + mb := newMockBus() + + // Filter: only accept PGN 127250 (VesselHeading). + c, err := NewClient(context.Background(), + WithBus(mb), + WithClaimTimeout(250*time.Millisecond), + Filter("pgn == 127250"), + ) + require.NoError(t, err) + + // Build a VesselHeading frame (should pass filter). + heading := float32(1.5) + encoder := pgn.EncoderLookup[127250] + require.NotNil(t, encoder) + payload, err := encoder(&pgn.VesselHeading{Heading: &heading}) + require.NoError(t, err) + headingFrame := framer.FrameSingle(framer.BuildCANID(127250, 2, 42, 255), payload) + + // Build a SystemTime frame PGN 126992 (should be filtered out). + sysEncoder := pgn.EncoderLookup[126992] + require.NotNil(t, sysEncoder) + sysPayload, err := sysEncoder(&pgn.SystemTime{}) + require.NoError(t, err) + sysFrame := framer.FrameSingle(framer.BuildCANID(126992, 3, 42, 255), sysPayload) + + // Send the SystemTime first, then VesselHeading. + mb.inbound <- sysFrame + mb.inbound <- headingFrame + + // We should only receive VesselHeading. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + var received pgn.Message + done := make(chan struct{}) + go func() { + defer close(done) + for m, err := range c.Receive() { + if err != nil { + return + } + received = m + return + } + }() + + select { + case <-done: + case <-ctx.Done(): + t.Fatal("timed out waiting for message") + } + + require.NotNil(t, received) + _, ok := received.(*pgn.VesselHeading) + assert.True(t, ok, "expected VesselHeading, got %T", received) + + _ = c.Close() +} diff --git a/internal/canbus/interface.go b/internal/canbus/interface.go index a8c3d08..eac4081 100644 --- a/internal/canbus/interface.go +++ b/internal/canbus/interface.go @@ -6,6 +6,13 @@ import ( "github.com/brutella/can" ) +// HandlerSettable is optionally implemented by Interface implementations that +// support setting the frame handler after construction (e.g., for testing with +// mock buses where the handler isn't known at construction time). +type HandlerSettable interface { + SetHandler(func(can.Frame)) +} + // Interface is a basic interface for a CANbus implementation. // Any CAN bus transport (USB-CAN dongle, SocketCAN, etc.) must implement these three methods // to be usable as a CAN channel in the system. diff --git a/internal/canbus/socketcan.go b/internal/canbus/socketcan.go index 06b7612..9071357 100644 --- a/internal/canbus/socketcan.go +++ b/internal/canbus/socketcan.go @@ -106,11 +106,29 @@ func (c *socketCANChannel) Close() error { // WriteFrame sends a single CAN frame out on the SocketCAN bus. // The brutella/can library handles encoding the frame into the Linux SocketCAN wire format -// and writing it to the raw CAN socket. +// and writing it to the raw CAN socket. Returns an error if the bus is not yet open. func (c *socketCANChannel) WriteFrame(frame can.Frame) error { + if c.bus == nil { + return errors.New("socketCAN: bus not open (interface not available or Run not called)") + } return c.bus.Publish(frame) } +// NewSocketCAN creates a SocketCAN Interface for the given Linux CAN interface name. +// The handler callback receives each incoming CAN frame. The interface is not opened +// until Run() is called. +func NewSocketCAN(log *slog.Logger, iface string, handler func(can.Frame)) Interface { + var frameHandler can.HandlerFunc = func(frame can.Frame) { + if handler != nil { + handler(frame) + } + } + return newSocketCANChannel(log, socketCANChannelOptions{ + InterfaceName: iface, + MessageHandler: frameHandler, + }) +} + // RunSocketCAN creates a SocketCAN channel for the given interface and runs it, // calling handler for each received CAN frame. The interface must already be configured and up. // Blocks until error or context done. diff --git a/internal/canbus/usbcan.go b/internal/canbus/usbcan.go index 736a659..a10129d 100644 --- a/internal/canbus/usbcan.go +++ b/internal/canbus/usbcan.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "slices" + "sync" "github.com/brutella/can" "go.bug.st/serial" @@ -61,6 +62,9 @@ type usbCANChannel struct { // It is nil until Run() is called. port serial.Port + // mu guards all access to port, since Run() reads and WriteFrame() writes concurrently. + mu sync.Mutex + // log is the structured logger for debug/info messages about frame parsing and errors. log *slog.Logger } @@ -113,7 +117,9 @@ func (c *usbCANChannel) Run(ctx context.Context) error { // Read up to 32 bytes at a time from the serial port. // The actual number of bytes returned depends on what the OS has buffered. working := make([]byte, 32) + c.mu.Lock() readBytes, err := port.Read(working) + c.mu.Unlock() if err != nil { return err } @@ -329,7 +335,9 @@ func (c *usbCANChannel) WriteFrame(frame can.Frame) error { buf = append(buf, frame.Data[0:frame.Length]...) buf = append(buf, 0x55) + c.mu.Lock() o, err := c.port.Write(buf) + c.mu.Unlock() if o != len(buf) { return fmt.Errorf("WriteFrame sent %d of %d bytes", o, len(buf)) } @@ -340,6 +348,22 @@ func (c *usbCANChannel) WriteFrame(frame can.Frame) error { return nil } +// NewUSBCAN creates a USB-CAN Interface for the given serial port. +// The handler callback receives each incoming CAN frame. The interface is not opened +// until Run() is called. +func NewUSBCAN(log *slog.Logger, port string, handler func(can.Frame)) Interface { + var frameHandler can.HandlerFunc = func(frame can.Frame) { + if handler != nil { + handler(frame) + } + } + return newUSBCANChannel(log, usbCANChannelOptions{ + SerialPortName: port, + SerialBaudRate: 2000000, + FrameHandler: frameHandler, + }) +} + // RunUSBCAN creates a USB-CAN channel for the given serial port and runs it, // calling handler for each received CAN frame. Blocks until error or context done. func RunUSBCAN(ctx context.Context, log *slog.Logger, port string, handler func(can.Frame)) error { diff --git a/options.go b/options.go index 9d38a86..62f7e99 100644 --- a/options.go +++ b/options.go @@ -3,8 +3,10 @@ package n2k import ( "errors" "log/slog" + "time" "github.com/brutella/can" + "github.com/open-ships/n2k/internal/canbus" ) type config struct { @@ -12,12 +14,14 @@ type config struct { filterExpr string includeUnknown bool logger *slog.Logger - sourceAddress *uint8 // nil = auto mode - deviceName *DeviceName // nil = use default + sourceAddress *uint8 // nil = auto mode + deviceName *DeviceName // nil = use default + claimTimeout *time.Duration // nil = use default (1500ms) + bus canbus.Interface // pre-constructed bus (internal/testing use) } func (c *config) validate() error { - if len(c.sources) == 0 { + if len(c.sources) == 0 && c.bus == nil { return errors.New("n2k: at least one source (CAN, USB, or Replay) is required") } return nil @@ -86,6 +90,15 @@ func WithSourceAddress(addr uint8) Option { }) } +// WithClaimTimeout sets how long NewClient blocks waiting for address claiming +// to complete on a real CAN bus. Default is 1500ms. This allows time for the +// initial 250ms claim window plus several rounds of contention renegotiation. +func WithClaimTimeout(d time.Duration) Option { + return optionFunc(func(c *config) { + c.claimTimeout = &d + }) +} + // WithName sets the ISO 11783 device NAME used for address claiming. // The NAME is a 64-bit identifier that uniquely identifies this device on the // NMEA 2000 network. In address contention, the device with the lower NAME wins. @@ -95,3 +108,12 @@ func WithName(name DeviceName) Option { c.deviceName = &name }) } + +// WithBus provides a pre-constructed canbus.Interface for the client to use. +// This is primarily for testing with mock buses. When set, the client uses this +// bus directly instead of constructing one from CAN/USB sources. +func WithBus(bus canbus.Interface) Option { + return optionFunc(func(c *config) { + c.bus = bus + }) +} diff --git a/options_test.go b/options_test.go index 58875af..88da017 100644 --- a/options_test.go +++ b/options_test.go @@ -3,9 +3,11 @@ package n2k import ( "log/slog" "testing" + "time" "github.com/brutella/can" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestOptions(t *testing.T) { @@ -40,3 +42,10 @@ func TestFilterOption(t *testing.T) { Filter(`pgn == 127250`).apply(&cfg) assert.Equal(t, `pgn == 127250`, cfg.filterExpr) } + +func TestWithClaimTimeout(t *testing.T) { + cfg := config{} + WithClaimTimeout(2 * time.Second).apply(&cfg) + require.NotNil(t, cfg.claimTimeout) + assert.Equal(t, 2*time.Second, *cfg.claimTimeout) +}