hello
I would like to share a simple Python script that lets a Raspberry Pi (RPi) with a USB GPS receiver report its position to your Traccar server.
Google-Drive
api.py
import requests
import pengugps as gpsd
import time
# Identifier used when adding the device to the traccar server. It should be
# long and random because there is no authentication.
ID = "001"
# Seconds waited between each request made to the traccar server.
UPDATE_DELAY = 1
# Seconds waited before requesting coordinates again from the GPS when it
# returns invalid data.
BAD_DATA_DELAY = 0.1
# Seconds waited before trying to reconnect to the GPS module when something
# goes wrong (for example, GPS module not found).
RECONNECT_DELAY = 1
class API:
    """Minimal client that pushes position reports to a traccar server.

    Each report is sent as an HTTP POST whose data is carried entirely in
    the URL query string.

    Attributes:
        url2: URL template for a report without altitude (five values).
        url3: URL template for a report with altitude (six values).
    """

    # Seconds to wait for the server before giving up on a request, so a
    # stalled connection cannot block the reporting loop forever.
    REQUEST_TIMEOUT = 10

    def __init__(self, id):
        """Build the URL templates for the device identified by *id*."""
        # The parameter must be spelled "&timestamp"; "&times" had been
        # turned into the multiplication sign by an HTML entity decoder.
        self.url2 = "http://traccar.earth-hosting.ch:5055/?id=%s&lat={0}&lon={1}&timestamp={2}&accuracy={3}&speed={4}" % id
        self.url3 = self.url2 + "&altitude={5}"

    def update(self, args):
        """Send one position report to the server.

        Args:
            args: sequence of ``lat, lon, timestamp, accuracy, speed`` and,
                optionally, ``altitude`` as a sixth value.

        A sequence of any other length is ignored without sending anything.
        Errors raised by ``requests.post`` propagate to the caller.
        """
        if len(args) == 6:
            template = self.url3
        elif len(args) == 5:
            template = self.url2
        else:
            return
        requests.post(template.format(*args), timeout=self.REQUEST_TIMEOUT)
# Reporting loop: connect to the GPS daemon, then forward every position fix
# to the traccar server. Any failure drops back to the outer loop, which
# waits RECONNECT_DELAY seconds and reconnects.
api = API(ID)

while True:
    try:
        gpsd.connect()
        while True:
            packet = gpsd.get_current()
            # Modes below 2 mean there is no position fix yet. Test the mode
            # rather than the truthiness of the latitude, which would also
            # reject a genuine latitude of exactly 0.0.
            if packet.mode < 2:
                time.sleep(BAD_DATA_DELAY)
                continue

            args = [
                packet.lat,
                packet.lon,
                packet.time,
                # Report the larger of the two horizontal error estimates.
                max(packet.error["x"], packet.error["y"]),
                packet.hspeed,
            ]
            # Altitude is only sent for a 3D fix.
            if packet.mode == 3:
                args.append(packet.alt)

            api.update(args)
            time.sleep(UPDATE_DELAY)
    except Exception as e:
        # Top-level boundary: report the problem and start over instead of
        # letting the script die.
        print(e)
        time.sleep(RECONNECT_DELAY)
pengugps.py
import socket
import json
import logging
import datetime
# TCP socket connected to gpsd; None until connect() has been called.
gpsd_socket = None
# Read/write file object wrapping gpsd_socket, used for line-based exchange.
gpsd_stream = None
# Last DEVICES and WATCH messages received from gpsd, stored under the keys
# 'devices' and 'watch'.
state = {}
def _parse_state_packet(json_data):
    """Store a DEVICES or WATCH message from gpsd in the module ``state``.

    Args:
        json_data: decoded JSON message; its 'class' entry selects the slot.

    Raises:
        Exception: if a DEVICES message lists no devices, or if the message
            class is neither DEVICES nor WATCH.
    """
    message_class = json_data['class']

    if message_class == 'WATCH':
        state['watch'] = json_data
        return

    if message_class != 'DEVICES':
        raise Exception(
            "Unexpected message received from gps: {}".format(message_class))

    # A DEVICES message with an empty device list is an error.
    if not json_data['devices']:
        raise Exception("No gps devices found")
    state['devices'] = json_data
