]> git.proxmox.com Git - proxmox-backup.git/commitdiff
play around with async tasks
authorDietmar Maurer <dietmar@proxmox.com>
Sat, 10 Nov 2018 11:06:39 +0000 (12:06 +0100)
committerDietmar Maurer <dietmar@proxmox.com>
Sat, 10 Nov 2018 11:06:39 +0000 (12:06 +0100)
Cargo.toml
src/api3.rs
src/api_info.rs
src/main.rs

index 55cebade08fbed36e5670bdedd87f39145195312..b1f236acab61988793b8965d13831be1e2c9113c 100644 (file)
@@ -18,6 +18,7 @@ serde_json = "1.0.32"
 serde_derive = "1.0.80"
 url = "1.7.1"
 futures = "0.1.25"
+tokio = "0.1.11"
 http = "0.1.13"
 hyper = "0.12.14"
 lazy_static = "1.1.0"
index bf496f13f2b5e5e882b7eeae43cd5a63bf8b60ec..d397f472a31314bd31944d3d4678cb20ea8260c4 100644 (file)
@@ -6,8 +6,11 @@ use crate::json_schema::*;
 use crate::api_info::*;
 use serde_json::{json, Value};
 
+use futures::future::*;
+use tokio::prelude::*;
+use hyper::{Method, Body, Request, Response, Server, StatusCode};
 
