Red de conocimiento informático - Material del sitio web - ¿Cómo escribir un programa de comunicación serie en Python?

¿Cómo escribir un programa de comunicación serie en Python?

Abra el puerto serie e inicie un hilo para monitorear la entrada de datos del puerto serie. Cuando haya datos, procese los datos.

Ejemplo de escritura de un programa de comunicación serie en Python:

#coding=gb18030?

import sys,threading,time;?

importar serie;?

importar binascii,codificaciones;?

importar re;?

importar socket;?

clase ReadThread:

def __init__(self, Salida=Ninguno, Puerto=0, Registro=Ninguno, i_FirstMethod=True):

self.l_serial = Ninguno;

self .alive = False;

self.waitEnd = Ninguno;

self.l_serial = Ninguno;

self.alive = False;self.output = Salida;

self.port = Puerto;

self.re_num = Ninguno;?

def esperando(self):

si no es self .waitEnd es Ninguno:

self.waitEnd.wait();?

def SetStopEvent(self):

si no, self.waitEnd es Ninguno:

self.waitEnd.set();

self.alive = False;

self.stop();?l_serial = serial.Serial(); /p>

self.l_serial.port = self.port;

self.l_serial.baudrate = 9600;

self.l_serial.timeout = 2; ?

self.re_num = re.compile('\d');?

intenta:

si no, self.output es Ninguno:

self.output.WriteTex;

si no, self.log es Ninguno:

self.log.info;

self.l _serial. la salida es Ninguna:

self.output.WriteText;

si no, self.log es Ninguna:

self.log.error(u'%s' % ex);

devuelve Falso;

si self.l_serial.isOpen():

si no, self.output es Ninguno:

self.output.WriteText;

si no es self.log es Ninguno:

si no es self.log.info.

self.waitEnd = threading .Event();

self.alive = Verdadero;

self.thread_read = Ninguno

self.threading.Thread(target=self.FirstReader) ;

self.thread_

read.setDaemon(1);

self.thread_read.start();

self.waitEnd = subprocesamiento.Event();

self.waitEnd = threading.Event();

self.alive = Verdadero

self.thread_read = Ninguno

self.start();

devuelve Verdadero;

de lo contrario:

si no, self.output es Ninguno:

self.output.WriteText;

si no, self.log es Ninguno:

self.log.info;

devuelve Falso

def InitHead(self):

intenta:

time.sleep(3);

si no, self.output es Ninguno:

self.output.WriteText

p>

p>

si no, self.output es Ninguno:

self.write('\x11');

data1 = self.l_serial.read( 1024);

p>

excepto ValueError, por ejemplo:

si no self.output es Ninguno:

self.output.WriteText;

si no, self.log es Ninguno:

self.log.error(u'%s' % ex);

self.SetStopEvent();

return;?

si no, self.output es Ninguno:

self.output.WriteText;

si no, self.log es Ninguno:

self.log.info;

self.output.WriteText(u'======================== ======== ====\r\n');?

def EnviarDatos(self, i_msg):

lmsg = '';

está bien = Falso;

if isinstance(i_msg, unicode):

lmsg = i_msg.encode(' gb18030');

lmsg = i_msg;

pasar

excepto excepción, por ejemplo:

pasar

regresar está bien;?

def; FirstReader(self):

data1 = '';

isQuanJiao = True;

isFirstMethod = True;

isEnd = True;

readCount = 0;?

saveCount = 0;

RepPos = 0

#leer contenido de información principal

self.InitHead() ;?

mientras self.alive:

>

intenta:

data = '';

n = self.l_serial.inWaiting();

si n:

datos = datos + self.l_serial.read (n);

#print binascii.b2a_hex(datos),?

para l en xrange(len(datos)):

si ord(datos[l])==0x8E:

isQuanJiao = True;

continuar;

si ord(datos [l])==0x8F:

if ord(data[l])==0x8F:

isQuanJiao = False;

continuar;

si ord(datos[l])==0x80 o ord(datos[l])==0x00:

si len(datos1)>10:

si not self.re_ num.search(data1,1) es Ninguno:

saveCount = saveCount + 1;

si RepPos==0:

RepPos = self.output.GetInsertionPoint();

self.output.Remove(RepPos,self.output.GetLastPosition());?

self.SendData(data1); >

data1 = '';

continuar

excepto excepción, por ejemplo:

si no, self.log es Ninguno:

self.log. error(u'%s' % ex);?

self.waitEnd.set();

self.alive = False;?

def stop(self):

self.alive = False;

self.thread_read.join();

if self.l_serial. isOpen():

self.l_serial. close();

si no, self.output es Ninguno:

self.output.WriteText

si no, self.log es Ninguno:

self.log.info;?

def printHex(self, s):

s1 = binascii.b2a_hex(s);

imprimir s1;?

if __name__ == '__main__':

rt = ReadThread();

f = open("sendport.cfg", "r")

rt.sendport = f.read()

f.close()

intenta:

si rt.start():

rt.waiting();

rt.stop();

rt.stop();

p>

else:

pasar;

excepto excepción,se:

imprimir str(se);?

<

p>if rt.alive:

rt.stop();?

imprimir 'Fin OK ;

del rt;