class GpsResponse(object):
    """Snapshot of the latest position data contained in a gpsd POLL reply.

    Attributes:
        mode: fix mode (0 no mode, 1 no fix, 2 2D fix, 3 3D fix).
        sats: number of satellites listed in the last SKY report.
        sats_valid: number of those satellites flagged as used.
        lon, lat: position from the last TPV report, 0.0 when absent.
        alt: altitude from the last TPV report, filled only for mode >= 3.
        track, hspeed, climb: TPV 'track', 'speed' and 'climb' values.
        time: TPV 'time' value, '' when absent.
        error: error estimates keyed 'c', 's', 't', 'v', 'x', 'y' (taken
            from TPV 'epc', 'eps', 'ept', 'epv', 'epx', 'epy'); empty when
            there is no fix.
    """

    def __init__(self):
        """Create a response describing "no data"."""
        self.mode = 0
        self.sats = 0
        self.sats_valid = 0
        self.lon = 0.0
        self.lat = 0.0
        self.alt = 0.0
        self.track = 0
        self.hspeed = 0
        self.climb = 0
        self.time = ''
        self.error = {}

    @classmethod
    def from_json(cls, packet):
        """Build a response from a decoded POLL message.

        Args:
            packet: dict with an 'active' entry and 'tpv' / 'sky' lists of
                reports; only the last report of each list is used.

        Returns:
            A populated GpsResponse. When gpsd has no TPV report yet, the
            response has mode 0 instead of the call failing.

        Raises:
            Exception: if ``packet['active']`` is falsy.
        """
        result = cls()
        if not packet['active']:
            raise Exception('GPS not active')

        # The report lists can be empty; fall back to an empty report so
        # that case reads as "no fix" rather than raising IndexError.
        tpv_reports = packet.get('tpv') or [{}]
        sky_reports = packet.get('sky') or [{}]
        last_tpv = tpv_reports[-1]
        last_sky = sky_reports[-1]

        satellites = last_sky.get('satellites', [])
        result.sats = len(satellites)
        result.sats_valid = sum(1 for sat in satellites if sat.get('used'))

        result.mode = last_tpv.get('mode', 0)

        if result.mode >= 2:
            result.lon = last_tpv.get('lon', 0.0)
            result.lat = last_tpv.get('lat', 0.0)
            result.track = last_tpv.get('track', 0)
            result.hspeed = last_tpv.get('speed', 0)
            result.time = last_tpv.get('time', '')
            result.error = {
                'c': 0,
                's': last_tpv.get('eps', 0),
                't': last_tpv.get('ept', 0),
                'v': 0,
                'x': last_tpv.get('epx', 0),
                'y': last_tpv.get('epy', 0),
            }

        # Vertical values are only filled in for a 3D fix.
        if result.mode >= 3:
            result.alt = last_tpv.get('alt', 0.0)
            result.climb = last_tpv.get('climb', 0)
            result.error['c'] = last_tpv.get('epc', 0)
            result.error['v'] = last_tpv.get('epv', 0)

        return result

    def __repr__(self):
        """Return a short description; always a string, whatever the mode."""
        if self.mode == 2:
            return "<GpsResponse 2D Fix {} {}>".format(self.lat, self.lon)
        if self.mode == 3:
            return "<GpsResponse 3D Fix {} {} ({} m)>".format(
                self.lat, self.lon, self.alt)
        modes = {
            0: 'No mode',
            1: 'No fix',
        }
        # Unknown modes get an explicit label; returning None from __repr__
        # would make repr() raise TypeError.
        label = modes.get(self.mode, 'Unknown mode {}'.format(self.mode))
        return "<GpsResponse {}>".format(label)
def _close_connection():
    """Close and forget the current gpsd stream and socket, if any."""
    global gpsd_socket, gpsd_stream
    for handle in (gpsd_stream, gpsd_socket):
        if handle is None:
            continue
        try:
            handle.close()
        except (OSError, ValueError):
            # Best effort: the connection is being discarded anyway.
            pass
    gpsd_socket = None
    gpsd_stream = None


