diff --git a/src/app/robots.rs b/src/app/robots.rs index 156b354..ec513ff 100644 --- a/src/app/robots.rs +++ b/src/app/robots.rs @@ -1,59 +1,65 @@ -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use crate::simple::database::PgSharedPool; use crate::base::robots; use crate::data::postgres; -use crate::simple::handling::HandlingError; +use crate::simple::HandlingError; #[derive(Serialize, Deserialize)] pub struct GetRobot { #[serde(skip_deserializing)] - pub id : i64, - pub name : String + pub id: i64, + pub name: String, } type Model = GetRobot; impl std::convert::From for GetRobot { - fn from(r : robots::Robot) -> GetRobot { - GetRobot{ + fn from(r: robots::Robot) -> GetRobot { + GetRobot { id: r.id, - name: r.name + name: r.name, } } } impl std::convert::Into for (i64, GetRobot) { fn into(self) -> robots::Robot { - robots::Robot{ + robots::Robot { id: self.0, - name: self.1.name + name: self.1.name, } } } -pub async fn add_robot(pool : PgSharedPool, r : GetRobot) -> Result { - let mut tx = pool.begin() +pub async fn add_robot(pool: PgSharedPool, r: GetRobot) -> Result { + let mut tx = pool + .begin() .await .map(|c| Box::new(c)) .map_err(|e| HandlingError::DBError(e))?; let mut repo = postgres::RobotsImpl::new(&mut tx); - let id = robots::add(&mut repo, robots::Robot{id: 0, name: r.name}).await?; + let id = robots::add( + &mut repo, + robots::Robot { + id: 0, + name: r.name, + }, + ) + .await?; let robot = repo.get_by_id(id).await?; - tx - .commit() - .await - .map_err(|e| HandlingError::DBError(e))?; + tx.commit().await.map_err(|e| HandlingError::DBError(e))?; Ok(robot.into()) } -pub async fn get_robot(pool : PgSharedPool, id : i64) -> Result { - let mut tx = pool.begin() +pub async fn get_robot(pool: PgSharedPool, id: i64) -> Result { + let mut tx = pool + .begin() .await .map(|c| Box::new(c)) .map_err(|e| HandlingError::DBError(e))?; @@ -63,57 +69,55 @@ pub async fn get_robot(pool : PgSharedPool, id : i64) -> Result Result<(Vec, bool), HandlingError> { - let mut tx = pool.begin() + let mut tx = pool + .begin() .await .map(|c| Box::new(c)) .map_err(|e| HandlingError::DBError(e))?; let mut repo = postgres::ModelImpl::new(&mut tx); - let (items, next) = repo.get_all()? - .paginate(offset, limit) - .await?; + let (items, next) = repo.get_all()?.paginate(offset, limit).await?; - let data = items - .into_iter() - .map(|r| r.into()) - .collect(); + let data = items.into_iter().map(|r| r.into()).collect(); Ok((data, next)) } -pub async fn update_robot(pool : PgSharedPool, id : i64, r : GetRobot) -> Result { - let mut tx = pool.begin() +pub async fn update_robot( + pool: PgSharedPool, + id: i64, + r: GetRobot, +) -> Result { + let mut tx = pool + .begin() .await .map(|c| Box::new(c)) .map_err(|e| HandlingError::DBError(e))?; let mut repo = postgres::RobotsImpl::new(&mut tx); - let _ : () = robots::update(&mut repo, (id, r).into()).await?; + let _: () = robots::update(&mut repo, (id, r).into()).await?; let robot = repo.get_by_id(id).await?; - tx - .commit() - .await - .map_err(|e| HandlingError::DBError(e))?; + tx.commit().await.map_err(|e| HandlingError::DBError(e))?; Ok(robot.into()) } -pub async fn delete_robot(pool : PgSharedPool, id : i64) -> Result<(), HandlingError> { - let mut tx = pool.begin() +pub async fn delete_robot(pool: PgSharedPool, id: i64) -> Result<(), HandlingError> { + let mut tx = pool + .begin() .await .map(|c| Box::new(c)) .map_err(|e| HandlingError::DBError(e))?; let mut repo = postgres::ModelImpl::new(&mut tx); - let _ : () = robots::delete(&mut repo, id).await?; + let _: () = robots::delete(&mut repo, id).await?; - tx - .commit() - .await - .map_err(|e| HandlingError::DBError(e))?; + tx.commit().await.map_err(|e| HandlingError::DBError(e))?; Ok(()) } diff --git a/src/base/robots.rs b/src/base/robots.rs index 124135e..02a8831 100644 --- a/src/base/robots.rs +++ b/src/base/robots.rs @@ -1,28 +1,24 @@ -use std::future::Future; -use std::boxed::Box; -use std::pin::Pin; - -use crate::simple::handling::HandlingError; +use crate::simple::{HandlingError, TraitFuture}; pub struct Robot { - pub id : i64, - pub name : String + pub id: i64, + pub name: String, } pub trait DefaultRepository { - fn add(&mut self, r : Robot) -> Pin> + Send + '_>>; - fn update(&mut self, r : Robot) -> Pin> + Send + '_>>; - fn delete(&mut self, id : i64) -> Pin> + Send + '_>>; + fn add(&mut self, r: Robot) -> TraitFuture>; + fn update(&mut self, r: Robot) -> TraitFuture>; + fn delete(&mut self, id: i64) -> TraitFuture>; } -pub async fn add(repo : &mut impl DefaultRepository, r : Robot) -> Result { +pub async fn add(repo: &mut impl DefaultRepository, r: Robot) -> Result { repo.add(r).await } -pub async fn update(repo : &mut impl DefaultRepository, r : Robot) -> Result<(), HandlingError> { +pub async fn update(repo: &mut impl DefaultRepository, r: Robot) -> Result<(), HandlingError> { repo.update(r).await } -pub async fn delete(repo : &mut impl DefaultRepository, id : i64) -> Result<(), HandlingError> { +pub async fn delete(repo: &mut impl DefaultRepository, id: i64) -> Result<(), HandlingError> { repo.delete(id).await } diff --git a/src/data/postgres.rs b/src/data/postgres.rs index e9b4feb..220fa26 100644 --- a/src/data/postgres.rs +++ b/src/data/postgres.rs @@ -1,31 +1,30 @@ +use crate::simple::TraitFuture; use std::boxed::Box; -use std::pin::Pin; -use crate::simple::handling::{HandlingError}; use crate::base::robots; +use crate::simple::HandlingError; -use std::future::Future; - -use sqlx::FromRow; use sqlx::Done; +use sqlx::FromRow; pub struct Collection<'a> { - tx : &'a mut Box>, - q : sqlx::query::Query<'a, sqlx::Postgres, sqlx::postgres::PgArguments>, - q_id : Option + tx: &'a mut Box>, + q: sqlx::query::Query<'a, sqlx::Postgres, sqlx::postgres::PgArguments>, + q_id: Option, } impl Collection<'_> { - pub fn by_id(self, id : i64) -> Self { + pub fn by_id(self, id: i64) -> Self { Self { tx: self.tx, q: self.q, - q_id: Some(id) + q_id: Some(id), } } - pub async fn fetch(self, offset : i64, limit : i64) -> Result, HandlingError> { - let result = self.q + pub async fn fetch(self, offset: i64, limit: i64) -> Result, HandlingError> { + let result = self + .q .bind(self.q_id.is_some()) .bind(self.q_id.unwrap_or_default()) .bind(offset) @@ -37,18 +36,19 @@ impl Collection<'_> { .into_iter() .map(|d| { let (id, name) = <(i64, String)>::from_row(&d)?; - Ok(robots::Robot{ - id: id, - name: name - }) + Ok(robots::Robot { id: id, name: name }) }) .collect(), - Err(e) => Err(HandlingError::DBError(e)) + Err(e) => Err(HandlingError::DBError(e)), } } - pub async fn paginate(self, offset : i64, limit : i64) -> Result<(Vec, bool), HandlingError> { - let mut items = self.fetch(offset, limit+1).await?; + pub async fn paginate( + self, + offset: i64, + limit: i64, + ) -> Result<(Vec, bool), HandlingError> { + let mut items = self.fetch(offset, limit + 1).await?; if items.len() > (limit as usize) { let _ = items.pop(); Ok((items, true)) @@ -59,19 +59,18 @@ impl Collection<'_> { } pub struct RobotsImpl<'a> { - pub tx : &'a mut Box> + pub tx: &'a mut Box>, } pub type ModelImpl<'a> = RobotsImpl<'a>; impl<'a> RobotsImpl<'_> { - pub fn new<'b>(tx : &'b mut Box>) -> RobotsImpl<'b> { - RobotsImpl{tx: tx} + pub fn new<'b>(tx: &'b mut Box>) -> RobotsImpl<'b> { + RobotsImpl { tx: tx } } - async fn add(&mut self, r : robots::Robot) -> Result { - let result = sqlx::query( - "INSERT INTO robots (name) VALUES ($1::TEXT) RETURNING id") + async fn add(&mut self, r: robots::Robot) -> Result { + let result = sqlx::query("INSERT INTO robots (name) VALUES ($1::TEXT) RETURNING id") .bind(r.name) .fetch_one(self.tx.as_mut()) .await; @@ -79,28 +78,24 @@ impl<'a> RobotsImpl<'_> { Ok(v) => { let (id,) = <(i64,)>::from_row(&v)?; Ok(id) - }, + } Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound), - Err(e) => Err(HandlingError::DBError(e)) + Err(e) => Err(HandlingError::DBError(e)), } } - pub async fn get_by_id(&mut self, id : i64) -> Result { - let result = sqlx::query( - "SELECT * FROM robots WHERE id = $1::INTEGER") + pub async fn get_by_id(&mut self, id: i64) -> Result { + let result = sqlx::query("SELECT * FROM robots WHERE id = $1::INTEGER") .bind(id) .fetch_one(self.tx.as_mut()) .await; match result { Ok(v) => { let (id, name) = <(i64, String)>::from_row(&v)?; - Ok(robots::Robot{ - id: id, - name: name - }) - }, + Ok(robots::Robot { id: id, name: name }) + } Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound), - Err(e) => Err(HandlingError::DBError(e)) + Err(e) => Err(HandlingError::DBError(e)), } } @@ -111,18 +106,17 @@ impl<'a> RobotsImpl<'_> { WHERE (CASE WHEN $1::BOOLEAN THEN (id = $2::INTEGER) ELSE TRUE END) OFFSET $3 - LIMIT $4" + LIMIT $4", ); - Ok(Collection{ + Ok(Collection { tx: self.tx, q: all, - q_id: None + q_id: None, }) } - async fn update(&mut self, r : robots::Robot) -> Result<(), HandlingError> { - let result = sqlx::query( - "UPDATE robots SET name = $1::TEXT WHERE id = $2::INTEGER") + async fn update(&mut self, r: robots::Robot) -> Result<(), HandlingError> { + let result = sqlx::query("UPDATE robots SET name = $1::TEXT WHERE id = $2::INTEGER") .bind(r.name) .bind(r.id) .execute(self.tx.as_mut()) @@ -131,18 +125,15 @@ impl<'a> RobotsImpl<'_> { Ok(v) => match v.rows_affected() { 1 => Ok(()), 0 => Err(HandlingError::NotFound), - _ => Err(HandlingError::InternalError) + _ => Err(HandlingError::InternalError), }, Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound), - Err(e) => Err(HandlingError::DBError(e)) + Err(e) => Err(HandlingError::DBError(e)), } } - - - async fn delete(&mut self, id : i64) -> Result<(), HandlingError> { - let result = sqlx::query( - "DELETE FROM robots WHERE id = $1::INTEGER") + async fn delete(&mut self, id: i64) -> Result<(), HandlingError> { + let result = sqlx::query("DELETE FROM robots WHERE id = $1::INTEGER") .bind(id) .execute(self.tx.as_mut()) .await; @@ -150,23 +141,23 @@ impl<'a> RobotsImpl<'_> { Ok(v) => match v.rows_affected() { 1 => Ok(()), 0 => Err(HandlingError::NotFound), - _ => Err(HandlingError::InternalError) + _ => Err(HandlingError::InternalError), }, - Err(e) => Err(HandlingError::DBError(e)) + Err(e) => Err(HandlingError::DBError(e)), } } } impl robots::DefaultRepository for RobotsImpl<'_> { - fn add(&mut self, r : robots::Robot) -> Pin> + Send + '_>> { + fn add(&mut self, r: robots::Robot) -> TraitFuture> { Box::pin(self.add(r)) } - fn update(&mut self, r : robots::Robot) -> Pin> + Send + '_>> { + fn update(&mut self, r: robots::Robot) -> TraitFuture> { Box::pin(self.update(r)) } - fn delete(&mut self, id : i64) -> Pin> + Send + '_>> { + fn delete(&mut self, id: i64) -> TraitFuture> { Box::pin(self.delete(id)) } } diff --git a/src/http/mod.rs b/src/http/mod.rs index c0cc3b5..144e961 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -6,8 +6,8 @@ use warp::Reply; use warp::http::StatusCode; -use crate::simple::handling::HandlingError; use crate::simple::http::APIBuilder; +use crate::simple::HandlingError; impl warp::reject::Reject for HandlingError {} @@ -17,16 +17,22 @@ struct APIError { message: String, } +// attempts to build a structured response for a handling error +// the response will have a JSON body containing an internal error code +// and a detailed message about the error fn build_error_response(e: &HandlingError) -> Result { let (status, code, message) = match e { + // a database access error will be treated as an internal error HandlingError::DBError(_e) => ( - StatusCode::INTERNAL_SERVER_ERROR, 1, format!("Database access error") - ), - HandlingError::NotFound => ( - StatusCode::NOT_FOUND, 2, format!("Not Found") + StatusCode::INTERNAL_SERVER_ERROR, + 1, + format!("Database access error"), ), + HandlingError::NotFound => (StatusCode::NOT_FOUND, 2, format!("Not Found")), HandlingError::InternalError => ( - StatusCode::INTERNAL_SERVER_ERROR, 3, format!("Internal unexpected error") + StatusCode::INTERNAL_SERVER_ERROR, + 3, + format!("Internal unexpected error"), ), }; @@ -38,41 +44,50 @@ fn build_error_response(e: &HandlingError) -> Result Result { eprintln!("{:#?}", r); + // right now we only handle our own handling error type + // errors returned by warp filters will be returned as they are + // their response and status code will remain unchanged r.find::() .map(|e| { build_error_response(e) - .unwrap_or( - StatusCode::INTERNAL_SERVER_ERROR - .into_response() - ) + // any failed attempt to handle the error will create a "500 Internal server error" + .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR.into_response()) }) + // if we fail to identify a known handling error we return the rejection unchanged .ok_or(r) } +// will create the server for our HTTP API as a warp server +// here we can import the many filters that compose the services +// we want exposed, setup route structure and plug middlewares pub async fn listen( - db_pool: crate::simple::database::PgSharedPool, + shared: crate::simple::database::PgSharedPool, addr: impl Into + 'static, ) { // build a base Filter for our API so that any request of any kind to "/" // will return "200 OK" with an empty response body let base = warp::any() - // match a path termination by a "/" character or lack thereof + // match a path termination "/" character or lack thereof .and(warp::path::end()) - // reply with a "200 OK" with an empty response body + // reply with a "200 OK" and an empty response body .map(|| warp::reply()) // box the filter so it may be passed be passed around easily .boxed(); // begin building the API structure from the base Filter // right now the pool of database connections is the only - // shared resource that we want to passa around when handling requests - let api = APIBuilder::new(base, db_pool) - // mount the "/robots" base prefix for creating - // and listing all robots in the database + // shared resource we'll be needing right now + let api = APIBuilder::new(base, shared) + // mount the "/robots" base prefix for creating and listing all robots in the database .route(robots::all) // mount the "/robot/{id}" prefix for retrieving and updating robots information .route(robots::with_id) @@ -81,6 +96,6 @@ pub async fn listen( // attach a recover to handle rejections .recover(handle_rejection); - // run the server returning a Future that will be run indefinitely + // run the server returning a Future that never resolves warp::serve(api).run(addr).await } diff --git a/src/http/robots.rs b/src/http/robots.rs index ee1d353..1bf7ae0 100644 --- a/src/http/robots.rs +++ b/src/http/robots.rs @@ -28,10 +28,7 @@ async fn create( reply_json(app::robots::add_robot(pool, r).await) } -async fn get( - pool: PgSharedPool, - id: i64 -) -> Result { +async fn get(pool: PgSharedPool, id: i64) -> Result { reply_json(app::robots::get_robot(pool, id).await) } @@ -57,19 +54,19 @@ async fn get_all( ) } -async fn delete( - pool : PgSharedPool, - id : i64 -) -> Result { +async fn delete(pool: PgSharedPool, id: i64) -> Result { match app::robots::delete_robot(pool, id).await { - Ok(_) => Ok(warp::reply::with_status(warp::reply(), StatusCode::NO_CONTENT)), - Err(e) => Err(warp::reject::custom(e)) + Ok(_) => Ok(warp::reply::with_status( + warp::reply(), + StatusCode::NO_CONTENT, + )), + Err(e) => Err(warp::reject::custom(e)), } } pub fn all( base: warp::filters::BoxedFilter<(impl warp::Reply + 'static,)>, - pool: PgSharedPool + pool: PgSharedPool, ) -> warp::filters::BoxedFilter<(impl warp::Reply,)> { let prefix = warp::path!("robots").map(move || pool.clone()); diff --git a/src/main.rs b/src/main.rs index adb8b1a..68ebd36 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,30 @@ mod simple; -mod http; mod app; mod base; mod data; +mod http; #[tokio::main] async fn main() -> Result<(), sqlx::Error> { - let db_pool = simple::database::connect_pool().await?; + // fetch database configuration from environment variables + let db_address = std::env::var("POSTGRES_ADDRESS").expect("No database address specified"); + + // build a new shared pool of postgres connections + // by wrapping the pool object into an std::Arc + // make it so it can be cloned in a shallow manner + // and shared between many threads + let db_pool = simple::database::make_shared( + sqlx::postgres::PgPoolOptions::new() + // set a maximum of 5 concurrent active connections + // when the limit is reached any additional contexts + // will block until a connection is available + .max_connections(5) + // the address is expected to be a postgres connection string + // in the format postgres://user:password@address:port/database + .connect(db_address.as_str()) + .await?, + ); + Ok(http::listen(db_pool, ([127, 0, 0, 1], 8000)).await) } diff --git a/src/simple/database.rs b/src/simple/database.rs index 93d2b8d..a1cfbc9 100644 --- a/src/simple/database.rs +++ b/src/simple/database.rs @@ -1,12 +1,7 @@ -pub type PgSharedPool = std::sync::Arc>; +use std::sync::Arc; -pub async fn connect_pool() -> Result { - Ok( - std::sync::Arc::new( - sqlx::postgres::PgPoolOptions::new() - .max_connections(5) - .connect("postgresql://postgres:chapuleta@127.0.0.1:5432") - .await? - ) - ) +pub type PgSharedPool = Arc>; + +pub fn make_shared(conn: sqlx::Pool) -> PgSharedPool { + Arc::new(conn) } diff --git a/src/simple/handling.rs b/src/simple/handling.rs deleted file mode 100644 index 681679e..0000000 --- a/src/simple/handling.rs +++ /dev/null @@ -1,12 +0,0 @@ -#[derive(Debug)] -pub enum HandlingError { - DBError(sqlx::Error), - NotFound, - InternalError -} - -impl From for HandlingError { - fn from(e : sqlx::Error) -> HandlingError { - HandlingError::DBError(e) - } -} diff --git a/src/simple/init.rs b/src/simple/init.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/simple/mod.rs b/src/simple/mod.rs index 6f97325..141a61d 100644 --- a/src/simple/mod.rs +++ b/src/simple/mod.rs @@ -1,5 +1,20 @@ +use std::future::Future; +use std::pin::Pin; + pub mod database; -pub mod handling; -pub mod init; pub mod http; +pub type TraitFuture<'a, T> = Pin + Send + 'a>>; + +#[derive(Debug)] +pub enum HandlingError { + DBError(sqlx::Error), + NotFound, + InternalError, +} + +impl From for HandlingError { + fn from(e: sqlx::Error) -> HandlingError { + HandlingError::DBError(e) + } +}