From 8f554b7e5b6377ac2ca96e5c878ade899a419208 Mon Sep 17 00:00:00 2001 From: cahe Date: Sat, 15 Oct 2022 13:12:38 -0300 Subject: [PATCH] Implement shared DI container --- Cargo.toml | 2 +- src/app/robots.rs | 58 +++++++++++++++++++----------------------- src/http/mod.rs | 10 ++++---- src/http/robots.rs | 30 +++++++++++++++------- src/main.rs | 13 +++++++--- src/simple/database.rs | 22 +++++++++++++--- src/simple/mod.rs | 1 + src/simple/shared.rs | 32 +++++++++++++++++++++++ 8 files changed, 114 insertions(+), 54 deletions(-) create mode 100644 src/simple/shared.rs diff --git a/Cargo.toml b/Cargo.toml index 9cc2bff..e8d9d56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,4 +11,4 @@ warp = "0.2" tokio = { version = "0.2", features = ["macros"] } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } -sqlx = { version = "0.4.0-beta.1", default-features = false, features = [ "postgres", "runtime-tokio", "macros" ] } +sqlx = { version = "0.4.0-beta.1", default-features = false, features = [ "postgres", "runtime-tokio-rustls", "macros" ] } diff --git a/src/app/robots.rs b/src/app/robots.rs index 35a8cec..2de1aa2 100644 --- a/src/app/robots.rs +++ b/src/app/robots.rs @@ -7,12 +7,12 @@ use crate::data::robots::postgres; use crate::simple::HandlingError; -pub async fn add_robot(pool: PgSharedPool, r: GetRobot) -> Result { - let mut tx = pool - .begin() - .await - .map(|c| Box::new(c)) - .map_err(|e| HandlingError::DBError(e))?; +pub async fn add_robot( + shared: std::sync::Arc>, + r: GetRobot, +) -> Result { + let pool = shared.find::().unwrap(); + let mut tx = pool.begin().await?; let model = robots::Robot::new(0, r.name).try_into()?; let id = postgres::add(tx.as_mut(), model).await?; @@ -23,25 +23,22 @@ pub async fn add_robot(pool: PgSharedPool, r: GetRobot) -> Result Result { - let mut tx = pool - .begin() - .await - .map(|c| Box::new(c)) - .map_err(|e| HandlingError::DBError(e))?; +pub async fn get_robot( + shared: std::sync::Arc>, + id: i64, +) -> Result { + let pool = shared.find::().unwrap(); + let mut tx = pool.begin().await?; Ok(postgres::get_by_id(tx.as_mut(), id).await?.into()) } pub async fn get_all( - pool: PgSharedPool, + shared: std::sync::Arc>, offset: i64, limit: i64, ) -> Result<(Vec, bool), HandlingError> { - let mut tx = pool - .acquire() - .await - .map(|c| Box::new(c)) - .map_err(|e| HandlingError::DBError(e))?; + let pool = shared.find::().unwrap(); + let mut tx = pool.begin().await?; let all = postgres::Collection::new(&mut tx); let (items, next) = all.paginate(offset, limit).await?; @@ -52,34 +49,31 @@ pub async fn get_all( } pub async fn update_robot( - pool: PgSharedPool, + shared: std::sync::Arc>, id: i64, r: GetRobot, ) -> Result { - let mut tx = pool - .begin() - .await - .map(|c| Box::new(c)) - .map_err(|e| HandlingError::DBError(e))?; + let pool = shared.find::().unwrap(); + let mut tx = pool.begin().await?; let model = robots::Robot::new(id, r.name).try_into()?; postgres::update(tx.as_mut(), model).await?; let robot = postgres::get_by_id(tx.as_mut(), id).await?; - tx.commit().await.map_err(|e| HandlingError::DBError(e))?; + tx.commit().await.map_err(HandlingError::DBError)?; Ok(robot.into()) } -pub async fn delete_robot(pool: PgSharedPool, id: i64) -> Result<(), HandlingError> { - let mut tx = pool - .begin() - .await - .map(|c| Box::new(c)) - .map_err(|e| HandlingError::DBError(e))?; +pub async fn delete_robot( + shared: std::sync::Arc>, + id: i64, +) -> Result<(), HandlingError> { + let pool = shared.find::().unwrap(); + let mut tx = pool.begin().await?; postgres::delete(tx.as_mut(), id).await?; - tx.commit().await.map_err(|e| HandlingError::DBError(e))?; + tx.commit().await.map_err(HandlingError::DBError)?; Ok(()) } diff --git a/src/http/mod.rs b/src/http/mod.rs index 144e961..8d42eae 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -17,7 +17,7 @@ struct APIError { message: String, } -// attempts to build a structured response for a handling error +// attempts to build a structured response from a handling error // the response will have a JSON body containing an internal error code // and a detailed message about the error fn build_error_response(e: &HandlingError) -> Result { @@ -48,7 +48,7 @@ fn build_error_response(e: &HandlingError) -> Result Result { @@ -70,22 +70,22 @@ async fn handle_rejection( // here we can import the many filters that compose the services // we want exposed, setup route structure and plug middlewares pub async fn listen( - shared: crate::simple::database::PgSharedPool, addr: impl Into + 'static, + shared: std::sync::Arc>, ) { // build a base Filter for our API so that any request of any kind to "/" // will return "200 OK" with an empty response body let base = warp::any() // match a path termination "/" character or lack thereof .and(warp::path::end()) - // reply with a "200 OK" and an empty response body + // reply with "200 OK" and an empty response body .map(|| warp::reply()) // box the filter so it may be passed be passed around easily .boxed(); // begin building the API structure from the base Filter // right now the pool of database connections is the only - // shared resource we'll be needing right now + // shared resource we'll be using for now let api = APIBuilder::new(base, shared) // mount the "/robots" base prefix for creating and listing all robots in the database .route(robots::all) diff --git a/src/http/robots.rs b/src/http/robots.rs index 54cb544..4bd5d3e 100644 --- a/src/http/robots.rs +++ b/src/http/robots.rs @@ -23,18 +23,24 @@ struct Pagination { } // add a new robot entry -async fn create(pool: PgSharedPool, r: app::forms::GetRobot) -> Result { - reply_json(app::robots::add_robot(pool, r).await) +async fn create( + shared: std::sync::Arc>, + r: app::forms::GetRobot, +) -> Result { + reply_json(app::robots::add_robot(shared, r).await) } // retrieve information of a single robot -async fn get(pool: PgSharedPool, id: i64) -> Result { - reply_json(app::robots::get_robot(pool, id).await) +async fn get( + shared: std::sync::Arc>, + id: i64, +) -> Result { + reply_json(app::robots::get_robot(shared, id).await) } // updates robot information async fn update( - pool: PgSharedPool, + pool: std::sync::Arc>, id: i64, r: app::forms::GetRobot, ) -> Result { @@ -42,7 +48,10 @@ async fn update( } // returns a pagination of all robots -async fn get_all(pool: PgSharedPool, p: PaginationOptions) -> Result { +async fn get_all( + pool: std::sync::Arc>, + p: PaginationOptions, +) -> Result { reply_json( app::robots::get_all(pool, p.offset, p.limit) .await @@ -54,7 +63,10 @@ async fn get_all(pool: PgSharedPool, p: PaginationOptions) -> Result Result { +async fn delete( + pool: std::sync::Arc>, + id: i64, +) -> Result { match app::robots::delete_robot(pool, id).await { Ok(_) => Ok(warp::reply::with_status( warp::reply(), @@ -67,7 +79,7 @@ async fn delete(pool: PgSharedPool, id: i64) -> Result { // builds the route structure for "/robots" endpoint pub fn all( base: warp::filters::BoxedFilter<(impl Reply + 'static,)>, - pool: PgSharedPool, + pool: std::sync::Arc>, ) -> warp::filters::BoxedFilter<(impl Reply,)> { // create a prefix that injects a database connection for every request // every filter is added after the prefix so it receives a database connection @@ -101,7 +113,7 @@ pub fn all( // builds the route structure for the "/robot/{id}" endpoint pub fn with_id( base: warp::filters::BoxedFilter<(impl Reply + 'static,)>, - pool: PgSharedPool, + pool: std::sync::Arc>, ) -> warp::filters::BoxedFilter<(impl Reply,)> { // we create a prefix containing the id of a robot and a shared database connection // every filter is added after the prefix diff --git a/src/main.rs b/src/main.rs index 68ebd36..094cc3b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,8 +13,8 @@ async fn main() -> Result<(), sqlx::Error> { // build a new shared pool of postgres connections // by wrapping the pool object into an std::Arc // make it so it can be cloned in a shallow manner - // and shared between many threads - let db_pool = simple::database::make_shared( + // and shared between multiple threads + let db_pool = simple::database::PgSharedPool::new( sqlx::postgres::PgPoolOptions::new() // set a maximum of 5 concurrent active connections // when the limit is reached any additional contexts @@ -26,5 +26,12 @@ async fn main() -> Result<(), sqlx::Error> { .await?, ); - Ok(http::listen(db_pool, ([127, 0, 0, 1], 8000)).await) + let mut shared = crate::simple::shared::SharedContainer::new(); + shared.add(db_pool); + + Ok(http::listen( + ([127, 0, 0, 1], 8000), + std::sync::Arc::new(std::boxed::Box::new(shared)), + ) + .await) } diff --git a/src/simple/database.rs b/src/simple/database.rs index a1cfbc9..9929b3f 100644 --- a/src/simple/database.rs +++ b/src/simple/database.rs @@ -1,7 +1,21 @@ -use std::sync::Arc; +use crate::simple::HandlingError; -pub type PgSharedPool = Arc>; +// An application wide shared pool of postgres database connections +#[derive(Clone)] +pub struct PgSharedPool(sqlx::Pool); -pub fn make_shared(conn: sqlx::Pool) -> PgSharedPool { - Arc::new(conn) +impl PgSharedPool { + pub fn new(conn: sqlx::Pool) -> Self { + Self(conn) + } + + pub async fn begin<'a>( + &self, + ) -> Result>, HandlingError> { + self.0 + .begin() + .await + .map(Box::new) + .map_err(HandlingError::DBError) + } } diff --git a/src/simple/mod.rs b/src/simple/mod.rs index dfc7f6f..4bc0574 100644 --- a/src/simple/mod.rs +++ b/src/simple/mod.rs @@ -1,5 +1,6 @@ pub mod database; pub mod http; +pub mod shared; #[derive(Debug)] pub enum HandlingError { diff --git a/src/simple/shared.rs b/src/simple/shared.rs new file mode 100644 index 0000000..da10116 --- /dev/null +++ b/src/simple/shared.rs @@ -0,0 +1,32 @@ +use std::any; +use std::boxed::Box; +use std::vec::Vec; + +pub struct SharedContainer { + items: Vec>, +} + +impl SharedContainer { + pub fn find(&self) -> Option<&E> + where + E: any::Any + 'static, + { + for x in self.items.iter() { + if let Some(val) = x.downcast_ref::() { + return Some(val); + } + } + None + } + + pub fn add(&mut self, val: E) + where + E: any::Any + Send + Sync + 'static, + { + self.items.push(Box::new(val)) + } + + pub fn new() -> Self { + Self { items: Vec::new() } + } +}