Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified EEPROM/gps.mpy
Binary file not shown.
163 changes: 100 additions & 63 deletions EEPROM/gps.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,126 @@
# Minimal length method names to make the mpy file as small as possible so it will fit in the 2k hexpansion EEPROM.
# Minimal functionality to get a GPS fix and display it
#
import app
from app_components.tokens import button_labels
from events.input import Buttons, BUTTON_TYPES
import asyncio
import time

from events import Event
from system.eventbus import eventbus
from system.scheduler.events import RequestForegroundPushEvent, RequestStopAppEvent
from machine import UART, Pin

class GPSApp(app.App): # pylint: disable=no-member
""" App to get GPS data from a GPS module connected to the hexpansion and display it on the badge. """
VERSION = 1 # Increment this when making changes to the app that require the hexpansion app to be re-flashed with the new code.

class GPSApp(app.App):
"""Provides a GPS API for apps to use directly and GPS Events that other apps may subscribe to."""

VERSION = 2 # Increment this when making changes to the app that require the hexpansion app to be re-flashed with the new code.

class GPSEvent(Event):
def __init__(self, position, speed, bearing):
self.position = position
self.speed = speed
self.bearing = bearing

def __str__(self):
return f"GPS fix {self.position}, speed {self.speed} knots, bearing {self.bearing}°"

def __init__(self, config=None):
super().__init__()

# Config is mandatory, we're running from the EEPROM
if config is None:
raise TypeError # The app should not be run without a config as it won't work (shouldn't happen anyway if run from the hexpansion EEPROM)
self.config = config # Enables HexManager to check which port app is associated with
self.t = config.pin[0]
self.x = config.pin[1]
raise TypeError
self.config = config

# GPS fix data
self._position = None
self._bearing = 0.0
self._speed = 0.0

# Specifying a small time out to wait before giving up on receiving
# more characters ensures we always read full messages from the UART
# This reduces parse errors due to only having half a message
self.to = 10
self.uart = UART(1, baudrate=9600, tx=config.pin[0], rx=config.pin[1], timeout=self.to)

# Reset pin
self.r = config.pin[2]
self.b = Buttons(self)
self.l = None # Last GPS fix as a string in the format "lat,lon" or None if no fix yet. Latitude and longitude are rounded to 5 decimal places which gives a precision of about 1 meter, more than enough for badgebot's purposes.
self.u = UART(1, baudrate=9600, tx=self.t, rx=self.x)
self.r.init(mode=Pin.OUT)
self.r.value(1)
self.z = -1 # Ticks timer - time since GPS reset, used to control when to release the GPS from reset after resetting it
# and then used to time how long since last valid GPS fix.
# also used as a flag (-1) to indicate that ForegroundPushEvent has not yet been emitted
eventbus.on_async(RequestStopAppEvent, self.s, self)

async def s(self, _e: RequestStopAppEvent):
""" handle app stop """
if _e.app == self:
self.r.value(1)
self.u.deinit()
# Time since last valid GPS fix
self.z = 0

@property
def position(self):
"""Position as a (latitude, longitude) tuple"""
return self._position

@property
def bearing(self):
"""Course over ground in degrees from true north"""
return self._bearing

def update(self, _d):
""" Update the app state - expire last_fix if it is too old """
if self.b.get(BUTTON_TYPES["CANCEL"]):
self.b.clear()
self.minimise()
@property
def speed(self):
"""Ground speed in knots"""
return round(self._speed, 2)

if self.z < 0:
eventbus.emit(RequestForegroundPushEvent(self))
async def background_task(self):
"""Override the default background task behaviour to give more time to other apps"""
last = time.ticks_ms()
while True:
start = time.ticks_ms()
delta = time.ticks_diff(start, last)
result = self.background_update(delta)
# Get successive messages fast, but yield more time to other apps
# if there was nothing to read, this lowers the frequency of
# occurrances of blocking for the full read timeout to elapse when
# nothing is being sent over the UART
await asyncio.sleep_ms(25 if result else 250 - self.to)
last = start

self.z +=_d
def background_update(self, delta):
self.z += delta

# Delay releasing the reset pin a little bit
if self.r.value():
if self.z > 99:
self.r.value(0)

if self.l:
# Clear fix data if we haven't had a fix for a while
if self._position:
if self.z > 9999:
self.l = None
self._position = None
self._speed = 0

def background_update(self, _d):
""" Update the app state in the background - read GPS data """
l = self.u.readline()
l = self.uart.readline()
if l:
#print(l)
try:
p = l.decode().strip().split(',')
if (p[0] != "$GPRMC" and p[0] != "$GNRMC") or p[2] != "A" or not p[3] or not p[5]:
return None
t = float(p[3][:2]) + float(p[3][2:]) / 60
n = float(p[5][:3]) + float(p[5][3:]) / 60
if p[4] == "S":
t = -t
if p[6] == "W":
n = -n
self.l = str(round(t, 5))
self.n = str(round(n, 5))
self.z = 0
except (UnicodeError, ValueError, AttributeError):
if p[0] == "$GPRMC" or p[0] == "$GNRMC":
if p[2] == "A":
lat = float(p[3][:2]) + float(p[3][2:]) / 60
lon = float(p[5][:3]) + float(p[5][3:]) / 60
if p[4] == "S":
lat = -lat
if p[6] == "W":
lon = -lon
self._position = (round(lat, 5), round(lon, 5))
self._speed = float(p[7])
self._bearing = float(p[8])

# Eliminate satellite jitter when stationary by rounding
# very small velocities to zero
if self._speed < 1:
self._speed = 0

# Reset the time since last fix if we successfully got a valid fix message
self.z = 0

# Send event to subscribers
eventbus.emit(self.GPSEvent(self._position, self._speed, self._bearing))
except (UnicodeError, ValueError, AttributeError, IndexError):
pass
return True
return False


def draw(self, _c):
_c.font_size = 40 # not using defined sizes to save bytes in the mpy file
_c.rgb(0, 0.2, 0).rectangle(-120, -120, 240, 240).fill()
_c.rgb(0, 1, 0).move_to(-35, -50).text("GPS")
if self.l:
_c.move_to(-110, 0).text("Lat:" + self.l)
_c.move_to(-110, 40).text("Lon:" + self.n)
else:
_c.move_to(-110, 0).text("Searching...")
button_labels(_c, cancel_label="Back")

__app_export__ = GPSApp #pylint: disable=invalid-name
__app_export__ = GPSApp # pylint: disable=invalid-name
Loading