Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 56 additions & 19 deletions src/arm_test_python/arm_test_python/science_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
import numpy as np
import serial
import traceback
import warnings

FLUORO_WAVELENGTHS = [515, 590]

class SensorsRawNode(Node):
def __init__(self):
super().__init__('sensors_raw')
self.pub_fluoro = self.create_publisher(Int16MultiArray, '/sci_fluoro_raw', 1)
self.pub_soil = self.create_publisher(Int16MultiArray, '/sci_soil_raw', 1)
self.pub_soil = self.create_publisher(Float64MultiArray, '/sci_soil_raw', 1)
self.pub_ambient = self.create_publisher(Float64MultiArray, '/sci_ambient_raw', 1)
self.pub_arduino = self.create_publisher(String, '/sci_arduino_messages', 1)

self.serial_data_port = self.declare_parameter('/serial_data_port', '/dev/ttyUSB0').value
Expand All @@ -29,11 +31,17 @@ def __init__(self):
# initialize empty fluorometer message
self.fluoro_vals = Int16MultiArray()
# Index 0 = 515nm, Index 1 = 590nm
self.fluoro_vals.data = [0] * 2
self.fluoro_vals.data = [ 0 for _ in FLUORO_WAVELENGTHS ]

# initialize empty soil sensor message
self.soil_vals = Int16MultiArray()
self.soil_vals.data = [0,0]
self.soil_vals = Floatt64MultiArray()
# Index 0 = Soil Temp, Index 1 = Soil Moisture
self.soil_vals.data = [0., 0.]

# intiialize empty ambient conditions message
self.amb_vals = Float64MultiArray()
# Index 0 = Ambient Temp, Index 1 = Ambient Humidity, Index 2 = Ambient Methane
self.amb_vals.data = [0., 0., 0.]


def operate(self):
Expand All @@ -45,25 +53,54 @@ def operate(self):
try:
# Here we read the serial port for a string that looks like "color:123,color:123", "temp:123,moisture:123", or "debug message"
line = self.ser.readline().decode().strip() #blocking function, will wait until read entire line
vals = line.split(",") # split by commas into individual data points or clauses

if len(vals[0].split(":")) == 1: # evaluates to true if there are no colons in the substring
arduino_message = String()
arduino_message.data = line
self.pub_arduino.publish(arduino_message)
self.ser.write("\n".encode()) # send the arduino a newline character so it moves on (hopefully)
elif len(vals) == 2 and vals[0].startswith("fluoro"):
self.fluoro_vals.data = [int(val.split(":")[1]) for val in vals]
self.pub_fluoro.publish(self.fluoro_vals)
else: # evaluates to true if we're seeing a soil temp/moisture reading
self.soil_vals.data = [int(val.split(":")[1]) for val in vals]
self.pub_soil.publish(self.soil_vals)


if line.startswith("@"):
if line.startswith("@data"):
typ, dat = line.split(" ")
typ, datc = typ.strip(), dat.strip()
match typ:
case "@data.fluoro":
self.fluoro_vals.data = [ int(i) for i in dat.split(",") ]
self.pub_fluoro.publish(self.fluoro_vals)
case "@data.soil":
self.soil_vals.data = [ float(i) for i in dat.split(",") ]
self.pub_soil.publish(self.soil_vals)
case "@data.ambient":
self.amb_vals.data = [ float(i) for i in dat.split(",") ]
self.pub_ambient.publish(self.amb_vals)
case _:
raise RuntimeError(f"Unexpected @data message ({typ}): \"{line}\"")
elif line.startswith("@warn"):
amsg = String()
text = line[len("@warn "):]
warnings.warn(f"Warning! (from Science System): {text}", RuntimeWarning)
self.end_arduino_message(f"Warning: {text}")
elif line.startswith("@fail"):
amsg = String()
text = line[len("@fail "):]
amsg.data =
warnings.warn(f"Error! (from Science System): {text}", RuntimeWarning)
self.end_arduino_message(f"Error!! Error: {text}")
else:
amsg = String()
amsg.data =
self.end_arduino_message(f"Message: {line}")
except KeyboardInterrupt:
break
except Exception: # the only way we ever get to here is if reading totally fails, which may never happen
except Exception as e: # the only way we ever get to here is if reading totally fails, which may never happen
print('Bad line received on Arduino port - ignoring and continuing.')
warnings.warn(f"Failed to parse Science System response (\"{line}\"): {e.__str__()}", RuntimeWarning)
self.end_arduino_message("[[ failed to parse: \"{line}\" ]]")

def end_arduino_message(self, amsg_text: str):
try:
amsg = String()
amsg.data = amsg_text
self.pub_arduino.publish(amsg)
self.ser.write("\n".encode())
except Exception as e:
warnings.warn(f"Error ending arduino message! {e.__str__()}", RuntimeWarning)
raise e

def main(args=None):
try:
Expand Down