Traccar RPI

Plenkii · 4 years ago

hello

I would like to share a simple Python script that lets a Raspberry Pi with a USB GPS receiver report its position to your Traccar server.

Google-Drive

api.py

import requests
import pengugps as gpsd
import time


# Identifier used when adding a new device to the traccar server. It should
# be long and random since there is no authentication.
ID = "001"

# Seconds waited between each request made to the traccar server.
UPDATE_DELAY = 1

# Seconds waited before requesting coordinates again from the GPS when it
# first returns invalid data.
BAD_DATA_DELAY = 0.1

# Seconds waited before trying to reconnect to the GPS module when something
# goes wrong (for example, GPS module not found).
RECONNECT_DELAY = 1

class API:
    """Minimal client that reports positions to a traccar server over HTTP.

    Two ``str.format`` templates are prepared once per device: ``url2`` for
    a position without altitude (5 fields) and ``url3`` for a position with
    altitude (6 fields).
    """

    def __init__(self, id, base_url="http://traccar.earth-hosting.ch:5055/",
                 timeout=10):
        """Build the URL templates for one device.

        Args:
            id: Device identifier as registered on the server. The parameter
                name shadows the builtin but is kept so existing callers
                keep working.
            base_url: Address of the server endpoint, including the trailing
                slash. Defaults to the server the script was written for.
            timeout: Seconds to wait for the server before giving up on a
                request.
        """
        # Imported locally so the block does not depend on a file-level import.
        from urllib.parse import quote

        self.timeout = timeout
        # Percent-encode the id so characters such as '{', '}', '&' or '#'
        # can neither break the later .format() call nor corrupt the query.
        safe_id = quote(str(id), safe="")
        # Literal braces in the base URL must be doubled to survive .format().
        safe_base = base_url.replace("{", "{{").replace("}", "}}")
        self.url2 = (
            safe_base
            + "?id=" + safe_id
            + "&lat={0}&lon={1}&timestamp={2}&accuracy={3}&speed={4}"
        )
        self.url3 = self.url2 + "&altitude={5}"

    def update(self, args):
        """Send one position to the server.

        Args:
            args: Sequence of 5 values (lat, lon, timestamp, accuracy, speed)
                or 6 values (the same plus altitude). Any other length is
                ignored and no request is made, as before.

        Raises:
            requests.exceptions.RequestException: If the request fails or the
                server does not answer within ``self.timeout`` seconds.
        """
        if len(args) == 6:
            template = self.url3
        elif len(args) == 5:
            template = self.url2
        else:
            return
        # Without a timeout a stalled server would block the caller forever.
        requests.post(template.format(*args), timeout=self.timeout)


# One uploader instance is reused for every position report.
api = API(ID)

# Outer loop (re)connects to gpsd; inner loop polls it and uploads each fix.
while True:
    try:
        gpsd.connect()
        while True:
            fix = gpsd.get_current()
            if fix.lat:
                # Field order matches the placeholders of the URL template:
                # lat, lon, timestamp, accuracy, speed[, altitude].
                fields = [
                    fix.lat,
                    fix.lon,
                    fix.time,
                    max(fix.error["x"], fix.error["y"]),
                    fix.hspeed,
                ]
                # Altitude is only sent when gpsd reports mode 3.
                if fix.mode == 3:
                    fields.append(fix.alt)

                api.update(fields)

                time.sleep(UPDATE_DELAY)
            else:
                # A falsy latitude is treated as "no usable data yet".
                time.sleep(BAD_DATA_DELAY)
    except Exception as err:
        # Any failure (GPS or upload) is printed, then the connection to
        # gpsd is re-established after a short pause.
        print(err)
        time.sleep(RECONNECT_DELAY)

pengugps.py

import socket
import json
import logging
import datetime


# Connection to gpsd: the raw socket and the text stream wrapped around it.
# Both are assigned by connect().
gpsd_socket = gpsd_stream = None

# Last DEVICES / WATCH messages received, filled in by _parse_state_packet().
state = dict()


def _parse_state_packet(json_data):
    """Record a DEVICES or WATCH message in the module-level ``state`` dict.

    A WATCH message is stored under ``state['watch']``; a DEVICES message
    with a non-empty ``devices`` entry is stored under ``state['devices']``.

    Raises:
        Exception: If a DEVICES message lists no devices, or if the message
            class is neither DEVICES nor WATCH.
    """
    global state
    kind = json_data['class']

    if kind == 'WATCH':
        state['watch'] = json_data
        return

    if kind != 'DEVICES':
        raise Exception(
            "Unexpected message received from gps: {}".format(kind))

    if not json_data['devices']:
        raise Exception("No gps devices found")
    state['devices'] = json_data


