package at.acdp.urweb.rtde; import at.acdp.urweb.rtde.packets.*; import org.slf4j.LoggerFactory; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; import java.util.HashMap; import java.util.concurrent.Semaphore; import static at.acdp.urweb.rtde.CommandType.*; public class RTDEClient implements Runnable { private final static org.slf4j.Logger logger = LoggerFactory.getLogger(RTDEClient.class); private final Semaphore waitingRequestSem = new Semaphore(0); private volatile IRtdeData waitingRequest=null; public static int RTDE_PROTOCOL_VERSION = 2; private final String ip; private final int port; private volatile boolean _running = true; private volatile boolean initDone = false; private volatile DataOutputStream dos; private volatile DataInputStream dis; private Thread readThread; private RtdeSetupOutputs outputs; private HashMap lastData; private RtdeSetupOutputs so; public RTDEClient(String ip, int port) { this.ip = ip; this.port = port; } public void start(RtdeSetupOutputs so) throws IOException, InterruptedException { this.so=so; readThread = new Thread(this); readThread.start(); } public HashMap getLastData() { return lastData; } public void request(IRtdeData data) throws IOException, InterruptedException { while(dos==null||dis==null) { Thread.sleep(100); } if(readThread.getId()==Thread.currentThread().getId()) { logger.error("request() must not be called from readThread."); } if(data.hasReply()) { waitingRequest=data; } data.send(dos); if(data.hasReply()) { waitingRequestSem.acquire(); waitingRequest=null; } return; } private boolean init() throws IOException, InterruptedException { { var rpv = new RtdeRequestProtocolVersion(); rpv.send(dos); int length = dis.readShort(); int type = dis.readByte() & 0xFF; if(type!=RTDE_REQUEST_PROTOCOL_VERSION) return false; rpv.read(dis, length - 3); logger.info("RtdeRequestProtocolVersion succes:" + rpv.success); } { var ruv = new RtdeRequestURVersion(); ruv.send(dos); int length=0; length = dis.readShort(); int type = dis.readByte() & 0xFF; if(type!=RTDE_GET_URCONTROL_VERSION) return false; ruv.read(dis, length - 3); logger.info(String.format("RtdeRequestURVersion: %d.%d.%d.%d",ruv.major, ruv.minor, ruv.build, ruv.bugfix)); } { so.send(dos); int length = dis.readShort(); int type = dis.readByte() & 0xFF; if(type!=RTDE_CONTROL_PACKAGE_SETUP_OUTPUTS) return false; var r=so.read(dis, length - 1); logger.info("so:" +r.typesText); outputs=r; } { var start=new RtdeControlStart(); start.send(dos); int length = dis.readShort(); int type = dis.readByte() & 0xFF; if(type!=RTDE_CONTROL_PACKAGE_START) return false; start.read(dis, length -1); logger.info("start accepted:" +start.accepted); initDone=true; } return true; } private void handlePaket(int type, int length) throws IOException { var wreq=waitingRequest; if(wreq != null && type==wreq.getType()) { waitingRequest.read(dis, length-3); waitingRequestSem.release(); } else switch(type) { case RTDE_TEXT_MESSAGE: var rpv = new RtdeTextMessage(); rpv.read(dis, length-3); logger.info(String.format("RTDE_TEXT_MESSAGE: %s", rpv.text)); break; case RTDE_DATA_PACKAGE: var p = new RtdeOutDataPackage(outputs); p.read(dis, length-3); lastData=p.getPdata(); lastData.entrySet().forEach(entry->{ System.out.println(entry.getKey() + " " + entry.getValue()); }); break; default: { logger.warn(String.format("Unexpected package type: %d", type)); byte[] buf=new byte[length]; dis.readFully(buf); } } } @Override public void run() { while (_running) { initDone=false; try (Socket rt = new Socket(ip, port)){ rt.setSoTimeout(10000); rt.setReuseAddress(true); rt.setTcpNoDelay(true); if (rt.isConnected()) { logger.info("Connected to UR Realtime Client"); dos = new DataOutputStream(rt.getOutputStream()); dis = new DataInputStream(rt.getInputStream()); if(!init()) throw new IOException("init failed"); while (_running) { int length = dis.readShort(); int type=dis.readByte() & 0xFF; handlePaket(type, length); } } } catch (IOException| InterruptedException e) { logger.warn("rtde connection closed", e); } } } }