def connect(host="127.0.0.1", port=2947):
    """Open a connection to gpsd and enable watch mode.

    Any connection left over from an earlier call is closed first, so
    repeated reconnect attempts do not leak sockets. If the handshake fails
    the new socket is closed as well before the error propagates.

    Args:
        host: address of the gpsd server.
        port: TCP port of the gpsd server.

    Raises:
        Exception: if the first message is not a VERSION message, or if
            one of the two messages read after the WATCH command is
            rejected by ``_parse_state_packet``.
        OSError: if the TCP connection cannot be established.
        ValueError: if a line received from gpsd is not valid JSON.
    """
    global gpsd_socket, gpsd_stream
    _close_connection()

    gpsd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        gpsd_socket.connect((host, port))
        gpsd_stream = gpsd_socket.makefile(mode="rw")

        welcome = json.loads(gpsd_stream.readline())
        if welcome['class'] != "VERSION":
            raise Exception(
                "Unexpected data received as welcome. Is the server a gpsd 3 server?")

        gpsd_stream.write('?WATCH={"enable":true}\n')
        gpsd_stream.flush()

        # Two messages are read and recorded after the WATCH command.
        for _ in range(2):
            _parse_state_packet(json.loads(gpsd_stream.readline()))
    except Exception:
        _close_connection()
        raise
def get_current():
    """Poll gpsd once and return the reply as a GpsResponse.

    Sends ``?POLL;`` on the module-level stream opened by ``connect`` and
    decodes the single line received in return.

    Raises:
        Exception: if the reply's class is not POLL.
    """
    stream = gpsd_stream
    stream.write("?POLL;\n")
    stream.flush()

    reply = json.loads(stream.readline())
    reply_class = reply['class']
    if reply_class == 'POLL':
        return GpsResponse.from_json(reply)

    raise Exception(
        "Unexpected message received from gps: {}".format(reply_class))
hello
I would like to share a simple Python script that lets a Raspberry Pi (RPi) with a USB GPS receiver report its position to your Traccar server.
Google-Drive
api.py
import time

import requests

import pengugps as gpsd

# Identifier used when adding the device to the traccar server. It should be
# long and random because there is no authentication.
ID = "001"
# Seconds waited between each request made to the traccar server.
UPDATE_DELAY = 1
# Seconds waited before requesting coordinates again from the GPS when it
# returns invalid data.
BAD_DATA_DELAY = 0.1
# Seconds waited before trying to reconnect to the GPS module when something
# goes wrong (for example, GPS module not found).
RECONNECT_DELAY = 1


class API:
    """Minimal client that pushes position reports to a traccar server.

    Each report is sent as an HTTP POST whose data is carried entirely in
    the URL query string.

    Attributes:
        url2: URL template for a report without altitude (five values).
        url3: URL template for a report with altitude (six values).
    """

    # Seconds to wait for the server before giving up on a request, so a
    # stalled connection cannot block the reporting loop forever.
    REQUEST_TIMEOUT = 10

    def __init__(self, id):
        """Build the URL templates for the device identified by *id*."""
        # The parameter must be spelled "&timestamp"; "&times" had been
        # turned into the multiplication sign by an HTML entity decoder.
        self.url2 = "http://traccar.earth-hosting.ch:5055/?id=%s&lat={0}&lon={1}&timestamp={2}&accuracy={3}&speed={4}" % id
        self.url3 = self.url2 + "&altitude={5}"

    def update(self, args):
        """Send one position report to the server.

        Args:
            args: sequence of ``lat, lon, timestamp, accuracy, speed`` and,
                optionally, ``altitude`` as a sixth value.

        A sequence of any other length is ignored without sending anything.
        Errors raised by ``requests.post`` propagate to the caller.
        """
        if len(args) == 6:
            template = self.url3
        elif len(args) == 5:
            template = self.url2
        else:
            return
        requests.post(template.format(*args), timeout=self.REQUEST_TIMEOUT)


