Implement shared DI container

develop
cahe 3 years ago
parent 993821ab77
commit 8f554b7e5b

@ -11,4 +11,4 @@ warp = "0.2"
tokio = { version = "0.2", features = ["macros"] } tokio = { version = "0.2", features = ["macros"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" } serde_json = { version = "1.0" }
sqlx = { version = "0.4.0-beta.1", default-features = false, features = [ "postgres", "runtime-tokio", "macros" ] } sqlx = { version = "0.4.0-beta.1", default-features = false, features = [ "postgres", "runtime-tokio-rustls", "macros" ] }

@ -7,12 +7,12 @@ use crate::data::robots::postgres;
use crate::simple::HandlingError; use crate::simple::HandlingError;
pub async fn add_robot(pool: PgSharedPool, r: GetRobot) -> Result<GetRobot, HandlingError> { pub async fn add_robot(
let mut tx = pool shared: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
.begin() r: GetRobot,
.await ) -> Result<GetRobot, HandlingError> {
.map(|c| Box::new(c)) let pool = shared.find::<PgSharedPool>().unwrap();
.map_err(|e| HandlingError::DBError(e))?; let mut tx = pool.begin().await?;
let model = robots::Robot::new(0, r.name).try_into()?; let model = robots::Robot::new(0, r.name).try_into()?;
let id = postgres::add(tx.as_mut(), model).await?; let id = postgres::add(tx.as_mut(), model).await?;
@ -23,25 +23,22 @@ pub async fn add_robot(pool: PgSharedPool, r: GetRobot) -> Result<GetRobot, Hand
Ok(robot.into()) Ok(robot.into())
} }
pub async fn get_robot(pool: PgSharedPool, id: i64) -> Result<GetRobot, HandlingError> { pub async fn get_robot(
let mut tx = pool shared: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
.begin() id: i64,
.await ) -> Result<GetRobot, HandlingError> {
.map(|c| Box::new(c)) let pool = shared.find::<PgSharedPool>().unwrap();
.map_err(|e| HandlingError::DBError(e))?; let mut tx = pool.begin().await?;
Ok(postgres::get_by_id(tx.as_mut(), id).await?.into()) Ok(postgres::get_by_id(tx.as_mut(), id).await?.into())
} }
pub async fn get_all( pub async fn get_all(
pool: PgSharedPool, shared: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
offset: i64, offset: i64,
limit: i64, limit: i64,
) -> Result<(Vec<GetRobot>, bool), HandlingError> { ) -> Result<(Vec<GetRobot>, bool), HandlingError> {
let mut tx = pool let pool = shared.find::<PgSharedPool>().unwrap();
.acquire() let mut tx = pool.begin().await?;
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
let all = postgres::Collection::new(&mut tx); let all = postgres::Collection::new(&mut tx);
let (items, next) = all.paginate(offset, limit).await?; let (items, next) = all.paginate(offset, limit).await?;
@ -52,34 +49,31 @@ pub async fn get_all(
} }
pub async fn update_robot( pub async fn update_robot(
pool: PgSharedPool, shared: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
id: i64, id: i64,
r: GetRobot, r: GetRobot,
) -> Result<GetRobot, HandlingError> { ) -> Result<GetRobot, HandlingError> {
let mut tx = pool let pool = shared.find::<PgSharedPool>().unwrap();
.begin() let mut tx = pool.begin().await?;
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
let model = robots::Robot::new(id, r.name).try_into()?; let model = robots::Robot::new(id, r.name).try_into()?;
postgres::update(tx.as_mut(), model).await?; postgres::update(tx.as_mut(), model).await?;
let robot = postgres::get_by_id(tx.as_mut(), id).await?; let robot = postgres::get_by_id(tx.as_mut(), id).await?;
tx.commit().await.map_err(|e| HandlingError::DBError(e))?; tx.commit().await.map_err(HandlingError::DBError)?;
Ok(robot.into()) Ok(robot.into())
} }
pub async fn delete_robot(pool: PgSharedPool, id: i64) -> Result<(), HandlingError> { pub async fn delete_robot(
let mut tx = pool shared: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
.begin() id: i64,
.await ) -> Result<(), HandlingError> {
.map(|c| Box::new(c)) let pool = shared.find::<PgSharedPool>().unwrap();
.map_err(|e| HandlingError::DBError(e))?; let mut tx = pool.begin().await?;
postgres::delete(tx.as_mut(), id).await?; postgres::delete(tx.as_mut(), id).await?;
tx.commit().await.map_err(|e| HandlingError::DBError(e))?; tx.commit().await.map_err(HandlingError::DBError)?;
Ok(()) Ok(())
} }

@ -17,7 +17,7 @@ struct APIError {
message: String, message: String,
} }
// attempts to build a structured response for a handling error // attempts to build a structured response from a handling error
// the response will have a JSON body containing an internal error code // the response will have a JSON body containing an internal error code
// and a detailed message about the error // and a detailed message about the error
fn build_error_response(e: &HandlingError) -> Result<warp::reply::Response, warp::http::Error> { fn build_error_response(e: &HandlingError) -> Result<warp::reply::Response, warp::http::Error> {
@ -48,7 +48,7 @@ fn build_error_response(e: &HandlingError) -> Result<warp::reply::Response, warp
// when all handlers have rejected a request either because the path didn't match // when all handlers have rejected a request either because the path didn't match
// if incorrect parameters were passed in the query string or request body // if incorrect parameters were passed in the query string or request body
// or a business logic, database or internal error ocurred // or a business logic, database or internal error ocurred
// we can turn handle the rejections making an alternative response // we can handle the rejection providing an alternative response
async fn handle_rejection( async fn handle_rejection(
r: warp::reject::Rejection, r: warp::reject::Rejection,
) -> Result<warp::reply::Response, warp::reject::Rejection> { ) -> Result<warp::reply::Response, warp::reject::Rejection> {
@ -70,22 +70,22 @@ async fn handle_rejection(
// here we can import the many filters that compose the services // here we can import the many filters that compose the services
// we want exposed, setup route structure and plug middlewares // we want exposed, setup route structure and plug middlewares
pub async fn listen( pub async fn listen(
shared: crate::simple::database::PgSharedPool,
addr: impl Into<std::net::SocketAddr> + 'static, addr: impl Into<std::net::SocketAddr> + 'static,
shared: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
) { ) {
// build a base Filter for our API so that any request of any kind to "/" // build a base Filter for our API so that any request of any kind to "/"
// will return "200 OK" with an empty response body // will return "200 OK" with an empty response body
let base = warp::any() let base = warp::any()
// match a path termination "/" character or lack thereof // match a path termination "/" character or lack thereof
.and(warp::path::end()) .and(warp::path::end())
// reply with a "200 OK" and an empty response body // reply with "200 OK" and an empty response body
.map(|| warp::reply()) .map(|| warp::reply())
// box the filter so it may be passed be passed around easily // box the filter so it may be passed be passed around easily
.boxed(); .boxed();
// begin building the API structure from the base Filter // begin building the API structure from the base Filter
// right now the pool of database connections is the only // right now the pool of database connections is the only
// shared resource we'll be needing right now // shared resource we'll be using for now
let api = APIBuilder::new(base, shared) let api = APIBuilder::new(base, shared)
// mount the "/robots" base prefix for creating and listing all robots in the database // mount the "/robots" base prefix for creating and listing all robots in the database
.route(robots::all) .route(robots::all)

@ -23,18 +23,24 @@ struct Pagination<T> {
} }
// add a new robot entry // add a new robot entry
async fn create(pool: PgSharedPool, r: app::forms::GetRobot) -> Result<impl Reply, Rejection> { async fn create(
reply_json(app::robots::add_robot(pool, r).await) shared: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
r: app::forms::GetRobot,
) -> Result<impl Reply, Rejection> {
reply_json(app::robots::add_robot(shared, r).await)
} }
// retrieve information of a single robot // retrieve information of a single robot
async fn get(pool: PgSharedPool, id: i64) -> Result<impl Reply, Rejection> { async fn get(
reply_json(app::robots::get_robot(pool, id).await) shared: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
id: i64,
) -> Result<impl Reply, Rejection> {
reply_json(app::robots::get_robot(shared, id).await)
} }
// updates robot information // updates robot information
async fn update( async fn update(
pool: PgSharedPool, pool: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
id: i64, id: i64,
r: app::forms::GetRobot, r: app::forms::GetRobot,
) -> Result<impl Reply, Rejection> { ) -> Result<impl Reply, Rejection> {
@ -42,7 +48,10 @@ async fn update(
} }
// returns a pagination of all robots // returns a pagination of all robots
async fn get_all(pool: PgSharedPool, p: PaginationOptions) -> Result<impl Reply, Rejection> { async fn get_all(
pool: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
p: PaginationOptions,
) -> Result<impl Reply, Rejection> {
reply_json( reply_json(
app::robots::get_all(pool, p.offset, p.limit) app::robots::get_all(pool, p.offset, p.limit)
.await .await
@ -54,7 +63,10 @@ async fn get_all(pool: PgSharedPool, p: PaginationOptions) -> Result<impl Reply,
} }
// will delete the requested robot // will delete the requested robot
async fn delete(pool: PgSharedPool, id: i64) -> Result<impl Reply, Rejection> { async fn delete(
pool: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
id: i64,
) -> Result<impl Reply, Rejection> {
match app::robots::delete_robot(pool, id).await { match app::robots::delete_robot(pool, id).await {
Ok(_) => Ok(warp::reply::with_status( Ok(_) => Ok(warp::reply::with_status(
warp::reply(), warp::reply(),
@ -67,7 +79,7 @@ async fn delete(pool: PgSharedPool, id: i64) -> Result<impl Reply, Rejection> {
// builds the route structure for "/robots" endpoint // builds the route structure for "/robots" endpoint
pub fn all( pub fn all(
base: warp::filters::BoxedFilter<(impl Reply + 'static,)>, base: warp::filters::BoxedFilter<(impl Reply + 'static,)>,
pool: PgSharedPool, pool: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
) -> warp::filters::BoxedFilter<(impl Reply,)> { ) -> warp::filters::BoxedFilter<(impl Reply,)> {
// create a prefix that injects a database connection for every request // create a prefix that injects a database connection for every request
// every filter is added after the prefix so it receives a database connection // every filter is added after the prefix so it receives a database connection
@ -101,7 +113,7 @@ pub fn all(
// builds the route structure for the "/robot/{id}" endpoint // builds the route structure for the "/robot/{id}" endpoint
pub fn with_id( pub fn with_id(
base: warp::filters::BoxedFilter<(impl Reply + 'static,)>, base: warp::filters::BoxedFilter<(impl Reply + 'static,)>,
pool: PgSharedPool, pool: std::sync::Arc<std::boxed::Box<crate::simple::shared::SharedContainer>>,
) -> warp::filters::BoxedFilter<(impl Reply,)> { ) -> warp::filters::BoxedFilter<(impl Reply,)> {
// we create a prefix containing the id of a robot and a shared database connection // we create a prefix containing the id of a robot and a shared database connection
// every filter is added after the prefix // every filter is added after the prefix

@ -13,8 +13,8 @@ async fn main() -> Result<(), sqlx::Error> {
// build a new shared pool of postgres connections // build a new shared pool of postgres connections
// by wrapping the pool object into an std::Arc // by wrapping the pool object into an std::Arc
// make it so it can be cloned in a shallow manner // make it so it can be cloned in a shallow manner
// and shared between many threads // and shared between multiple threads
let db_pool = simple::database::make_shared( let db_pool = simple::database::PgSharedPool::new(
sqlx::postgres::PgPoolOptions::new() sqlx::postgres::PgPoolOptions::new()
// set a maximum of 5 concurrent active connections // set a maximum of 5 concurrent active connections
// when the limit is reached any additional contexts // when the limit is reached any additional contexts
@ -26,5 +26,12 @@ async fn main() -> Result<(), sqlx::Error> {
.await?, .await?,
); );
Ok(http::listen(db_pool, ([127, 0, 0, 1], 8000)).await) let mut shared = crate::simple::shared::SharedContainer::new();
shared.add(db_pool);
Ok(http::listen(
([127, 0, 0, 1], 8000),
std::sync::Arc::new(std::boxed::Box::new(shared)),
)
.await)
} }

@ -1,7 +1,21 @@
use std::sync::Arc; use crate::simple::HandlingError;
pub type PgSharedPool = Arc<sqlx::Pool<sqlx::Postgres>>; // An application wide shared pool of postgres database connections
#[derive(Clone)]
pub struct PgSharedPool(sqlx::Pool<sqlx::Postgres>);
pub fn make_shared(conn: sqlx::Pool<sqlx::Postgres>) -> PgSharedPool { impl PgSharedPool {
Arc::new(conn) pub fn new(conn: sqlx::Pool<sqlx::Postgres>) -> Self {
Self(conn)
}
pub async fn begin<'a>(
&self,
) -> Result<Box<sqlx::Transaction<'a, sqlx::Postgres>>, HandlingError> {
self.0
.begin()
.await
.map(Box::new)
.map_err(HandlingError::DBError)
}
} }

@ -1,5 +1,6 @@
pub mod database; pub mod database;
pub mod http; pub mod http;
pub mod shared;
#[derive(Debug)] #[derive(Debug)]
pub enum HandlingError { pub enum HandlingError {

@ -0,0 +1,32 @@
use std::any;
use std::boxed::Box;
use std::vec::Vec;
pub struct SharedContainer {
items: Vec<Box<dyn any::Any + Send + Sync + 'static>>,
}
impl SharedContainer {
pub fn find<E>(&self) -> Option<&E>
where
E: any::Any + 'static,
{
for x in self.items.iter() {
if let Some(val) = x.downcast_ref::<E>() {
return Some(val);
}
}
None
}
pub fn add<E>(&mut self, val: E)
where
E: any::Any + Send + Sync + 'static,
{
self.items.push(Box::new(val))
}
pub fn new() -> Self {
Self { items: Vec::new() }
}
}
Loading…
Cancel
Save