class GpsResponse(object):
    """Snapshot of the latest fix taken from a gpsd POLL response.

    Attributes are copied from the last entry of the response's ``tpv`` and
    ``sky`` lists. Anything gpsd did not supply keeps the zero/empty default
    set in ``__init__``.
    """

    # Human-readable names for the fix modes, used by __repr__.
    _MODE_NAMES = {
        0: 'No mode',
        1: 'No fix',
        2: '2D fix',
        3: '3D fix',
    }

    def __init__(self):
        self.mode = 0        # fix mode: see _MODE_NAMES
        self.sats = 0        # number of satellites listed in the SKY report
        self.sats_valid = 0  # number of those flagged as used
        self.lon = 0.0
        self.lat = 0.0
        self.alt = 0.0
        self.track = 0
        self.hspeed = 0
        self.climb = 0
        self.time = ''
        self.error = {}      # error estimates keyed 'c', 's', 't', 'v', 'x', 'y'

    @classmethod
    def from_json(cls, packet):
        """Build a GpsResponse from a decoded POLL message.

        Args:
            packet: The POLL message as a dict with the keys ``active``,
                ``tpv`` and ``sky``.

        Returns:
            A populated GpsResponse. When the ``tpv`` or ``sky`` list is
            empty or missing, the corresponding fields keep their defaults
            (mode 0, no satellites) instead of raising.

        Raises:
            Exception: If ``packet['active']`` is falsy.
        """
        result = cls()
        if not packet['active']:
            raise Exception('GPS not active')

        # Empty lists are treated as "no data yet" rather than an error, so
        # the caller can simply poll again instead of reconnecting.
        tpv_reports = packet.get('tpv') or [{}]
        sky_reports = packet.get('sky') or [{}]
        last_tpv = tpv_reports[-1]
        last_sky = sky_reports[-1]

        satellites = last_sky.get('satellites', [])
        result.sats = len(satellites)
        result.sats_valid = sum(1 for sat in satellites if sat.get('used'))

        result.mode = last_tpv.get('mode', 0)

        # Horizontal position fields are only read for mode 2 and above.
        if result.mode >= 2:
            result.lon = last_tpv.get('lon', 0.0)
            result.lat = last_tpv.get('lat', 0.0)
            result.track = last_tpv.get('track', 0)
            result.hspeed = last_tpv.get('speed', 0)
            result.time = last_tpv.get('time', '')
            result.error = {
                'c': 0,
                's': last_tpv.get('eps', 0),
                't': last_tpv.get('ept', 0),
                'v': 0,
                'x': last_tpv.get('epx', 0),
                'y': last_tpv.get('epy', 0),
            }

        # Vertical fields are only read for mode 3 and above.
        if result.mode >= 3:
            result.alt = last_tpv.get('alt', 0.0)
            result.climb = last_tpv.get('climb', 0)
            result.error['c'] = last_tpv.get('epc', 0)
            result.error['v'] = last_tpv.get('epv', 0)

        return result

    def __repr__(self):
        if self.mode == 2:
            return "<GpsResponse 2D Fix {} {}>".format(self.lat, self.lon)
        if self.mode == 3:
            return "<GpsResponse 3D Fix {} {} ({} m)>".format(
                self.lat, self.lon, self.alt)
        # Modes 0 and 1, plus a fallback so repr() always returns a string
        # even for a mode value outside the known table.
        label = self._MODE_NAMES.get(
            self.mode, 'Unknown mode {}'.format(self.mode))
        return "<GpsResponse {}>".format(label)


def connect(host="127.0.0.1", port=2947):
    """Open a connection to gpsd and enable watch mode.

    Any connection left over from an earlier call is closed first, so that
    repeated reconnect attempts do not leak sockets. On success the module
    globals ``gpsd_socket`` and ``gpsd_stream`` refer to the new connection;
    on failure they are left as ``None`` and the new socket is closed.

    Args:
        host: Address of the gpsd server.
        port: TCP port of the gpsd server.

    Raises:
        Exception: If the first message is not of class VERSION, or if one
            of the two following messages is rejected by
            ``_parse_state_packet``.
        OSError: If the TCP connection cannot be established.
    """
    global gpsd_socket, gpsd_stream

    # Drop the previous connection (if any) before opening a new one.
    stale_handles = (gpsd_stream, gpsd_socket)
    gpsd_stream = gpsd_socket = None
    for handle in stale_handles:
        if handle is None:
            continue
        try:
            handle.close()
        except OSError:
            # The peer may already be gone; nothing more can be done.
            pass

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    stream = None
    try:
        sock.connect((host, port))
        stream = sock.makefile(mode="rw")

        welcome = json.loads(stream.readline())
        if welcome['class'] != "VERSION":
            raise Exception(
                "Unexpected data received as welcome. Is the server a gpsd 3 server?")

        stream.write('?WATCH={"enable":true}\n')
        stream.flush()

        # Two state messages are read back and recorded after the WATCH
        # request.
        for _ in range(2):
            _parse_state_packet(json.loads(stream.readline()))
    except BaseException:
        # Do not leak the half-open connection when the handshake fails.
        if stream is not None:
            try:
                stream.close()
            except OSError:
                pass
        sock.close()
        raise

    gpsd_socket = sock
    gpsd_stream = stream


def get_current():
    """Poll gpsd once and return the answer as a GpsResponse.

    Writes a ``?POLL;`` request to the module-level ``gpsd_stream``, reads
    one line back and decodes it as JSON.

    Raises:
        Exception: If the decoded message is not of class POLL, or for
            whatever ``GpsResponse.from_json`` raises.
    """
    gpsd_stream.write("?POLL;\n")
    gpsd_stream.flush()

    reply = json.loads(gpsd_stream.readline())
    reply_class = reply['class']
    if reply_class == 'POLL':
        return GpsResponse.from_json(reply)

    raise Exception(
        "Unexpected message received from gps: {}".format(reply_class))