# Reporting loop: connect to the GPS daemon, then forward every position fix
# to the traccar server. Any failure drops back to the outer loop, which
# waits RECONNECT_DELAY seconds and reconnects.
api = API(ID)

while True:
    try:
        gpsd.connect()
        while True:
            packet = gpsd.get_current()
            # Modes below 2 mean there is no position fix yet. Test the mode
            # rather than the truthiness of the latitude, which would also
            # reject a genuine latitude of exactly 0.0.
            if packet.mode < 2:
                time.sleep(BAD_DATA_DELAY)
                continue

            args = [
                packet.lat,
                packet.lon,
                packet.time,
                # Report the larger of the two horizontal error estimates.
                max(packet.error["x"], packet.error["y"]),
                packet.hspeed,
            ]
            # Altitude is only sent for a 3D fix.
            if packet.mode == 3:
                args.append(packet.alt)

            api.update(args)
            time.sleep(UPDATE_DELAY)
    except Exception as e:
        # Top-level boundary: report the problem and start over instead of
        # letting the script die.
        print(e)
        time.sleep(RECONNECT_DELAY)
pengugps.py
import socket
import json
import logging
import datetime

# TCP socket connected to gpsd; None until connect() has been called.
gpsd_socket = None
# Read/write file object wrapping gpsd_socket, used for line-based exchange.
gpsd_stream = None
# Last DEVICES and WATCH messages received from gpsd, stored under the keys
# 'devices' and 'watch'.
state = {}


def _parse_state_packet(json_data):
    """Store a DEVICES or WATCH message from gpsd in the module ``state``.

    Args:
        json_data: decoded JSON message; its 'class' entry selects the slot.

    Raises:
        Exception: if a DEVICES message lists no devices, or if the message
            class is neither DEVICES nor WATCH.
    """
    global state
    if json_data['class'] == 'DEVICES':
        if not json_data['devices']:
            raise Exception("No gps devices found")
        state['devices'] = json_data
    elif json_data['class'] == 'WATCH':
        state['watch'] = json_data
    else:
        raise Exception(
            "Unexpected message received from gps: {}".format(json_data['class']))


class GpsResponse(object):
    """Snapshot of the latest position data contained in a gpsd POLL reply.

    Attributes:
        mode: fix mode (0 no mode, 1 no fix, 2 2D fix, 3 3D fix).
        sats: number of satellites listed in the last SKY report.
        sats_valid: number of those satellites flagged as used.
        lon, lat: position from the last TPV report, 0.0 when absent.
        alt: altitude from the last TPV report, filled only for mode >= 3.
        track, hspeed, climb: TPV 'track', 'speed' and 'climb' values.
        time: TPV 'time' value, '' when absent.
        error: error estimates keyed 'c', 's', 't', 'v', 'x', 'y' (taken
            from TPV 'epc', 'eps', 'ept', 'epv', 'epx', 'epy'); empty when
            there is no fix.
    """

    def __init__(self):
        """Create a response describing "no data"."""
        self.mode = 0
        self.sats = 0
        self.sats_valid = 0
        self.lon = 0.0
        self.lat = 0.0
        self.alt = 0.0
        self.track = 0
        self.hspeed = 0
        self.climb = 0
        self.time = ''
        self.error = {}

    @classmethod
    def from_json(cls, packet):
        """Build a response from a decoded POLL message.

        Args:
            packet: dict with an 'active' entry and 'tpv' / 'sky' lists of
                reports; only the last report of each list is used.

        Returns:
            A populated GpsResponse. When gpsd has no TPV report yet, the
            response has mode 0 instead of the call failing.

        Raises:
            Exception: if ``packet['active']`` is falsy.
        """
        result = cls()
        if not packet['active']:
            raise Exception('GPS not active')

        # The report lists can be empty; fall back to an empty report so
        # that case reads as "no fix" rather than raising IndexError.
        tpv_reports = packet.get('tpv') or [{}]
        sky_reports = packet.get('sky') or [{}]
        last_tpv = tpv_reports[-1]
        last_sky = sky_reports[-1]

        satellites = last_sky.get('satellites', [])
        result.sats = len(satellites)
        result.sats_valid = sum(1 for sat in satellites if sat.get('used'))

        result.mode = last_tpv.get('mode', 0)

        if result.mode >= 2:
            result.lon = last_tpv.get('lon', 0.0)
            result.lat = last_tpv.get('lat', 0.0)
            result.track = last_tpv.get('track', 0)
            result.hspeed = last_tpv.get('speed', 0)
            result.time = last_tpv.get('time', '')
            result.error = {
                'c': 0,
                's': last_tpv.get('eps', 0),
                't': last_tpv.get('ept', 0),
                'v': 0,
                'x': last_tpv.get('epx', 0),
                'y': last_tpv.get('epy', 0),
            }

        # Vertical values are only filled in for a 3D fix.
        if result.mode >= 3:
            result.alt = last_tpv.get('alt', 0.0)
            result.climb = last_tpv.get('climb', 0)
            result.error['c'] = last_tpv.get('epc', 0)
            result.error['v'] = last_tpv.get('epv', 0)

        return result

    def __repr__(self):
        """Return a short description; always a string, whatever the mode."""
        if self.mode == 2:
            return "<GpsResponse 2D Fix {} {}>".format(self.lat, self.lon)
        if self.mode == 3:
            return "<GpsResponse 3D Fix {} {} ({} m)>".format(
                self.lat, self.lon, self.alt)
        modes = {
            0: 'No mode',
            1: 'No fix',
        }
        # Unknown modes get an explicit label; returning None from __repr__
        # would make repr() raise TypeError.
        label = modes.get(self.mode, 'Unknown mode {}'.format(self.mode))
        return "<GpsResponse {}>".format(label)


