commit 0bec2a0e8ca714ce9848a9b0202c66891ecb4713 Author: cahe Date: Sun Sep 20 13:24:12 2020 -0300 Base structure diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..53eaa21 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +**/*.rs.bk diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9cc2bff --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "simple" +version = "0.1.0" +authors = ["cahe"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +warp = "0.2" +tokio = { version = "0.2", features = ["macros"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0" } +sqlx = { version = "0.4.0-beta.1", default-features = false, features = [ "postgres", "runtime-tokio", "macros" ] } diff --git a/src/app/mod.rs b/src/app/mod.rs new file mode 100644 index 0000000..ee09d7f --- /dev/null +++ b/src/app/mod.rs @@ -0,0 +1 @@ +pub mod robots; diff --git a/src/app/robots.rs b/src/app/robots.rs new file mode 100644 index 0000000..156b354 --- /dev/null +++ b/src/app/robots.rs @@ -0,0 +1,119 @@ +use serde::{Serialize, Deserialize}; + +use crate::simple::database::PgSharedPool; + +use crate::base::robots; +use crate::data::postgres; + +use crate::simple::handling::HandlingError; + +#[derive(Serialize, Deserialize)] +pub struct GetRobot { + #[serde(skip_deserializing)] + pub id : i64, + pub name : String +} + +type Model = GetRobot; + +impl std::convert::From for GetRobot { + fn from(r : robots::Robot) -> GetRobot { + GetRobot{ + id: r.id, + name: r.name + } + } +} + +impl std::convert::Into for (i64, GetRobot) { + fn into(self) -> robots::Robot { + robots::Robot{ + id: self.0, + name: self.1.name + } + } +} + +pub async fn add_robot(pool : PgSharedPool, r : GetRobot) -> Result { + let mut tx = pool.begin() + .await + .map(|c| Box::new(c)) + .map_err(|e| HandlingError::DBError(e))?; + + let mut repo = postgres::RobotsImpl::new(&mut tx); + let id = robots::add(&mut repo, robots::Robot{id: 0, name: r.name}).await?; + let robot = repo.get_by_id(id).await?; + + tx + .commit() + .await + .map_err(|e| HandlingError::DBError(e))?; + + Ok(robot.into()) +} + +pub async fn get_robot(pool : PgSharedPool, id : i64) -> Result { + let mut tx = pool.begin() + .await + .map(|c| Box::new(c)) + .map_err(|e| HandlingError::DBError(e))?; + + let mut repo = postgres::RobotsImpl::new(&mut tx); + Ok(repo.get_by_id(id).await?.into()) +} + +pub async fn get_all( + pool : PgSharedPool, offset : i64, limit : i64 +) -> Result<(Vec, bool), HandlingError> { + let mut tx = pool.begin() + .await + .map(|c| Box::new(c)) + .map_err(|e| HandlingError::DBError(e))?; + + let mut repo = postgres::ModelImpl::new(&mut tx); + let (items, next) = repo.get_all()? + .paginate(offset, limit) + .await?; + + let data = items + .into_iter() + .map(|r| r.into()) + .collect(); + + Ok((data, next)) +} + +pub async fn update_robot(pool : PgSharedPool, id : i64, r : GetRobot) -> Result { + let mut tx = pool.begin() + .await + .map(|c| Box::new(c)) + .map_err(|e| HandlingError::DBError(e))?; + + let mut repo = postgres::RobotsImpl::new(&mut tx); + let _ : () = robots::update(&mut repo, (id, r).into()).await?; + let robot = repo.get_by_id(id).await?; + + tx + .commit() + .await + .map_err(|e| HandlingError::DBError(e))?; + + Ok(robot.into()) +} + +pub async fn delete_robot(pool : PgSharedPool, id : i64) -> Result<(), HandlingError> { + let mut tx = pool.begin() + .await + .map(|c| Box::new(c)) + .map_err(|e| HandlingError::DBError(e))?; + + let mut repo = postgres::ModelImpl::new(&mut tx); + let _ : () = robots::delete(&mut repo, id).await?; + + tx + .commit() + .await + .map_err(|e| HandlingError::DBError(e))?; + + Ok(()) +} diff --git a/src/base/mod.rs b/src/base/mod.rs new file mode 100644 index 0000000..ee09d7f --- /dev/null +++ b/src/base/mod.rs @@ -0,0 +1 @@ +pub mod robots; diff --git a/src/base/robots.rs b/src/base/robots.rs new file mode 100644 index 0000000..124135e --- /dev/null +++ b/src/base/robots.rs @@ -0,0 +1,28 @@ +use std::future::Future; +use std::boxed::Box; +use std::pin::Pin; + +use crate::simple::handling::HandlingError; + +pub struct Robot { + pub id : i64, + pub name : String +} + +pub trait DefaultRepository { + fn add(&mut self, r : Robot) -> Pin> + Send + '_>>; + fn update(&mut self, r : Robot) -> Pin> + Send + '_>>; + fn delete(&mut self, id : i64) -> Pin> + Send + '_>>; +} + +pub async fn add(repo : &mut impl DefaultRepository, r : Robot) -> Result { + repo.add(r).await +} + +pub async fn update(repo : &mut impl DefaultRepository, r : Robot) -> Result<(), HandlingError> { + repo.update(r).await +} + +pub async fn delete(repo : &mut impl DefaultRepository, id : i64) -> Result<(), HandlingError> { + repo.delete(id).await +} diff --git a/src/data/mod.rs b/src/data/mod.rs new file mode 100644 index 0000000..26e9103 --- /dev/null +++ b/src/data/mod.rs @@ -0,0 +1 @@ +pub mod postgres; diff --git a/src/data/postgres.rs b/src/data/postgres.rs new file mode 100644 index 0000000..e9b4feb --- /dev/null +++ b/src/data/postgres.rs @@ -0,0 +1,172 @@ +use std::boxed::Box; +use std::pin::Pin; + +use crate::simple::handling::{HandlingError}; +use crate::base::robots; + +use std::future::Future; + +use sqlx::FromRow; +use sqlx::Done; + +pub struct Collection<'a> { + tx : &'a mut Box>, + q : sqlx::query::Query<'a, sqlx::Postgres, sqlx::postgres::PgArguments>, + q_id : Option +} + +impl Collection<'_> { + pub fn by_id(self, id : i64) -> Self { + Self { + tx: self.tx, + q: self.q, + q_id: Some(id) + } + } + + pub async fn fetch(self, offset : i64, limit : i64) -> Result, HandlingError> { + let result = self.q + .bind(self.q_id.is_some()) + .bind(self.q_id.unwrap_or_default()) + .bind(offset) + .bind(limit) + .fetch_all(self.tx.as_mut()) + .await; + match result { + Ok(v) => v + .into_iter() + .map(|d| { + let (id, name) = <(i64, String)>::from_row(&d)?; + Ok(robots::Robot{ + id: id, + name: name + }) + }) + .collect(), + Err(e) => Err(HandlingError::DBError(e)) + } + } + + pub async fn paginate(self, offset : i64, limit : i64) -> Result<(Vec, bool), HandlingError> { + let mut items = self.fetch(offset, limit+1).await?; + if items.len() > (limit as usize) { + let _ = items.pop(); + Ok((items, true)) + } else { + Ok((items, false)) + } + } +} + +pub struct RobotsImpl<'a> { + pub tx : &'a mut Box> +} + +pub type ModelImpl<'a> = RobotsImpl<'a>; + +impl<'a> RobotsImpl<'_> { + pub fn new<'b>(tx : &'b mut Box>) -> RobotsImpl<'b> { + RobotsImpl{tx: tx} + } + + async fn add(&mut self, r : robots::Robot) -> Result { + let result = sqlx::query( + "INSERT INTO robots (name) VALUES ($1::TEXT) RETURNING id") + .bind(r.name) + .fetch_one(self.tx.as_mut()) + .await; + match result { + Ok(v) => { + let (id,) = <(i64,)>::from_row(&v)?; + Ok(id) + }, + Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound), + Err(e) => Err(HandlingError::DBError(e)) + } + } + + pub async fn get_by_id(&mut self, id : i64) -> Result { + let result = sqlx::query( + "SELECT * FROM robots WHERE id = $1::INTEGER") + .bind(id) + .fetch_one(self.tx.as_mut()) + .await; + match result { + Ok(v) => { + let (id, name) = <(i64, String)>::from_row(&v)?; + Ok(robots::Robot{ + id: id, + name: name + }) + }, + Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound), + Err(e) => Err(HandlingError::DBError(e)) + } + } + + pub fn get_all(&'a mut self) -> Result, HandlingError> { + let all = sqlx::query::( + "SELECT id, name + FROM robots + WHERE + (CASE WHEN $1::BOOLEAN THEN (id = $2::INTEGER) ELSE TRUE END) + OFFSET $3 + LIMIT $4" + ); + Ok(Collection{ + tx: self.tx, + q: all, + q_id: None + }) + } + + async fn update(&mut self, r : robots::Robot) -> Result<(), HandlingError> { + let result = sqlx::query( + "UPDATE robots SET name = $1::TEXT WHERE id = $2::INTEGER") + .bind(r.name) + .bind(r.id) + .execute(self.tx.as_mut()) + .await; + match result { + Ok(v) => match v.rows_affected() { + 1 => Ok(()), + 0 => Err(HandlingError::NotFound), + _ => Err(HandlingError::InternalError) + }, + Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound), + Err(e) => Err(HandlingError::DBError(e)) + } + } + + + + async fn delete(&mut self, id : i64) -> Result<(), HandlingError> { + let result = sqlx::query( + "DELETE FROM robots WHERE id = $1::INTEGER") + .bind(id) + .execute(self.tx.as_mut()) + .await; + match result { + Ok(v) => match v.rows_affected() { + 1 => Ok(()), + 0 => Err(HandlingError::NotFound), + _ => Err(HandlingError::InternalError) + }, + Err(e) => Err(HandlingError::DBError(e)) + } + } +} + +impl robots::DefaultRepository for RobotsImpl<'_> { + fn add(&mut self, r : robots::Robot) -> Pin> + Send + '_>> { + Box::pin(self.add(r)) + } + + fn update(&mut self, r : robots::Robot) -> Pin> + Send + '_>> { + Box::pin(self.update(r)) + } + + fn delete(&mut self, id : i64) -> Pin> + Send + '_>> { + Box::pin(self.delete(id)) + } +} diff --git a/src/http/mod.rs b/src/http/mod.rs new file mode 100644 index 0000000..c0cc3b5 --- /dev/null +++ b/src/http/mod.rs @@ -0,0 +1,86 @@ +pub mod robots; + +use serde::Serialize; +use warp::Filter; +use warp::Reply; + +use warp::http::StatusCode; + +use crate::simple::handling::HandlingError; +use crate::simple::http::APIBuilder; + +impl warp::reject::Reject for HandlingError {} + +#[derive(Serialize)] +struct APIError { + code: i64, + message: String, +} + +fn build_error_response(e: &HandlingError) -> Result { + let (status, code, message) = match e { + HandlingError::DBError(_e) => ( + StatusCode::INTERNAL_SERVER_ERROR, 1, format!("Database access error") + ), + HandlingError::NotFound => ( + StatusCode::NOT_FOUND, 2, format!("Not Found") + ), + HandlingError::InternalError => ( + StatusCode::INTERNAL_SERVER_ERROR, 3, format!("Internal unexpected error") + ), + }; + + let body = warp::reply::json(&APIError { + code: code, + message: message, + }); + + Ok(warp::reply::with_status(body, status).into_response()) +} + +async fn handle_rejection( + r: warp::reject::Rejection, +) -> Result { + eprintln!("{:#?}", r); + r.find::() + .map(|e| { + build_error_response(e) + .unwrap_or( + StatusCode::INTERNAL_SERVER_ERROR + .into_response() + ) + }) + .ok_or(r) +} + +pub async fn listen( + db_pool: crate::simple::database::PgSharedPool, + addr: impl Into + 'static, +) { + // build a base Filter for our API so that any request of any kind to "/" + // will return "200 OK" with an empty response body + let base = warp::any() + // match a path termination by a "/" character or lack thereof + .and(warp::path::end()) + // reply with a "200 OK" with an empty response body + .map(|| warp::reply()) + // box the filter so it may be passed be passed around easily + .boxed(); + + // begin building the API structure from the base Filter + // right now the pool of database connections is the only + // shared resource that we want to passa around when handling requests + let api = APIBuilder::new(base, db_pool) + // mount the "/robots" base prefix for creating + // and listing all robots in the database + .route(robots::all) + // mount the "/robot/{id}" prefix for retrieving and updating robots information + .route(robots::with_id) + // build the final filter that we'll be served + .build() + // attach a recover to handle rejections + .recover(handle_rejection); + + // run the server returning a Future that will be run indefinitely + warp::serve(api).run(addr).await +} diff --git a/src/http/robots.rs b/src/http/robots.rs new file mode 100644 index 0000000..ee1d353 --- /dev/null +++ b/src/http/robots.rs @@ -0,0 +1,111 @@ +use warp::Filter; + +use crate::simple::database::PgSharedPool; +use crate::simple::http::reply_json; + +use serde::{Deserialize, Serialize}; + +use warp::http::StatusCode; + +use crate::app; + +#[derive(Deserialize)] +struct PaginationOptions { + pub offset: i64, + pub limit: i64, +} + +#[derive(Serialize)] +struct Pagination { + pub data: Vec, + pub next: bool, +} + +async fn create( + pool: PgSharedPool, + r: app::robots::GetRobot, +) -> Result { + reply_json(app::robots::add_robot(pool, r).await) +} + +async fn get( + pool: PgSharedPool, + id: i64 +) -> Result { + reply_json(app::robots::get_robot(pool, id).await) +} + +async fn update( + pool: PgSharedPool, + id: i64, + r: app::robots::GetRobot, +) -> Result { + reply_json(app::robots::update_robot(pool, id, r).await) +} + +async fn get_all( + pool: PgSharedPool, + p: PaginationOptions, +) -> Result { + reply_json( + app::robots::get_all(pool, p.offset, p.limit) + .await + .map(|(r, next)| Pagination { + data: r, + next: next, + }), + ) +} + +async fn delete( + pool : PgSharedPool, + id : i64 +) -> Result { + match app::robots::delete_robot(pool, id).await { + Ok(_) => Ok(warp::reply::with_status(warp::reply(), StatusCode::NO_CONTENT)), + Err(e) => Err(warp::reject::custom(e)) + } +} + +pub fn all( + base: warp::filters::BoxedFilter<(impl warp::Reply + 'static,)>, + pool: PgSharedPool +) -> warp::filters::BoxedFilter<(impl warp::Reply,)> { + let prefix = warp::path!("robots").map(move || pool.clone()); + + base.or(prefix + .clone() + .and(warp::filters::method::post()) + .and(warp::body::json()) + .and_then(create)) + .or(prefix + .clone() + .and(warp::filters::method::get()) + .and(warp::filters::query::query()) + .and_then(get_all)) + .boxed() +} + +pub fn with_id( + base: warp::filters::BoxedFilter<(impl warp::Reply + 'static,)>, + pool: PgSharedPool, +) -> warp::filters::BoxedFilter<(impl warp::Reply,)> { + let prefix = warp::path!("robot" / i64) + .map(move |id| (pool.clone(), id)) + .untuple_one(); + + base.or(prefix + .clone() + .and(warp::filters::method::get()) + .and_then(get)) + .or(prefix + .clone() + .and(warp::filters::method::put()) + .and(warp::body::json()) + .and_then(update)) + .or(prefix + .clone() + .and(warp::filters::method::delete()) + .and_then(delete)) + .boxed() +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..adb8b1a --- /dev/null +++ b/src/main.rs @@ -0,0 +1,12 @@ +mod simple; + +mod http; +mod app; +mod base; +mod data; + +#[tokio::main] +async fn main() -> Result<(), sqlx::Error> { + let db_pool = simple::database::connect_pool().await?; + Ok(http::listen(db_pool, ([127, 0, 0, 1], 8000)).await) +} diff --git a/src/simple/database.rs b/src/simple/database.rs new file mode 100644 index 0000000..93d2b8d --- /dev/null +++ b/src/simple/database.rs @@ -0,0 +1,12 @@ +pub type PgSharedPool = std::sync::Arc>; + +pub async fn connect_pool() -> Result { + Ok( + std::sync::Arc::new( + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect("postgresql://postgres:chapuleta@127.0.0.1:5432") + .await? + ) + ) +} diff --git a/src/simple/handling.rs b/src/simple/handling.rs new file mode 100644 index 0000000..681679e --- /dev/null +++ b/src/simple/handling.rs @@ -0,0 +1,12 @@ +#[derive(Debug)] +pub enum HandlingError { + DBError(sqlx::Error), + NotFound, + InternalError +} + +impl From for HandlingError { + fn from(e : sqlx::Error) -> HandlingError { + HandlingError::DBError(e) + } +} diff --git a/src/simple/http.rs b/src/simple/http.rs new file mode 100644 index 0000000..3c08655 --- /dev/null +++ b/src/simple/http.rs @@ -0,0 +1,44 @@ +use warp::Filter; + +pub struct APIBuilder +where + T: Filter, + S: Clone, +{ + routes: T, + shared: S, +} + +impl APIBuilder +where + T: Filter, + S: Clone, +{ + pub fn new(base: T, shared: S) -> Self { + APIBuilder { + shared: shared, + routes: base, + } + } + + pub fn route(self, build: F) -> APIBuilder + where + U: Filter, + F: FnOnce(T, S) -> U, + { + APIBuilder::new(build(self.routes, self.shared.clone()), self.shared) + } + + pub fn build(self) -> T { + self.routes + } +} + +pub fn reply_json(r: Result) -> Result +where + T: serde::Serialize, + E: warp::reject::Reject, +{ + r.map(|s| warp::reply::json(&s)) + .map_err(warp::reject::custom) +} diff --git a/src/simple/init.rs b/src/simple/init.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/simple/mod.rs b/src/simple/mod.rs new file mode 100644 index 0000000..6f97325 --- /dev/null +++ b/src/simple/mod.rs @@ -0,0 +1,5 @@ +pub mod database; +pub mod handling; +pub mod init; +pub mod http; +