Python Example Source Code
ostech.py
—
Python Source,
6 KB (7116 bytes)
File contents
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import with_statement import sys import time import serial import struct import threading TIMEOUT = 1 #: TimeOut in s EOLCHAR = "\r" #: End of line as OsTech drivers expect # OsTech Exception classes class OsTechError(Exception): """General OsTech error""" class OsTechCommError(OsTechError): """Communication errors""" class OsTechEchoDiffersError(OsTechCommError): """Received echo differs from sent command.""" def __init__(self, cmdName=None): self.cmdName = cmdName def __repr__(self): return "OsTechEchoDiffersError(%s)" % self.cmdName class OsTechChecksumWrongError(OsTechCommError): """Checksum of read value is wrong.""" class OsTechPortInUseError(OsTechCommError): """A second instance using the same serial port is about to be created.""" def pack(value): """Converts a value to a format OsTech drivers understand. Numbers are forced to fit into 9 characters. """ if type(value)==bool: if value: result = "R" else: result = "S" elif type(value) in [int, long]: result = "%u" % max(0, min(65535, value)) elif type(value)==float: for i in range(9,0,-1): result = "%.*g" % (i,value) if len(result)<=9: break else: raise TypeError return result class Comm(object): """Communication with OsTech drivers.""" usedPorts = [] usedPortsLock = threading.RLock() def __init__(self, port=0): self.port = port # mark port as used with self.usedPortsLock: if port not in self.usedPorts: self.usedPorts.append(port) else: raise OsTechPortInUseError # init serial port self._ser = serial.Serial(timeout=TIMEOUT, writeTimeout=TIMEOUT) self._ser.setPort(port) self._serLock = threading.RLock() def __del__(self): # mark port as unused with self.usedPortsLock: if self._ser.getPort() in self.usedPorts: self.usedPorts.remove(self._ser.getPort()) def connect(self): """Establishes a connection to a driver.""" with self._serLock: self._ser.open() def disconnect(self): """Closes connection to driver.""" with self._serLock: self._ser.close() def isConnected(self): """Returns if connected with a driver.""" with self._serLock: return self._ser.isOpen() def getPortName(self): return self._ser.makeDeviceName(self.port) def sendTextCmd(self, cmdStr): """Sends command to driver in standard mode and returns answer string. """ cmd = Command(name=cmdStr, typ=str) with self._serLock: self.setBinaryMode(False) response = self.sendCmd(cmdStr) self.setBinaryMode(True) if response.value == "": raise serial.SerialTimeoutException return response.value def setBinaryMode(self, binary=True): """Enables or disables binary mode.""" with self._serLock: if binary: self.sendCmd("GMS", int, 8) else: self.sendCmd("GMC", str, 8) def sendCmd(self, cmd, typ, newValue=None, retry=True): """Sends command to driver and checks echo.""" cmdStr = cmd if newValue is not None: cmdStr += pack(newValue) cmdStr += EOLCHAR with self._serLock: for i in range(2): try: self._ser.flushInput() self._ser.write(cmdStr) echo = self._ser.readline(eol=EOLCHAR) if echo == "": raise serial.SerialTimeoutException if echo != cmdStr.upper(): print "<<\"%s\" != >>\"%s\"\n" % (repr(echo), repr(cmdStr.upper())) sys.stdout.flush() raise OsTechEchoDiffersError(cmd.name) if typ is not None: value = self._readValue(typ) else: value = None self._ser.flushInput() except (serial.SerialException, OsTechError): if not retry: raise else: print "KOMMUNIKATIONSFEHLER %d %s " % (i, cmdStr[:-1]) # DEBUG time.sleep(.1) self._ser.write(chr(27)) time.sleep(.1) if i > 0: raise else: break return value def _readValue(self, readType): """Reads a value of the given type in OsTech driver representation from the serial connection. Then the value is converted accordingly and returned. If no type is given a string is read until EOLCHAR. """ with self._serLock: if readType == bool: response = self._ser.read(1) if len(response)!=1: raise serial.SerialTimeoutException if response == "\xAA": value = True elif response=="\x55": value = False else: raise OsTechChecksumWrongError elif readType == float: response = self._ser.read(5) if len(response) != 5: raise serial.SerialTimeoutException value = struct.unpack('>fx', response)[0] chkRead = ord(response[4]) chkCalc = (0x55 + sum(map(ord, response[:4]))) % 256 if chkRead != chkCalc: raise OsTechChecksumWrongError elif readType == int: response = self._ser.read(3) if len(response) != 3: raise serial.SerialTimeoutException value = struct.unpack('>Hx', response)[0] chkRead = ord(response[2]) chkCalc = (0x55 + sum(map(ord, response[:2]))) % 256 if chkRead != chkCalc: raise OsTechChecksumWrongError else: value = self._ser.readline(eol=EOLCHAR) if value == "": raise serial.SerialTimeoutException return value if __name__=="__main__": port = 0 if len(sys.argv) > 1: if sys.argv[1] == "--help": print "Usage: ostech.py [serial port]" exit() try: port = int(sys.argv[1]) except ValueError: port = sys.argv[1] print "Using port", port c = Comm(port) c.connect() c.setBinaryMode(True) print "Examples:" print "Read LTC:", c.sendCmd("LTC", bool) print "Set LTT to 18°C:", c.sendCmd("LTT", float, 18) print "Read LTA:", c.sendCmd("LTA", float) print "Read Status:", c.sendCmd("GS", int)