Browse Source

perform cpee callback
remove uploads after 30sek finished

Martin Kunz 5 years ago
parent
commit
65549084bb

+ 20 - 0
pom.xml

@@ -49,6 +49,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
                             <createSourcesJar>false</createSourcesJar>
                             <shadedClassifierName>shaded</shadedClassifierName>
                             <shadedArtifactAttached>true</shadedArtifactAttached>
@@ -86,6 +96,11 @@
             <artifactId>okhttp</artifactId>
             <version>3.14.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp-tls</artifactId>
+            <version>3.14.0</version>
+        </dependency>
         <dependency>
             <groupId>com.sparkjava</groupId>
             <artifactId>spark-core</artifactId>
@@ -102,5 +117,10 @@
             <artifactId>logback-classic</artifactId>
             <version>1.2.3</version>
         </dependency>
+        <dependency>
+            <groupId>info.faljse</groupId>
+            <artifactId>SDNotify</artifactId>
+            <version>1.3</version>
+        </dependency>
     </dependencies>
 </project>

+ 61 - 26
src/main/java/com/acdp/transceivr/LoadTools.java

@@ -2,6 +2,7 @@ package com.acdp.transceivr;
 
 import okhttp3.*;
 import okio.BufferedSink;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -9,31 +10,43 @@ import java.io.InputStream;
 public class LoadTools {
     public static final MediaType MEDIA_TYPE_BINARY
             = MediaType.parse("application/octet-stream");
+    public static final MediaType MEDIA_TYPE_JSON
+            = MediaType.get("application/json; charset=utf-8");
+    private final static org.slf4j.Logger logger = LoggerFactory.getLogger(LoadTools.class);
+
+    private static void finishTransfer(Transfer t) {
+        t.finishedAT=System.currentTimeMillis();
+        t.finished=true;
+    }
 
     private static void download(Transfer t, ProgressListener progressListener) throws IOException {
         Request request = new Request.Builder()
-            .url(t.from)
-            .build();
+                .url(t.from)
+                .build();
 
         OkHttpClient client = new OkHttpClient.Builder()
-            .addNetworkInterceptor(chain -> {
-                Response originalResponse = chain.proceed(chain.request());
-                return originalResponse.newBuilder()
-                        .body(new ProgressResponseBody(originalResponse.body(), progressListener))
-                        .build();
-            })
-            .build();
-        var call=client.newCall(request);
-        t.call=call;
+                .addNetworkInterceptor(chain -> {
+                    Response originalResponse = chain.proceed(chain.request());
+                    return originalResponse.newBuilder()
+                            .body(new ProgressResponseBody(originalResponse.body(), progressListener))
+                            .build();
+                })
+                .build();
+        var call = client.newCall(request);
+        t.call = call;
         call.enqueue(new Callback() {
             @Override
             public void onFailure(Call call, IOException e) {
-                t.uploadError=e.toString();
+                t.uploadError = e.toString();
             }
 
             @Override
             public void onResponse(Call call, Response response) throws IOException {
-                if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
+                t.downloadCode = response.code();
+                if (!response.isSuccessful()) {
+                    finishTransfer(t);
+                    return;
+                }
                 try {
                     upload(t, response.body().byteStream());
                 } catch (IOException e) {
@@ -56,7 +69,6 @@ public class LoadTools {
             }
         };
 
-
         OkHttpClient client = new OkHttpClient();
         Request request = new Request.Builder()
                 .url(t.to)
@@ -64,22 +76,45 @@ public class LoadTools {
                 .build();
 
         client.newCall(request).enqueue(
-            new Callback() {
-                @Override
-                public void onFailure(Call call, IOException e) {
-                    t.uploadError = e.toString();
-                }
+                new Callback() {
+                    @Override
+                    public void onFailure(Call call, IOException e) {
+                        t.uploadError = e.toString();
+                    }
 
-                @Override
-                public void onResponse(Call call, Response response) throws IOException {
-                    if (!response.isSuccessful()) {
-                        t.uploadError = response.toString();
+                    @Override
+                    public void onResponse(Call call, Response response) throws IOException {
+                        if (!response.isSuccessful()) {
+                            t.uploadError = response.toString();
+                            finishTransfer(t);
+                        }
                         t.uploadCode = response.code();
+                        t.uploadResponseBody = response.body().string();
+                        t.uploadDone = true;
+                        if (t.cpeeCallback != null) {
+                            try {
+                                OkHttpClient client = new OkHttpClient();
+                                Request request = new Request.Builder()
+                                        .url(t.cpeeCallback + "/")
+                                        .put(RequestBody.create(MEDIA_TYPE_JSON, "[{}]"))
+                                        .build();
+                                logger.info("putting to {}", t.cpeeCallback);
+                                Response res = client.newCall(request).execute();
+                                logger.info("result {}", res.toString());
+                                t.cpeeCallbackResult = res.body().string();
+                                t.cpeeCallbackCode = res.code();
+                                t.cpeeCallbackMessage = res.message();
+                            } catch (Exception e) {
+                                logger.warn("cpee callback failed", e);
+                                t.cpeeCallbackError = e.getMessage();
+                                finishTransfer(t);
+                            }
+                        }
+                        else {
+                            finishTransfer(t);
+                        }
                     }
-                    t.uploadResponseBody = response.body().string();
-                    t.uploadDone = true;
                 }
-            }
         );
     }
 

+ 19 - 2
src/main/java/com/acdp/transceivr/Main.java

@@ -1,7 +1,24 @@
 package com.acdp.transceivr;
 
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+
 public class Main {
-    public static void main(String [ ] args) {
-        new WebServer(8082).start();
+    private final static org.slf4j.Logger logger = LoggerFactory.getLogger(Main.class);
+
+    public static void main(String[] args) {
+        Params app = null;
+        try {
+             app = CommandLine.populateCommand(new Params(), args);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            CommandLine.usage(new Params(), System.out);
+            System.exit(1);
+        }
+        try {
+            new WebServer(app).start();
+        } catch (Exception e) {
+            logger.error("Server exited", e);
+        }
     }
 }

+ 13 - 0
src/main/java/com/acdp/transceivr/Params.java

@@ -0,0 +1,13 @@
+package com.acdp.transceivr;
+
+import picocli.CommandLine;
+
+import java.io.File;
+
+public class Params {
+    @CommandLine.Option(names = { "-p", "--port" }, description = "HTTP Server port", required = true)
+    public int port = 8080;
+
+    @CommandLine.Option(names = { "-w", "--webroot" }, description = "Use webroot from filesystem", defaultValue = "")
+    public String webroot = "";
+}

+ 26 - 3
src/main/java/com/acdp/transceivr/Transfer.java

@@ -4,13 +4,15 @@ import com.eclipsesource.json.JsonObject;
 import okhttp3.Call;
 
 public class Transfer {
-    public Call call;
+
+
+
 
     public Transfer(int id) {
         this.id = id;
         startTS=System.currentTimeMillis();
     }
-
+    public Call call;
     public long bytesRead;
     public long contentLength;
     public String from;
@@ -19,10 +21,22 @@ public class Transfer {
     public int uploadCode;
     public String uploadResponseBody;
     public boolean downloadDone = false;
+    public int downloadCode;
+    public String downloadError;
     public boolean uploadDone = false;
     public boolean canceled=false;
     public long startTS;
     public long currentTS;
+    public String cpeeCallback;
+    public String cpeeCallbackId;
+    public String cpeeInstanceURL;
+    public boolean doCpeeCallback=false;
+    public String cpeeCallbackResult;
+    public int cpeeCallbackCode;
+    public String cpeeCallbackMessage;
+    public String cpeeCallbackError;
+    public boolean finished;
+    public long finishedAT;
 
     int id;
 
@@ -36,9 +50,18 @@ public class Transfer {
         js.add("uploadError", uploadError);
         js.add("uploadCode", uploadCode);
         js.add("uploadResponseBody", uploadResponseBody);
+        js.add("uploadDone", uploadDone);
         js.add("progress", contentLength>0?((100 * bytesRead) / contentLength):0);
         js.add("downloadDone", downloadDone);
-        js.add("uploadDone", uploadDone);
+        js.add("downloadCode", downloadCode);
+        js.add("downloadError", downloadError);
+        js.add("cpeeCallbackResult", cpeeCallbackResult);
+        js.add("cpeeCallbackCode", cpeeCallbackCode);
+        js.add("cpeeCallbackMessage", cpeeCallbackMessage);
+        js.add("cpeeCallbackError", cpeeCallbackError);
+
+
+
         var diff=currentTS-startTS;
         js.add("rate",  diff>0?(bytesRead/diff):0);
         return js;

+ 45 - 13
src/main/java/com/acdp/transceivr/WebServer.java

@@ -1,8 +1,13 @@
 package com.acdp.transceivr;
 
 import com.eclipsesource.json.JsonArray;
+import info.faljse.SDNotify.SDNotify;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static spark.Spark.*;
@@ -10,17 +15,21 @@ import static spark.Spark.*;
 
 public class WebServer {
     private final static org.slf4j.Logger logger = LoggerFactory.getLogger(WebServer.class);
-    private final int port;
+    private final Params params;
     private int nextID = 1;
     private ConcurrentHashMap<Integer, Transfer> uploads = new ConcurrentHashMap<>();
+    private Timer t=new Timer("cleanup",true);
 
-    public WebServer(int port) {
-        this.port = port;
+    public WebServer(Params params) {
+        this.params = params;
     }
 
     public void start() {
-        port(port);
-        staticFiles.location("/webroot");
+        port(params.port);
+        if(!params.webroot.isEmpty())
+            staticFileLocation(params.webroot);
+        else
+            staticFiles.location("/webroot");
         get("/zero", (req, res) -> {
             var os = res.raw().getOutputStream();
             var ba = new byte[1024];
@@ -30,15 +39,16 @@ public class WebServer {
         post("/null", (req, res) -> {
             var is = req.raw().getInputStream();
             var ba = new byte[1024];
-            while (true)
-                is.read(ba);
+            while (is.read(ba)!=-1);
+            res.status(200);
+            return "";
         });
         post("/cancel", (req, res) -> {
             int id = Integer.parseInt(req.queryParams("id"));
-            var t=uploads.get(id);
-            t.canceled=true;
+            var t = uploads.get(id);
+            t.canceled = true;
             t.call.cancel();
-
+            res.status(200);
             return "OK";
         });
         get("/status", (req, res) -> {
@@ -53,14 +63,36 @@ public class WebServer {
                 Transfer t = new Transfer(nextID++);
                 t.from = req.queryParams("from");
                 t.to = req.queryParams("to");
+                t.cpeeCallback = req.headers("CPEE-CALLBACK");
+                t.cpeeCallbackId = req.headers("CPEE-CALLBACK-ID");
+                t.cpeeInstanceURL = req.headers("CPEE-INSTANCE-URL");
+                if (Boolean.valueOf(req.queryParams("callback"))) {
+                    res.header("CPEE-CALLBACK", "true");
+                    t.doCpeeCallback = true;
+                }
                 LoadTools.startTransfer(t);
                 uploads.put(t.id, t);
+                res.status(200);
                 return "OK: " + t.id;
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 return "FAILED: " + e.toString();
             }
         });
-    }
+        awaitInitialization();
+        t.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                for (var i = uploads.entrySet().iterator(); i.hasNext();) {
+                    var e = i.next();
+                    if(e.getValue().finished&&(e.getValue().finishedAT+30000<System.currentTimeMillis())){
+                        i.remove();
+                        logger.info("Removed transfer {}", e.getValue().id);
+                    }
+                }
+            }
+        },1000,1000);
 
+        SDNotify.sendNotify(); //notify: ready
+        logger.info("Running");
+    }
 }

+ 6 - 1
src/main/resources/webroot/index.html

@@ -52,9 +52,14 @@ uploadError: {{ item.uploadError}}
 uploadCode: {{ item.uploadCode}}
 uploadResponseBody: {{ item.uploadResponseBody}}
 uploadDone: {{ item.uploadDone}}
+downloadError: {{ item.downloadError}}
+downloadCode: {{ item.downloadCode}}
 downloadDone: {{ item.downloadDone}}
+cpeeCallbackResult: {{ item.cpeeCallbackResult}}
+cpeeCallbackCode: {{ item.cpeeCallbackCode}}
+cpeeCallbackMessage: {{ item.cpeeCallbackMessage}}
+cpeeCallbackError: {{ item.cpeeCallbackError}}
 rate: {{ item.rate}}k/s
-
 </pre>
                 </td>
                 <td></td>