Python Example Source Code

ostech.py — Python source, 6 KB (7116 bytes)

File contents

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import with_statement
import sys
import time
import serial
import struct
import threading

TIMEOUT = 1 #: Timeout in seconds, used as both read and write timeout of the serial port
EOLCHAR = "\r" #: End of line as OsTech drivers expect; appended to every command and used as line terminator when reading


# OsTech Exception classes

class OsTechError(Exception):
    """Base class of all OsTech-specific exceptions defined in this module."""

class OsTechCommError(OsTechError):
    """Base class for errors in the communication with an OsTech driver."""

class OsTechEchoDiffersError(OsTechCommError):
    """Received echo differs from sent command.

    The name of the offending command is available as ``cmdName`` (``None``
    if it was not given).
    """
    def __init__(self, cmdName=None):
        # Chain to the base class so that ``args`` is populated; otherwise
        # the exception carries no information when it is logged or pickled.
        super(OsTechEchoDiffersError, self).__init__(cmdName)
        self.cmdName = cmdName

    def __repr__(self):
        return "OsTechEchoDiffersError(%s)" % self.cmdName

    def __str__(self):
        # Tracebacks print str(), not repr(), so name the command here too.
        return "received echo differs from sent command %r" % (self.cmdName,)

class OsTechChecksumWrongError(OsTechCommError):
    """Checksum of a value read from the driver is wrong.

    Also raised when a boolean reply is neither of the two expected codes.
    """

class OsTechPortInUseError(OsTechCommError):
    """A second instance using the same serial port is about to be created.

    Raised by the Comm constructor when the requested port is already
    registered by another Comm instance.
    """



def pack(value):
    """Converts a value to a format OsTech drivers understand.

    * ``bool``  -> ``"R"`` for True, ``"S"`` for False.
    * integers  -> decimal string, clamped to the range 0..65535.
    * ``float`` -> ``%g`` representation with as many significant digits
      (at most 9) as fit into 9 characters.

    Subclasses of these types are accepted as well.

    Raises TypeError for any other type.
    """
    # ``long`` only exists on Python 2; fall back to plain ``int`` elsewhere.
    try:
        integerTypes = (int, long)
    except NameError:
        integerTypes = (int,)

    # bool must be tested first because it is a subclass of int.
    if isinstance(value, bool):
        if value:
            return "R"
        return "S"
    if isinstance(value, integerTypes):
        return "%u" % max(0, min(65535, value))
    if isinstance(value, float):
        # Reduce the precision step by step until the text fits.
        for digits in range(9, 0, -1):
            result = "%.*g" % (digits, value)
            if len(result) <= 9:
                break
        return result
    raise TypeError("cannot pack value of type %s" % type(value).__name__)


