Browse Source

add cmd queue
modify urscripts to detect end

Martin Kunz 5 years ago
parent
commit
00efa801bb

+ 10 - 0
pom.xml

@@ -94,6 +94,16 @@
 			<version>5.3.1</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp</artifactId>
+			<version>3.14.0</version>
+		</dependency>
+		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp-tls</artifactId>
+			<version>3.14.0</version>
+		</dependency>
 		<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
 		<dependency>
 			<groupId>ch.qos.logback</groupId>

+ 1 - 1
src/main/java/at/acdp/urweb/Main.java

@@ -11,7 +11,7 @@ public class Main {
         try {
             app = picocli.CommandLine.populateCommand(new Params(), args);
         } catch (Exception e) {
-            System.out.println(e.getMessage());
+            logger.error("failed.", e);
             picocli.CommandLine.usage(new Params(), System.out);
             System.exit(1);
         }

+ 22 - 0
src/main/java/at/acdp/urweb/RobotCommand.java

@@ -0,0 +1,22 @@
+package at.acdp.urweb;
+
+public class RobotCommand {
+    public String cmd;
+    public int id;
+    public String cpeeCallback;
+    public String cpeeCallbackResult;
+    public int cpeeCallbackCode;
+    public String cpeeCallbackMessage;
+    public String cpeeCallbackError;
+
+    public RobotCommand(int id, String cmd) {
+        this.cmd = cmd;
+        this.id = id;
+    }
+
+    public RobotCommand(int id, String cmd, String cpeeCallback) {
+        this.cmd = cmd;
+        this.id = id;
+        this.cpeeCallback = cpeeCallback;
+    }
+}

+ 91 - 22
src/main/java/at/acdp/urweb/URBot.java

@@ -1,54 +1,123 @@
 package at.acdp.urweb;
 
 import at.acdp.urweb.sclient.data.MasterBoardData;
+import at.acdp.urweb.sclient.data.RobotProgramLabel;
 import at.acdp.urweb.sclient.data.ScReadThread;
+import okhttp3.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class URBot {
-    private ScReadThread sc;
     private static final Logger log = LoggerFactory.getLogger(URBot.class);
+    private static final MediaType MEDIA_TYPE_JSON
+            = MediaType.get("application/json; charset=utf-8");
+    private ScReadThread sc;
+    private BlockingQueue<RobotCommand> cmdq = new LinkedBlockingQueue<>();
+    private AtomicInteger nextID = new AtomicInteger(1);
+    private Semaphore cmdDoneSem = new Semaphore(0);
+    private RobotProgramLabel currentAck;
 
     public URBot(String ip) {
-        sc=new ScReadThread(ip);
-        Thread t=new Thread(sc);
-        t.start();
+        sc = new ScReadThread(ip, this);
+    }
+
+    public synchronized void ackNumber(RobotProgramLabel rpl) {
+        this.currentAck = rpl;
+        cmdDoneSem.release();
+    }
+
+    public void start() {
+        Thread readThread = new Thread(sc);
+        readThread.start();
+        Thread queueThread = new Thread(() -> {
+            try {
+                while (true) {
+                    var c = cmdq.take();
+                    write(c.cmd);
+                    while (true) {
+                        cmdDoneSem.acquire();
+                        if (currentAck.id == c.id  && currentAck.message.equals("URWEB_END"))
+                            break;
+                    }
+                    if (c.cpeeCallback != null) {
+                        ackCPEE(c);
+                    }
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        });
     }
 
-    public void sendProgram(String program) {
-        String[] lines=program.split("[\\r\\n]+");
-        String res="";
-        for(String line:lines) {
-            line=line.trim();
-            if(line.startsWith("//"))
+
+    private void write(String program) {
+        String[] lines = program.split("[\\r\\n]+");
+        String res = "";
+        for (String line : lines) {
+            line = line.trim();
+            if (line.startsWith("//"))
                 continue;
-            if(line.startsWith("#"))
+            if (line.startsWith("#"))
                 continue;
-            if(!line.endsWith("\n"))
-                line+="\n";
+            if (!line.endsWith("\n"))
+                line += "\n";
+            res += line;
+        }
+        if (!program.endsWith("\n"))
+            program += "\n";
+        try {
+            this.cmdq.put(new RobotCommand(nextID.getAndIncrement(), program));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
 
-            res+=line;
+    private void ackCPEE(RobotCommand r) {
+        try {
+            OkHttpClient client = new OkHttpClient();
+            Request request = new Request.Builder()
+                    .url(r.cpeeCallback + "/")
+                    .put(RequestBody.create(MEDIA_TYPE_JSON, "[{}]"))
+                    .build();
+            log.info("putting to {}", r.cpeeCallback);
+            Response res = client.newCall(request).execute();
+            log.info("result {}", res.toString());
+            r.cpeeCallbackResult = res.body().string();
+            r.cpeeCallbackCode = res.code();
+            r.cpeeCallbackMessage = res.message();
+        } catch (Exception e) {
+            log.warn("cpee callback failed", e);
+            r.cpeeCallbackError = e.getMessage();
         }
-        if(!program.endsWith("\n"))
-            program+="\n";
-        sc.writeCmd(program);
+    }
+
+    private void sendStringCmd(String cmd) {
+        sc.writeCmd(new RobotCommand(nextID.getAndIncrement(), cmd));
     }
 
     public void sendFreedrive(int timeout) {
-        sc.writeCmd(String.format("def myProg():\n\tfreedrive_mode()\n\tsleep(%d)\nend", timeout));
+        sendStringCmd(String.format("def myProg():\n\tfreedrive_mode()\n\tsleep(%d)\nend", timeout));
     }
 
     public void sendMessage(String message) {
-        sc.writeCmd(String.format("textmsg(%s)", message));
+        sendStringCmd(String.format("textmsg(%s)", message));
     }
 
     public void setDigital(int which, boolean val) {
-        String c=String.format("digital_out[%d]=%s", which, val?"True":"False");
-        sc.writeCmd(c);
+        sendStringCmd(String.format("digital_out[%d]=%s", which, val ? "True" : "False"));
     }
 
     public boolean getDigital(int which) {
         MasterBoardData mb = sc.getRde().getLastMB();
-        return ((mb.digitalOutputBits&(1<<which)) >0) ;
+        return ((mb.digitalOutputBits & (1 << which)) > 0);
+    }
+
+    public void sendCmd(String cmd) {
+        write(String.format("def urweb():\n{}\nend\n", cmd));
     }
 }

+ 5 - 1
src/main/java/at/acdp/urweb/sclient/SecondaryClient.java

@@ -1,6 +1,7 @@
 package at.acdp.urweb.sclient;
 
 import at.acdp.urweb.CountDataInputStream;
+import at.acdp.urweb.URBot;
 import at.acdp.urweb.sclient.data.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -13,15 +14,17 @@ public class SecondaryClient {
     private static final Logger log = LoggerFactory.getLogger(SecondaryClient.class);
     private final String ip;
     private final int port;
+    private final URBot urBot;
     private volatile boolean _running = true;
     private Socket rt;
     private OutputStream os;
     private CountDataInputStream in;
     private MasterBoardData lastMB=new MasterBoardData();
 
-    public SecondaryClient(String ip, int port) {
+    public SecondaryClient(String ip, int port, URBot urBot) {
         this.ip = ip;
         this.port = port;
+        this.urBot = urBot;
     }
 
     public VersionMessage connect() throws IOException {
@@ -217,6 +220,7 @@ public class SecondaryClient {
                 rpl.read(di, size);
                 URLog.add(rpl);
                 log.debug(rpl.toString());
+                urBot.ackNumber(rpl);
                 break;
             case RobotMessageType.ERROR_CODE:
                 var rme=new RobotMessageError();

+ 3 - 4
src/main/java/at/acdp/urweb/sclient/data/RobotProgramLabel.java

@@ -6,20 +6,19 @@ import com.eclipsesource.json.JsonObject;
 import java.io.IOException;
 
 public class RobotProgramLabel implements IRead, IJsonObject{
-    public int row;
+    public int id;
     public String message;
 
     @Override
     public JsonObject toJSON() {
         return new JsonObject()
-            .add("row", row)
+            .add("id", id)
             .add("message", message);
-
     }
 
     @Override
     public void read(CountDataInputStream di, int size) throws IOException {
-        row=di.readInt();
+        id=di.readInt();
         byte[] asd=new byte[size-15-4];
         di.readFully(asd);
         message=new String(asd);

+ 22 - 15
src/main/java/at/acdp/urweb/sclient/data/ScReadThread.java

@@ -1,9 +1,10 @@
 package at.acdp.urweb.sclient.data;
 
+import at.acdp.urweb.RobotCommand;
+import at.acdp.urweb.URBot;
 import at.acdp.urweb.sclient.SecondaryClient;
 import at.acdp.urweb.sclient.URLog;
-import at.acdp.urweb.sclient.data.Message;
-import at.acdp.urweb.sclient.data.VersionMessage;
+import okhttp3.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -12,13 +13,14 @@ import java.io.IOException;
 public class ScReadThread implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(SecondaryClient.class);
     private final String ip;
-
+    private final URBot urBot;
     SecondaryClient rde;
 
-
-    public ScReadThread(String ip) {
-        this.ip=ip;
+    public ScReadThread(String ip, URBot urBot) {
+        this.ip = ip;
+        this.urBot=urBot;
     }
+    
 
     public SecondaryClient getRde() {
         return rde;
@@ -26,28 +28,33 @@ public class ScReadThread implements Runnable {
 
     @Override
     public void run() {
-        while(true) {
+        while (true) {
             try {
                 read();
-            } catch (IOException e) {
+            } catch (Exception e) {
                 log.warn("read thread terminated", e);
-                URLog.add(new Message("read thread terminated", e ));
+                URLog.add(new Message("read thread terminated", e));
             }
         }
     }
 
     private void read() throws IOException {
-        rde=new SecondaryClient(ip, 30001);
+        rde = new SecondaryClient(ip, 30001, urBot);
         VersionMessage vm = rde.connect();
-        while(true) {
+        while (true) {
             rde.readPackage();
         }
     }
 
-    public void writeCmd(String s) {
-        if(!s.endsWith("\n"))
-            s+="\n";
-        log.info("writecmd: \""+s+"\"");
+    public void writeCmd(RobotCommand r) {
+        writeCmd(r.cmd);
+    }
+
+
+    private void writeCmd(String s) {
+        if (!s.endsWith("\n"))
+            s += "\n";
+        log.info("writecmd: \"" + s + "\"");
         rde.writeCmd(s);
     }
 }

+ 4 - 3
src/main/java/at/acdp/urweb/web/WebServer.java

@@ -9,15 +9,16 @@ import static spark.Spark.*;
 
 public class WebServer {
     private final static org.slf4j.Logger logger = LoggerFactory.getLogger(WebServer.class);
-    private final URBot urbot;
+    private URBot urbot;
     private final Params params;
 
     public WebServer(Params params) {
         this.params = params;
-        this.urbot=new URBot(params.robotIP);
     }
 
     public void start() {
+        this.urbot=new URBot(params.robotIP);
+        this.urbot.start();
         port(params.port);
         if (!params.webroot.isEmpty())
             staticFileLocation(params.webroot);
@@ -27,7 +28,7 @@ public class WebServer {
         post("/cmd", (req, res) -> {
             byte[] bytes = req.raw().getInputStream().readAllBytes();
             String cmd=new String(bytes);
-            urbot.sendProgram(cmd);
+            urbot.sendCmd(cmd);
             return "";
         });
         post("/freedrive",  (req, res) -> {