I want to automate testing of an instrument and wrote a little server program to imitate the instrument which will send back the command except when it receives a special command "*IDN?". When I ran the echo server directly in its own script, and then run a client script separately, everything works great, and I am getting back the expected results. Now I wanted to run the server directly from the testing script. So I thought I would start it using multiprocessing. But the problem seems to be when the server socket gets to the s.accept() line it just waits there and never returns. So how do I accomplish automated testing if I cannot run this server in the same code as the test function?
import socket
import multiprocessing as mp
import time,sys
HOST = '127.0.0.1' # Standard loopback interface address (localhost),
PORT = 65432 # Port to listen on (non-privileged ports are > 1023),
FTP_PORT = 63217 # Port for ftp testing, change to 21 for device
def handle_connection(conn,addr):
with conn:
conn.send('Connected by', addr)
print("Got connection")
data = conn.recv(1024)
if not data:
return 'Nodata'
elif (data == b'*IDN?\n'):
print('SONY/TEK,AWG520,0,SCPI:95.0 OS:3.0 USR:4.0\n')
conn.sendall(b'SONY/TEK,AWG520,0,SCPI:95.0 OS:3.0 USR:4.0\n')
return 'IDN'
else:
conn.sendall(data)
return 'Data'
def echo_server(c_conn,host=HOST,port=PORT):
# this server simulates the AWG command protocol, simply echoing the command back except for IDN?
p = mp.current_process()
print('Starting echo server:', p.name, p.pid)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((host, port))
s.listen()
try:
while True:
print("Waiting for connection...")
c_conn.send('waiting for connection...')
conn, addr = s.accept()
handle_connection(conn,addr)
c_conn.send('serving client...')
finally:
conn.close()
c_conn.send('done')
time.sleep(2)
print('Exiting echo server:', p.name, p.pid)
sys.stdout.flush()
def test_echo_server():
print("entering client part")
with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as mysock:
mysock.connect((HOST,PORT))
mysock.sendall('test\n'.encode())
data = mysock.recv(1024)
print('received:',repr(data))
if __name__ == '__main__':
parent_conn, child_conn = mp.Pipe()
echo_demon = mp.Process(name='echo', target=echo_server(child_conn, ))
echo_demon.daemon = True
echo_demon.start()
time.sleep(1)
echo_demon.join(1)
test_echo_server()
if parent_conn.poll(1):
print(parent_conn.recv())
else:
print('Waiting for echo server')
Aucun commentaire:
Enregistrer un commentaire