class Comm(object):
    """Communication with OsTech drivers over a serial port.

    Every instance registers its port in the class-wide ``usedPorts`` list,
    so a second instance for the same port is refused with
    OsTechPortInUseError. Access to the serial connection is guarded by a
    re-entrant per-instance lock.
    """
    usedPorts = []                     # ports currently claimed by an instance
    usedPortsLock = threading.RLock()  # guards usedPorts

    def __init__(self, port=0):
        """Claims *port* and prepares the serial object (without opening it).

        Raises OsTechPortInUseError if another instance already holds *port*.
        """
        self.port = port
        # Only the instance that registered the port may release it again;
        # a rejected duplicate must not free the port of the real owner.
        self._ownsPort = False
        # mark port as used
        with self.usedPortsLock:
            if port in self.usedPorts:
                raise OsTechPortInUseError
            self.usedPorts.append(port)
            self._ownsPort = True
        # init serial port
        try:
            self._ser = serial.Serial(timeout=TIMEOUT, writeTimeout=TIMEOUT)
            self._ser.setPort(port)
        except Exception:
            # Do not leave the port marked as used by a half-built instance.
            self._releasePort()
            raise
        self._serLock = threading.RLock()

    def __del__(self):
        # mark port as unused
        self._releasePort()

    def _releasePort(self):
        """Removes this instance's port from ``usedPorts`` if it owns it."""
        # getattr: the attribute is missing if __init__ never ran.
        if not getattr(self, "_ownsPort", False):
            return
        with self.usedPortsLock:
            if self.port in self.usedPorts:
                self.usedPorts.remove(self.port)
        self._ownsPort = False

    def connect(self):
        """Establishes a connection to a driver."""
        with self._serLock:
            self._ser.open()

    def disconnect(self):
        """Closes connection to driver."""
        with self._serLock:
            self._ser.close()

    def isConnected(self):
        """Returns if connected with a driver."""
        with self._serLock:
            return self._ser.isOpen()

    def getPortName(self):
        """Returns the device name the serial object builds for the port."""
        return self._ser.makeDeviceName(self.port)

    def sendTextCmd(self, cmdStr):
        """Sends command to driver in standard mode and returns answer
        string.

        Binary mode is switched off for the command and switched on again
        afterwards. A missing answer surfaces as
        serial.SerialTimeoutException raised by sendCmd.
        """
        with self._serLock:
            self.setBinaryMode(False)
            response = self.sendCmd(cmdStr, str)
            self.setBinaryMode(True)
        return response

    def setBinaryMode(self, binary=True):
        """Enables or disables binary mode.

        Sends "GMS" (enable) or "GMC" (disable), each with the value 8.
        """
        # NOTE(review): the meaning of the value 8 is not visible in this
        # file -- confirm against the OsTech driver manual.
        with self._serLock:
            if binary:
                self.sendCmd("GMS", int, 8)
            else:
                self.sendCmd("GMC", str, 8)

    def sendCmd(self, cmd, typ, newValue=None, retry=True):
        """Sends command to driver and checks echo.

        cmd      -- command name as string
        typ      -- type of the answer to read after the echo: bool, float
                    or int for a binary value, any other type for a text
                    line, None to read no answer at all
        newValue -- optional value appended to the command (see pack)
        retry    -- if true, a failed attempt is repeated once

        Returns the value read (None if *typ* is None). Raises
        serial.SerialException subclasses or OsTechError subclasses when the
        (last) attempt fails.
        """
        cmdStr = cmd
        if newValue is not None:
            cmdStr += pack(newValue)
        cmdStr += EOLCHAR
        with self._serLock:
            for i in range(2):
                try:
                    self._ser.flushInput()
                    self._ser.write(cmdStr)
                    echo = self._ser.readline(eol=EOLCHAR)
                    if echo == "":
                        raise serial.SerialTimeoutException
                    # The echo is compared with the upper-cased command.
                    if echo != cmdStr.upper():
                        print("<<\"%s\" != >>\"%s\"\n" % (repr(echo), repr(cmdStr.upper())))
                        sys.stdout.flush()
                        raise OsTechEchoDiffersError(cmd)
                    if typ is not None:
                        value = self._readValue(typ)
                    else:
                        value = None
                    self._ser.flushInput()
                except (serial.SerialException, OsTechError):
                    if not retry:
                        raise
                    else:
                        print("KOMMUNIKATIONSFEHLER %d %s " % (i, cmdStr[:-1])) # DEBUG
                        # Send ESC (with pauses around it) before retrying
                        # or giving up.
                        time.sleep(.1)
                        self._ser.write(chr(27))
                        time.sleep(.1)
                        if i > 0:
                            raise
                else:
                    break
        return value

    def _readChecksummed(self, payloadFormat):
        """Reads one checksummed binary value described by a struct format.

        The payload is followed by one checksum byte which must equal
        (0x55 + sum of payload bytes) modulo 256.
        """
        size = struct.calcsize(payloadFormat)
        response = self._ser.read(size + 1)
        if len(response) != size + 1:
            raise serial.SerialTimeoutException
        # Unpack as unsigned bytes instead of using ord() so that the data
        # may be a str as well as a bytes object.
        raw = struct.unpack('>%dB' % (size + 1), response)
        chkRead = raw[-1]
        chkCalc = (0x55 + sum(raw[:-1])) % 256
        if chkRead != chkCalc:
            raise OsTechChecksumWrongError
        return struct.unpack(payloadFormat, response[:size])[0]

    def _readValue(self, readType):
        """Reads a value of the given type in OsTech driver representation from
        the serial connection. Then the value is converted accordingly and
        returned.

        bool  -- one byte: 0xAA means True, 0x55 means False
        float -- big-endian 4-byte float followed by a checksum byte
        int   -- big-endian unsigned 2-byte integer followed by a checksum
                 byte
        For any other type a string is read until EOLCHAR.

        Raises serial.SerialTimeoutException if too little data arrives and
        OsTechChecksumWrongError if the received data is inconsistent.
        """
        with self._serLock:
            if readType == bool:
                response = self._ser.read(1)
                if len(response) != 1:
                    raise serial.SerialTimeoutException
                code = struct.unpack('B', response)[0]
                if code == 0xAA:
                    value = True
                elif code == 0x55:
                    value = False
                else:
                    raise OsTechChecksumWrongError
            elif readType == float:
                value = self._readChecksummed('>f')
            elif readType == int:
                value = self._readChecksummed('>H')
            else:
                value = self._ser.readline(eol=EOLCHAR)
                if value == "":
                    raise serial.SerialTimeoutException
        return value


# Small demonstration: connect to a driver and issue a few example commands.
if __name__=="__main__":
    port = 0
    if len(sys.argv) > 1:
        if sys.argv[1] == "--help":
            print("Usage: ostech.py [serial port]")
            # sys.exit instead of exit(): the latter is only provided by the
            # site module and may be missing.
            sys.exit()
        # The argument is tried as an integer first; anything else is
        # passed on unchanged as a string.
        try:
            port = int(sys.argv[1])
        except ValueError:
            port = sys.argv[1]
    print("Using port %s" % (port,))
    c = Comm(port)
    c.connect()
    try:
        c.setBinaryMode(True)
        print("Examples:")
        print("Read LTC: %s" % (c.sendCmd("LTC", bool),))
        print("Set LTT to 18°C: %s" % (c.sendCmd("LTT", float, 18),))
        print("Read LTA: %s" % (c.sendCmd("LTA", float),))
        print("Read Status: %s" % (c.sendCmd("GS", int),))
    finally:
        # Always close the serial connection, even if a command failed.
        c.disconnect()