def _close_connection():
    """Close and forget the current gpsd stream and socket, if any."""
    global gpsd_socket, gpsd_stream
    for handle in (gpsd_stream, gpsd_socket):
        if handle is None:
            continue
        try:
            handle.close()
        except (OSError, ValueError):
            # Best effort: the connection is being discarded anyway.
            pass
    gpsd_socket = None
    gpsd_stream = None


def connect(host="127.0.0.1", port=2947):
    """Open a connection to gpsd and enable watch mode.

    Any connection left over from an earlier call is closed first, so
    repeated reconnect attempts do not leak sockets. If the handshake fails
    the new socket is closed as well before the error propagates.

    Args:
        host: address of the gpsd server.
        port: TCP port of the gpsd server.

    Raises:
        Exception: if the first message is not a VERSION message, or if
            one of the two messages read after the WATCH command is
            rejected by ``_parse_state_packet``.
        OSError: if the TCP connection cannot be established.
        ValueError: if a line received from gpsd is not valid JSON.
    """
    global gpsd_socket, gpsd_stream
    _close_connection()

    gpsd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        gpsd_socket.connect((host, port))
        gpsd_stream = gpsd_socket.makefile(mode="rw")

        welcome = json.loads(gpsd_stream.readline())
        if welcome['class'] != "VERSION":
            raise Exception(
                "Unexpected data received as welcome. Is the server a gpsd 3 server?")

        gpsd_stream.write('?WATCH={"enable":true}\n')
        gpsd_stream.flush()

        # Two messages are read and recorded after the WATCH command.
        for _ in range(2):
            _parse_state_packet(json.loads(gpsd_stream.readline()))
    except Exception:
        _close_connection()
        raise


def get_current():
    """Poll gpsd once and return the reply as a GpsResponse.

    Sends ``?POLL;`` on the module-level stream opened by ``connect`` and
    decodes the single line received in return.

    Raises:
        Exception: if the reply's class is not POLL.
    """
    gpsd_stream.write("?POLL;\n")
    gpsd_stream.flush()
    response = json.loads(gpsd_stream.readline())
    if response['class'] != 'POLL':
        raise Exception(
            "Unexpected message received from gps: {}".format(response['class']))
    return GpsResponse.from_json(response)