Hello all,
I would like to decode RTCM3 protocol from RTK GPS. How can I do that in RTMaps ? Thanks.
Hello Daniel,
I suggest you use PythonBridge with pyrtcm library.
Here is a sample we have used in the past. Feel free to adapt it to your needs. The idea is to accumulate the buffer on a member variable and parse it once it is complete.
# ---------------- TEMPLATE ---------------------------------------
# This is a template to help you start writing PythonBridge code -
# -----------------------------------------------------------------
import rtmaps.core as rt
import rtmaps.types
from rtmaps.base_component import BaseComponent # base class
import numpy as np
import pyrtcm
# Python class that will be called from RTMaps.
class rtmaps_python(BaseComponent):
# Constructor has to call the BaseComponent parent class
def __init__(self):
BaseComponent.__init__(self) # call base class constructor
def Dynamic(self):
# Adding an input called "in" of ANY type
self.add_input("stream8_input", rtmaps.types.ANY) # define an input
# Define the outputs here
self.add_output("o_1012_DF035", rtmaps.types.AUTO)
self.add_output("o_1012_DF034", rtmaps.types.AUTO)
self.add_output("o_1012_MSG", rtmaps.types.AUTO)
self.add_output("o_1004_DF006", rtmaps.types.AUTO)
self.add_output("o_1004_DF004", rtmaps.types.AUTO)
self.add_output("o_1004_MSG", rtmaps.types.AUTO)
self.add_output("o_1006_MSG", rtmaps.types.AUTO)
self.add_output("o_1033_MSG", rtmaps.types.AUTO)
# Birth() will be called once at diagram execution startup
def Birth(self):
self.buffer = bytearray()
self.state = 0
def DecodeStream(self, buffer):
msg = pyrtcm.RTCMReader.parse(buffer)
wop = msg.identity
print(wop)
if int(wop) == 1012:
self.write("o_1012_MSG", wop)
self.write("o_1012_DF034", msg.DF034)
self.write("o_1012_DF035", msg.DF035)
elif int(wop) == 1004:
self.write("o_1004_MSG", wop)
self.write("o_1004_DF006", msg.DF006)
self.write("o_1004_DF004", msg.DF004)
elif int(wop) == 1006:
self.write("o_1006_MSG", wop)
elif int(wop) == 1033:
self.write("o_1033_MSG", wop)
def DecodeRAWMessage(self):
if self.state == 0: # Looking for magic number
if b"\xd3" in self.buffer:
pos = self.buffer.index(b"\xd3")
if len(self.buffer) > pos + 2:
self.state = 1
byte2 = self.buffer[pos+1]
if byte2 & ~0x03 == 0: # That's a magic number, we start from scratch !
hdr3 = self.buffer[pos+2]
size = (byte2 << 8) | hdr3
self.total_size = size + 6 # including header + CRC
self.buffer = self.buffer[pos:]
if len(self.buffer) >= self.total_size:
self.state = 0 # back to the beggining
msg = self.buffer[:self.total_size]
# Remove buffer we have just read
self.buffer = self.buffer[self.total_size:]
self.DecodeStream(msg)
# More message to decode ?
self.DecodeRAWMessage()
else:
return
else:
return
else:
return
# Core() is called every time you have a new inputs available, depending on your chosen reading policy
def Core(self):
try:
# Read the RAW input
raw_data = self.inputs["stream8_input"].ioelt.data.tobytes()
self.buffer += raw_data
self.DecodeRAWMessage()
if self.state == 1: # waiting for correct size
if len(self.buffer) >= self.total_size:
self.state = 0 # back to the beggining
msg = self.buffer[:self.total_size]
# Remove buffer we have just read
self.buffer = self.buffer[self.total_size:]
self.DecodeStream(msg)
# More message to decode ?
self.DecodeRAWMessage()
except pyrtcm.exceptions.RTCMParseError as e:
rt.report_warning(e)
# Death() will be called once at diagram execution shutdown
def Death(self):
print("Passing through Death()")
1 Like