-fn test_api_handler(param: Value, info: &ApiMethod) -> Result<Value, Error> {
+fn test_sync_api_handler(param: Value, info: &ApiMethod) -> Result<Value, Error> {
     println!("This is a test {}", param);
 
    // let force: Option<bool> = Some(false);
@@ -25,12 +28,30 @@ fn test_api_handler(param: Value, info: &ApiMethod) -> Result<Value, Error> {
     Ok(json!(null))
 }
 
+fn test_async_api_handler(
+    param: Value,
+    info: &ApiMethod
+) -> Box<Future<Item = Response<Body>, Error = Error> + Send> {
+    println!("This is a test {}", param);
+
+    let task = lazy(|| {
+        println!("A LAZY TASK");
+
+        let mut resp = Response::new(Body::from("A LAZY TASKs RESPONSE"));
+        *resp.status_mut() = StatusCode::OK;
+
+        ok(resp)
+    });
+
+    Box::new(task)
+}
 
 pub fn router() -> MethodInfo {
 
     let route = MethodInfo::new()
         .get(ApiMethod {
-            handler: test_api_handler,
+            handler: test_sync_api_handler,
+            async_handler: test_async_api_handler,
             description: "This is a simple test.",
             parameters: parameter!{
                 force => Boolean!{
@@ -43,5 +64,3 @@ pub fn router() -> MethodInfo {
 
     route
 }
-
-
index fb314d64ebdcfa878a5aed50f87ba57ef6b88ddd..44013c846886a68f4982f8e3bd459f8e87c13d87 100644 (file)
@@ -1,7 +1,9 @@
 use failure::*;
+use futures::future::*;
 
 use crate::json_schema::*;
 use serde_json::{Value};
+use hyper::{Body, Response};
 
 use std::collections::HashMap;
 
@@ -10,6 +12,7 @@ pub struct ApiMethod {
     pub parameters: Jss,
     pub returns: Jss,
     pub handler: fn(Value, &ApiMethod) -> Result<Value, Error>,
+    pub async_handler: fn(Value, &ApiMethod) -> Box<Future<Item = Response<Body>, Error = Error> + Send>
 }
 
 pub struct MethodInfo {
index 7305e55fba0365b530424ee7bcb6ef3823eb619a..0080941e24ea9aeffa3a7e19bc3b6ee06ced62aa 100644 (file)
@@ -10,7 +10,10 @@ use apitest::api_info::*;
 use apitest::json_schema::*;
 
 //use serde_derive::{Serialize, Deserialize};
-use serde_json::{json};
+use serde_json::{json, Value};
+
+use tokio::prelude::*;
+use tokio::timer::Delay;
 
 //use hyper::body::Payload;
 use hyper::http::request::Parts;
@@ -18,7 +21,9 @@ use hyper::{Method, Body, Request, Response, Server, StatusCode};
 use hyper::rt::{Future, Stream};
 use hyper::service::service_fn;
 
-use futures::future;
+use futures::future::*;
+
+use std::time::{Duration, Instant};
 
 type BoxFut = Box<Future<Item = Response<Body>, Error = failure::Error> + Send>;
 
@@ -33,15 +38,15 @@ macro_rules! error_response {
 macro_rules! http_error_future {
     ($status:ident, $msg:expr) => {{
         let resp = error_response!($status, $msg);
-        return Box::new(futures::future::ok(resp));
+        return Box::new(ok(resp));
     }}
 }
 
-fn handle_api_request<'a>(
+fn get_request_parameters_async<'a>(
     info: &'a ApiMethod,
     parts: Parts,
     req_body: Body,
-) -> Box<Future<Item = Response<Body>, Error = failure::Error> + Send + 'a>
+) -> Box<Future<Item = Value, Error = failure::Error> + Send + 'a>
 {
     let resp = req_body.concat2()
         .map_err(|err| format_err!("Promlems reading request body: {}", err))
@@ -68,25 +73,82 @@ fn handle_api_request<'a>(
                 }
             }
 
+            println!("GOT PARAMS {}", params);
+            Ok(params)
+        });
+
+    Box::new(resp)
+}
+
+fn handle_async_api_request<'a>(
+    info: &'a ApiMethod,
+    parts: Parts,
+    req_body: Body,
+) -> Box<Future<Item = Response<Body>, Error = failure::Error> + Send + 'a>
+{
+    let params = get_request_parameters_async(info, parts, req_body);
+
+    let resp = params
+        .and_then(move |params| {
+
+            println!("GOT PARAMS {}", params);
+
+            /*
+            let when = Instant::now() + Duration::from_millis(3000);
+            let task = Delay::new(when).then(|_| {
+                println!("A LAZY TASK");
+                ok(())
+            });
+
+            tokio::spawn(task);
+             */
+
+            (info.async_handler)(params, info)
+        });
+
+    Box::new(resp)
+}
+
+fn handle_sync_api_request<'a>(
+    info: &'a ApiMethod,
+    parts: Parts,
+    req_body: Body,
+) -> Box<Future<Item = Response<Body>, Error = failure::Error> + Send + 'a>
+{
+    let params = get_request_parameters_async(info, parts, req_body);
+
+    let resp = params
+        .and_then(move |params| {
+
             println!("GOT PARAMS {}", params);
 
+            /*
+            let when = Instant::now() + Duration::from_millis(3000);
+            let task = Delay::new(when).then(|_| {
+                println!("A LAZY TASK");
+                ok(())
+            });
+
+            tokio::spawn(task);
+             */
+
             let res = (info.handler)(params, info)?;
 
             Ok(res)
 
-   }).then(|result| {
-        match result {
-            Ok(ref value) => {
-                let json_str = value.to_string();
+        }).then(|result| {
+            match result {
+                Ok(ref value) => {
+                    let json_str = value.to_string();
 
-                Ok(Response::builder()
-                   .status(StatusCode::OK)
-                   .header("ContentType", "application/json")
-                   .body(Body::from(json_str))?)
+                    Ok(Response::builder()
+                       .status(StatusCode::OK)
+                       .header("ContentType", "application/json")
+                       .body(Body::from(json_str))?)
+                }
+                Err(err) => Ok(error_response!(BAD_REQUEST, err.to_string()))
             }
-            Err(err) => Ok(error_response!(BAD_REQUEST, err.to_string()))
-        }
-    });
+        });
 
     Box::new(resp)
 }
@@ -128,7 +190,8 @@ fn handle_request(req: Request<Body>) -> BoxFut {
 
                 // fixme: handle auth
 
-                return handle_api_request(api_method, parts, body);
+                //return handle_sync_api_request(api_method, parts, body);
+                return handle_async_api_request(api_method, parts, body);
 
             } else {
                 http_error_future!(NOT_FOUND, "Path not found.");
@@ -136,7 +199,7 @@ fn handle_request(req: Request<Body>) -> BoxFut {
         }
     }
 
-    Box::new(future::ok(Response::new(Body::from("RETURN WEB GUI\n"))))
+    Box::new(ok(Response::new(Body::from("RETURN WEB GUI\n"))))
 }
 
 lazy_static!{