¿Cómo escribir un programa de comunicación serie en Python?
Abra el puerto serie e inicie un hilo para monitorear la entrada de datos del puerto serie. Cuando haya datos, procese los datos.
Ejemplo de escritura de un programa de comunicación serie en Python:
#coding=gb18030?
import sys,threading,time;?
importar serie;?
importar binascii,codificaciones;?
importar re;?
importar socket;?
clase ReadThread:
def __init__(self, Salida=Ninguno, Puerto=0, Registro=Ninguno, i_FirstMethod=True):
self.l_serial = Ninguno;
self .alive = False;
self.waitEnd = Ninguno;
self.l_serial = Ninguno;
self.alive = False;self.output = Salida;
self.port = Puerto;
self.re_num = Ninguno;?
def esperando(self):
si no es self .waitEnd es Ninguno:
self.waitEnd.wait();?
def SetStopEvent(self):
si no, self.waitEnd es Ninguno:
self.waitEnd.set();
self.alive = False;
self.stop();?l_serial = serial.Serial(); /p>
self.l_serial.port = self.port;
self.l_serial.baudrate = 9600;
self.l_serial.timeout = 2; ?
self.re_num = re.compile('\d');?
intenta:
si no, self.output es Ninguno:
self.output.WriteTex;
si no, self.log es Ninguno:
self.log.info;
self.l _serial. la salida es Ninguna:
self.output.WriteText;
si no, self.log es Ninguna:
self.log.error(u'%s' % ex);
devuelve Falso;
si self.l_serial.isOpen():
si no, self.output es Ninguno:
self.output.WriteText;
si no es self.log es Ninguno:
si no es self.log.info.
self.waitEnd = threading .Event();
self.alive = Verdadero;
self.thread_read = Ninguno
self.threading.Thread(target=self.FirstReader) ;
self.thread_
read.setDaemon(1);
self.thread_read.start();
self.waitEnd = subprocesamiento.Event();
self.waitEnd = threading.Event();
self.alive = Verdadero
self.thread_read = Ninguno
self.start();
devuelve Verdadero;
de lo contrario:
si no, self.output es Ninguno:
self.output.WriteText;
si no, self.log es Ninguno:
self.log.info;
devuelve Falso
def InitHead(self):
intenta:
time.sleep(3);
si no, self.output es Ninguno:
self.output.WriteText
p>p>
si no, self.output es Ninguno:
self.write('\x11');
data1 = self.l_serial.read( 1024);
p>excepto ValueError, por ejemplo:
si no self.output es Ninguno:
self.output.WriteText;
si no, self.log es Ninguno:
self.log.error(u'%s' % ex);
self.SetStopEvent();
return;? p>
si no, self.output es Ninguno:
self.output.WriteText;
si no, self.log es Ninguno: p>
self.log.info;
self.output.WriteText(u'======================== ======== ====\r\n');?
def EnviarDatos(self, i_msg):
lmsg = '';
está bien = Falso;
if isinstance(i_msg, unicode):
lmsg = i_msg.encode(' gb18030');
lmsg = i_msg;
pasar
excepto excepción, por ejemplo:
pasar
regresar está bien;?
def; FirstReader(self):
data1 = '';
isQuanJiao = True;
isFirstMethod = True;
isEnd = True;
readCount = 0;?
saveCount = 0;
RepPos = 0
#leer contenido de información principal
self.InitHead() ;?
mientras self.alive:
>
intenta:
data = '';
n = self.l_serial.inWaiting();
si n:
datos = datos + self.l_serial.read (n);
#print binascii.b2a_hex(datos),?
para l en xrange(len(datos)):
si ord(datos[l])==0x8E:
isQuanJiao = True;
continuar;
si ord(datos [l])==0x8F:
if ord(data[l])==0x8F:
isQuanJiao = False;
continuar;
si ord(datos[l])==0x80 o ord(datos[l])==0x00:
si len(datos1)>10:
si not self.re_ num.search(data1,1) es Ninguno:
saveCount = saveCount + 1;
si RepPos==0:
RepPos = self.output.GetInsertionPoint();
self.output.Remove(RepPos,self.output.GetLastPosition());?
self.SendData(data1); >
data1 = '';
continuar
excepto excepción, por ejemplo:
si no, self.log es Ninguno:
self.log. error(u'%s' % ex);?
self.waitEnd.set();
self.alive = False;?
def stop(self):
self.alive = False;
self.thread_read.join();
if self.l_serial. isOpen():
self.l_serial. close();
si no, self.output es Ninguno:
self.output.WriteText
si no, self.log es Ninguno:
self.log.info;?
def printHex(self, s):
s1 = binascii.b2a_hex(s);
imprimir s1;?
if __name__ == '__main__':
rt = ReadThread();
f = open("sendport.cfg", "r")
rt.sendport = f.read()
f.close()
intenta:
si rt.start():
rt.waiting();
rt.stop(); p>
rt.stop();
p>
else:
pasar;
excepto excepción,se:
imprimir str(se);?
<p>if rt.alive:
rt.stop();?
imprimir 'Fin OK ;
del rt;