Python Example Source Code
ostech.py
—
Python Source,
8 KB (8396 bytes)
File contents
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim: fileencoding=utf-8
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import with_statement
import serial
import threading
import struct
import time
import six
import sys
TIMEOUT = 1 # TimeOut in s
EOL_CHAR = b"\r" # End of line as OsTech drivers expect
ESCAPE = b'\x1b' # Escape character cancels line
class Device(object):
"""Communication with OsTech drivers."""
def __init__(self, port=0, baudrate=9600):
self.port = port
self.baudrate = baudrate
self._is_connected = False
self._serial = serial.Serial(timeout=TIMEOUT, writeTimeout=TIMEOUT)
self._serial.port = port
self._serial_lock = threading.RLock()
def connect(self):
"""Establishes a connection to a driver."""
with self._serial_lock:
self._serial.open()
if not self._serial.isOpen():
return
if not self._detect_driver():
return
self._set_binary_mode(True)
def disconnect(self):
"""Closes connection to driver."""
with self._serial_lock:
self._set_binary_mode(False)
self._serial.close()
self.is_connected = False
def is_connected(self):
"""Returns if connected with a driver."""
return self._is_connected
def send_command(self, command, cmdType=str, newValue=None, retry=True):
"""Sends command to driver and returns response value."""
if newValue is not None:
if cmdType is not str:
newValue = cmdType(newValue)
command += _format_value(newValue)
command += EOL_CHAR
with self._serial_lock:
for i in range(2):
try:
self._serial.flushInput()
self._serial.write(command)
echo = self._read_line()
if echo == b"":
raise serial.SerialTimeoutException
if echo != command.upper():
raise OsTechEchoDiffersError(command)
if cmdType is not None:
value = self._read_value(command, cmdType)
else:
value = None
self._serial.flushInput()
except (serial.SerialException, OsTechError):
if not retry:
raise
else:
time.sleep(.1)
self._serial.write(ESCAPE)
time.sleep(.1)
if i > 0:
raise
else:
break
return value
def send_text_cmd(self, command, newValue=None):
"""Sends command to driver in standard mode and returns answer
string.
"""
with self._serial_lock:
self._set_binary_mode(False)
response = self.send_command(command=command, cmdType=str,
newValue=newValue)
self._set_binary_mode(True)
if response == b"":
raise serial.SerialTimeoutException
return response
def _set_binary_mode(self, binary=True):
"""Enables or disables binary mode."""
if binary:
self.send_command(command=b"GMS", cmdType=int, newValue=8)
else:
self.send_command(command=b"GMC", cmdType=str, newValue=8)
def _detect_driver(self, timeout=1):
self._serial.flushInput()
endTime = time.time() + timeout
while timeout is None or time.time() < endTime:
self._serial.write(ESCAPE)
response = self._serial.read()
if (response == ESCAPE):
self._is_connected = True
break
self._serial.flushInput()
return self._is_connected
def _read_value(self, command, readType):
"""Reads a value of the given type in OsTech driver representation from
the serial connection. Then the value is converted accordingly and
returned. If no type is given a string is read until EOL_CHAR.
"""
with self._serial_lock:
if readType == bool:
response = self._serial.read(1)
if len(response)!=1:
raise serial.SerialTimeoutException
if response == b"\xAA":
value = True
elif response==b"\x55":
value = False
else:
raise OsTechChecksumWrongError(command)
elif readType == float:
response = self._serial.read(5)
if len(response) != 5:
raise serial.SerialTimeoutException
value = struct.unpack('>fx', response)[0]
chkRead = bytearray(response)[4]
chkCalc = (0x55 + sum(bytearray(response[:4]))) % 256
if chkRead != chkCalc:
raise OsTechChecksumWrongError
elif readType == int:
response = self._serial.read(3)
if len(response) != 3:
raise serial.SerialTimeoutException
value = struct.unpack('>Hx', response)[0]
chkRead = bytearray(response)[2]
chkCalc = (0x55 + sum(bytearray(response[:2]))) % 256
if chkRead != chkCalc:
raise OsTechChecksumWrongError(command)
else:
value = self._read_line()
if value == b"":
raise serial.SerialTimeoutException
return value
def _read_line(self):
line = b""
eolRead = False
with self._serial_lock:
while (not eolRead):
c = b"" + self._serial.read(1)
if len(c) == 0:
return line
eolRead = c == EOL_CHAR
line += c
return line
def _format_value(value):
"""Converts a value to a format OsTech drivers understand.
Numbers are forced to fit into 9 characters.
"""
if type(value) == bool:
if value:
result = "R"
else:
result = "S"
elif type(value) in six.integer_types:
value = max(0, min(65535, value))
result = "{:d}".format(value)
elif type(value) == float:
for i in range(9, 0, -1):
result = "{:.{p}g}".format(value, p=i)
result.replace("e-0", "e-")
result.replace("e+0", "e+")
if len(result) <= 9:
break
else:
raise TypeError
return six.ensure_binary(result, encoding="ascii")
class OsTechError(Exception):
"""General OsTech error"""
class OsTechEchoDiffersError(OsTechError):
"""Received echo differs from sent command."""
def __init__(self, command=None):
self.command = command
def __repr__(self):
return "OsTechEchoDiffersError({})".format(self.command)
class OsTechChecksumWrongError(OsTechError):
"""Checksum of read value is wrong."""
def __init__(self, command=None):
self.command = command
def __repr__(self):
return "OsTechChecksumWrongError({})".format(self.command)
if __name__=="__main__":
print("--- EXAMPLE ---")
port = "/dev/ttyUSB0"
device = Device(port)
device.connect()
if not device.is_connected():
sys.exit("No OsTech Device found on port {}.".format(port))
deviceType = device.send_command(b"GVT", int)
serialNumber = device.send_command(b"GVN", int)
softwareVersion = device.send_command(b"GVS", int)
temperature = device.send_command(b"1SA", float)
lct = device.send_command(command=b"LCT", cmdType=float, newValue=0.0)
ls = device.send_command(command=b"LS", cmdType=bool, newValue=False)
print("{:<40}{:>8}".format("Device type:", deviceType))
print("{:<40}{:>8}".format("Serial number:", serialNumber))
print("{:<40}{:>8}".format("Software version:", softwareVersion))
print("{:<40}{:>8}".format("Temperature 1:", temperature))
print("{:<40}{:>8}".format("Set laser target current to 0 mA:", lct))
print("{:<40}{:>8}".format("Turn the laser off:", str(ls)))
device.disconnect()