Python Example Source Code for TEC controllers

ostech-tec.py — Python source, 8 KB (8587 bytes)

File contents

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim: fileencoding=utf-8
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import with_statement
import serial
import threading
import struct
import time
import six
import sys


TIMEOUT = 1  # Read and write timeout of the serial port, in seconds
EOL_CHAR = b"\r"  # Line terminator the OsTech drivers expect
ESCAPE = b"\x1b"  # ESC byte; cancels the line currently being entered


class Device(object):
    """Communication with OsTech drivers over a serial port.

    ``connect()`` switches the driver into binary mode, in which bool, int
    and float replies arrive as fixed-size packets protected by a checksum
    byte. ``send_text_cmd()`` temporarily leaves binary mode to exchange
    plain text lines. Every access to the port is guarded by a re-entrant
    lock, so public methods may call each other while holding it.
    """

    def __init__(self, port=0, baudrate=9600):
        """Prepares (but does not open) the serial port.

        port     -- port identifier handed to pyserial
        baudrate -- line speed in baud
        """
        self.port = port
        self.baudrate = baudrate
        self._is_connected = False
        # No port is passed to the constructor, so nothing is opened here;
        # the port is opened explicitly in connect().
        self._serial = serial.Serial(timeout=TIMEOUT, writeTimeout=TIMEOUT)
        self._serial.port = port
        # Apply the requested speed to the port object as well.
        self._serial.baudrate = baudrate
        self._serial_lock = threading.RLock()

    def connect(self):
        """Establishes a connection to a driver.

        Opens the port, probes for a driver and, if one answers, switches
        it to binary mode. If no driver answers, or the mode switch fails,
        the port is closed again so that connect() can be retried. Use
        is_connected() to find out whether a driver was found.
        """
        with self._serial_lock:
            self._serial.open()
            if not self._serial.isOpen():
                return
            try:
                if self._detect_driver():
                    self._set_binary_mode(True)
            except Exception:
                # Do not leave a half-initialised connection behind.
                self._is_connected = False
                self._serial.close()
                raise
            if not self._is_connected:
                # Nothing answered: release the port for the next attempt.
                self._serial.close()

    def disconnect(self):
        """Closes connection to driver.

        The driver is switched back to standard (text) mode first. The port
        is closed and the connection flag cleared even if that fails; the
        error is still passed on to the caller.
        """
        with self._serial_lock:
            try:
                self._set_binary_mode(False)
            finally:
                self._serial.close()
                # Reset the flag itself; assigning to ``self.is_connected``
                # would replace the method of the same name.
                self._is_connected = False

    def is_connected(self):
        """Returns if connected with a driver."""
        return self._is_connected

    def send_command(self, command, cmdType=str, newValue=None, retry=True):
        """Sends command to driver and returns response value.

        command  -- command as bytes, without the line terminator
        cmdType  -- expected reply type: bool, int or float select the
                    checksummed binary replies, any other type reads a text
                    line, None reads no reply at all
        newValue -- optional value appended to the command after being
                    converted by _format_value()
        retry    -- if true, a failed exchange is cancelled with ESCAPE and
                    repeated once before the error is passed on

        Returns the decoded reply, or None if cmdType is None.
        Raises serial.SerialException (timeouts included) or OsTechError.
        """
        if newValue is not None:
            command += _format_value(newValue)
        command += EOL_CHAR
        with self._serial_lock:
            for attempt in range(2):
                try:
                    self._serial.flushInput()
                    self._serial.write(command)
                    # The driver echoes every command in upper case.
                    echo = self._read_line()
                    if echo == b"":
                        raise serial.SerialTimeoutException
                    if echo != command.upper():
                        raise OsTechEchoDiffersError(command)
                    if cmdType is not None:
                        value = self._read_value(command, cmdType)
                    else:
                        value = None
                    self._serial.flushInput()
                except (serial.SerialException, OsTechError):
                    if not retry:
                        raise
                    # Cancel whatever the driver has buffered of the failed
                    # line before trying again or giving up.
                    time.sleep(.1)
                    self._serial.write(ESCAPE)
                    time.sleep(.1)
                    if attempt > 0:
                        raise
                else:
                    break
        return value

    def send_text_cmd(self, command, newValue=None):
        """Sends command to driver in standard mode and returns answer
        string.

        Binary mode is switched back on afterwards, also when the command
        itself fails, because the typed replies of send_command() rely on
        it.
        """
        with self._serial_lock:
            self._set_binary_mode(False)
            try:
                response = self.send_command(command=command, cmdType=str,
                    newValue=newValue)
            finally:
                self._set_binary_mode(True)
            if response == b"":
                raise serial.SerialTimeoutException
        return response

    def _set_binary_mode(self, binary=True):
        """Enables or disables binary mode."""
        if binary:
            self.send_command(command=b"GMS", cmdType=int, newValue=8)
        else:
            self.send_command(command=b"GMC", cmdType=str, newValue=8)

    def _detect_driver(self, timeout=1):
        """Probes for a driver by sending ESCAPE until it is echoed back.

        timeout -- seconds to keep probing; None probes until a driver
                   answers

        Returns True (and marks the device as connected) if a driver
        answered in time.
        """
        self._serial.flushInput()
        # Only compute a deadline when there is one; None means no limit.
        deadline = None if timeout is None else time.time() + timeout
        while deadline is None or time.time() < deadline:
            self._serial.write(ESCAPE)
            response = self._serial.read()
            if response == ESCAPE:
                self._is_connected = True
                break
        self._serial.flushInput()
        return self._is_connected

    def _read_value(self, command, readType):
        """Reads a value of the given type in OsTech driver representation from
        the serial connection. Then the value is converted accordingly and
        returned. If no type is given a string is read until EOL_CHAR.

        command is only used to label a checksum error. Raises
        serial.SerialTimeoutException if the reply is incomplete and
        OsTechChecksumWrongError if it is corrupted.
        """
        with self._serial_lock:
            if readType == bool:
                # A boolean is a single marker byte: 0xAA true, 0x55 false.
                response = self._serial.read(1)
                if len(response) != 1:
                    raise serial.SerialTimeoutException
                if response == b"\xAA":
                    value = True
                elif response == b"\x55":
                    value = False
                else:
                    raise OsTechChecksumWrongError(command)
            elif readType == float:
                # Big-endian 32 bit float followed by the checksum byte.
                value = self._read_packet(command, '>fx')
            elif readType == int:
                # Big-endian unsigned 16 bit integer plus checksum byte.
                value = self._read_packet(command, '>Hx')
            else:
                value = self._read_line()
                if value == b"":
                    raise serial.SerialTimeoutException
        return value

    def _read_packet(self, command, fmt):
        """Reads one binary reply laid out as struct format fmt, whose last
        byte is the checksum, verifies it and returns the decoded value.
        """
        size = struct.calcsize(fmt)
        response = self._serial.read(size)
        if len(response) != size:
            raise serial.SerialTimeoutException
        raw = bytearray(response)
        # Checksum: 0x55 plus the sum of the payload bytes, modulo 256.
        if raw[-1] != (0x55 + sum(raw[:-1])) % 256:
            raise OsTechChecksumWrongError(command)
        return struct.unpack(fmt, response)[0]

    def _read_line(self):
        """Reads bytes up to and including EOL_CHAR and returns them.

        If a read times out first, whatever was received so far is
        returned (possibly an empty byte string).
        """
        line = b""
        with self._serial_lock:
            while True:
                c = self._serial.read(1)
                if not c:
                    # Timeout: hand back the incomplete line.
                    return line
                line += c
                if c == EOL_CHAR:
                    return line


