Python Example Source Code for TEC controllers
ostech-tec.py
—
Python Source,
8 KB (8587 bytes)
File contents
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # vim: fileencoding=utf-8 from __future__ import print_function from __future__ import unicode_literals from __future__ import with_statement import serial import threading import struct import time import six import sys TIMEOUT = 1 # TimeOut in s EOL_CHAR = b"\r" # End of line as OsTech drivers expect ESCAPE = b'\x1b' # Escape character cancels line class Device(object): """Communication with OsTech drivers.""" def __init__(self, port=0, baudrate=9600): self.port = port self.baudrate = baudrate self._is_connected = False self._serial = serial.Serial(timeout=TIMEOUT, writeTimeout=TIMEOUT) self._serial.port = port self._serial_lock = threading.RLock() def connect(self): """Establishes a connection to a driver.""" with self._serial_lock: self._serial.open() if not self._serial.isOpen(): return if not self._detect_driver(): return self._set_binary_mode(True) def disconnect(self): """Closes connection to driver.""" with self._serial_lock: self._set_binary_mode(False) self._serial.close() self.is_connected = False def is_connected(self): """Returns if connected with a driver.""" return self._is_connected def send_command(self, command, cmdType=str, newValue=None, retry=True): """Sends command to driver and returns response value.""" if newValue is not None: command += _format_value(newValue) command += EOL_CHAR with self._serial_lock: for i in range(2): try: self._serial.flushInput() self._serial.write(command) echo = self._read_line() if echo == b"": raise serial.SerialTimeoutException if echo != command.upper(): raise OsTechEchoDiffersError(command) if cmdType is not None: value = self._read_value(command, cmdType) else: value = None self._serial.flushInput() except (serial.SerialException, OsTechError): if not retry: raise else: time.sleep(.1) self._serial.write(ESCAPE) time.sleep(.1) if i > 0: raise else: break return value def send_text_cmd(self, command, newValue=None): """Sends command to driver in standard mode and returns answer string. """ with self._serial_lock: self._set_binary_mode(False) response = self.send_command(command=command, cmdType=str, newValue=newValue) self._set_binary_mode(True) if response == b"": raise serial.SerialTimeoutException return response def _set_binary_mode(self, binary=True): """Enables or disables binary mode.""" if binary: self.send_command(command=b"GMS", cmdType=int, newValue=8) else: self.send_command(command=b"GMC", cmdType=str, newValue=8) def _detect_driver(self, timeout=1): self._serial.flushInput() endTime = time.time() + timeout while timeout is None or time.time() < endTime: self._serial.write(ESCAPE) response = self._serial.read() if (response == ESCAPE): self._is_connected = True break self._serial.flushInput() return self._is_connected def _read_value(self, command, readType): """Reads a value of the given type in OsTech driver representation from the serial connection. Then the value is converted accordingly and returned. If no type is given a string is read until EOL_CHAR. """ with self._serial_lock: if readType == bool: response = self._serial.read(1) if len(response)!=1: raise serial.SerialTimeoutException if response == b"\xAA": value = True elif response==b"\x55": value = False else: raise OsTechChecksumWrongError(command) elif readType == float: response = self._serial.read(5) if len(response) != 5: raise serial.SerialTimeoutException value = struct.unpack('>fx', response)[0] chkRead = bytearray(response)[4] chkCalc = (0x55 + sum(bytearray(response[:4]))) % 256 if chkRead != chkCalc: raise OsTechChecksumWrongError elif readType == int: response = self._serial.read(3) if len(response) != 3: raise serial.SerialTimeoutException value = struct.unpack('>Hx', response)[0] chkRead = bytearray(response)[2] chkCalc = (0x55 + sum(bytearray(response[:2]))) % 256 if chkRead != chkCalc: raise OsTechChecksumWrongError(command) else: value = self._read_line() if value == b"": raise serial.SerialTimeoutException return value def _read_line(self): line = b"" eolRead = False with self._serial_lock: while (not eolRead): c = b"" + self._serial.read(1) if len(c) == 0: return line eolRead = c == EOL_CHAR line += c return line def _format_value(value): """Converts a value to a format OsTech drivers understand. Numbers are forced to fit into 9 characters. """ if type(value) == bool: if value: result = "R" else: result = "S" elif type(value) in six.integer_types: value = max(0, min(65535, value)) result = "{:d}".format(value) elif type(value) == float: for i in range(9, 0, -1): result = "{:.{p}g}".format(value, p=i) result.replace("e-0", "e-") result.replace("e+0", "e+") if len(result) <= 9: break else: raise TypeError return six.ensure_binary(result, encoding="ascii") class OsTechError(Exception): """General OsTech error""" class OsTechEchoDiffersError(OsTechError): """Received echo differs from sent command.""" def __init__(self, command=None): self.command = command def __repr__(self): return "OsTechEchoDiffersError({})".format(self.command) class OsTechChecksumWrongError(OsTechError): """Checksum of read value is wrong.""" def __init__(self, command=None): self.command = command def __repr__(self): return "OsTechChecksumWrongError({})".format(self.command) if __name__=="__main__": print("--- EXAMPLE ---") port = "/dev/ttyUSB0" device = Device(port) device.connect() if not device.is_connected(): sys.exit("No OsTech Device found on port {}.".format(port)) deviceType = device.send_command(b"GVT", int) serialNumber = device.send_command(b"GVN", int) softwareVersion = device.send_command(b"GVS", int) temperature1 = device.send_command(b"1SA", float) temperature2 = device.send_command(b"2SA", float) tt1 = device.send_command(command=b"1TT", cmdType=float, newValue=20.0) tt2 = device.send_command(command=b"2TT", cmdType=float, newValue=25.0) t1 = device.send_command(command=b"1TC", cmdType=bool, newValue=False) print("{:<40}{:>8}".format("Device type:", deviceType)) print("{:<40}{:>8}".format("Serial number:", serialNumber)) print("{:<40}{:>8}".format("Software version:", softwareVersion)) print("{:<40}{:>8}".format("Temperature 1:", temperature1)) print("{:<40}{:>8}".format("Temperature 2:", temperature2)) print("{:<40}{:>8}".format("Set TEC1 target temperature to 20°C:", tt1)) print("{:<40}{:>8}".format("Set TEC2 target temperature to 25°C:", tt2)) print("{:<40}{:>8}".format("Stop TEC1:", str(t1))) device.disconnect()