def _format_value(value):
    """Converts a value to a format OsTech drivers understand.
    Numbers are forced to fit into 9 characters.
    """
    if type(value) == bool:
        if value:
            result = "R"
        else:
            result = "S"
    elif type(value) in six.integer_types:
        value = max(0, min(65535, value))
        result = "{:d}".format(value)
    elif type(value) == float:
        for i in range(9, 0, -1):
            result = "{:.{p}g}".format(value, p=i)
            result.replace("e-0", "e-")
            result.replace("e+0", "e+")
            if len(result) <= 9:
                break
    else:
        raise TypeError
    return six.ensure_binary(result, encoding="ascii")


class OsTechError(Exception):
    """Common base class of the OsTech specific errors."""
    pass


class OsTechEchoDiffersError(OsTechError):
    """The echo received from the driver is not the command that was sent."""

    def __init__(self, command=None):
        # Remember the command so that handlers can inspect it.
        self.command = command

    def __repr__(self):
        template = "OsTechEchoDiffersError({})"
        return template.format(self.command)


class OsTechChecksumWrongError(OsTechError):
    """A value was read whose checksum does not match its content."""

    def __init__(self, command=None):
        # Remember the command so that handlers can inspect it.
        self.command = command

    def __repr__(self):
        template = "OsTechChecksumWrongError({})"
        return template.format(self.command)


if __name__ == "__main__":
    # Example session: query some device data, set two target temperatures
    # and stop the first TEC.
    print("--- EXAMPLE ---")

    port = "/dev/ttyUSB0"
    device = Device(port)
    device.connect()
    if not device.is_connected():
        sys.exit("No OsTech Device found on port {}.".format(port))

    # connect() has switched the driver to binary mode. Make sure it is
    # switched back and the port is closed even if a command below fails.
    try:
        deviceType = device.send_command(b"GVT", int)
        serialNumber = device.send_command(b"GVN", int)
        softwareVersion = device.send_command(b"GVS", int)
        temperature1 = device.send_command(b"1SA", float)
        temperature2 = device.send_command(b"2SA", float)
        tt1 = device.send_command(command=b"1TT", cmdType=float,
                                  newValue=20.0)
        tt2 = device.send_command(command=b"2TT", cmdType=float,
                                  newValue=25.0)
        t1 = device.send_command(command=b"1TC", cmdType=bool,
                                 newValue=False)

        print("{:<40}{:>8}".format("Device type:", deviceType))
        print("{:<40}{:>8}".format("Serial number:", serialNumber))
        print("{:<40}{:>8}".format("Software version:", softwareVersion))
        print("{:<40}{:>8}".format("Temperature 1:", temperature1))
        print("{:<40}{:>8}".format("Temperature 2:", temperature2))
        print("{:<40}{:>8}".format("Set TEC1 target temperature to 20°C:", tt1))
        print("{:<40}{:>8}".format("Set TEC2 target temperature to 25°C:", tt2))
        print("{:<40}{:>8}".format("Stop TEC1:", str(t1)))
    finally:
        device.disconnect()