Move database utils and improve comments

develop
cahe 5 years ago
parent 0bec2a0e8c
commit 89ee547111

@ -1,17 +1,17 @@
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};
use crate::simple::database::PgSharedPool;
use crate::base::robots;
use crate::data::postgres;
use crate::simple::handling::HandlingError;
use crate::simple::HandlingError;
#[derive(Serialize, Deserialize)]
pub struct GetRobot {
#[serde(skip_deserializing)]
pub id: i64,
pub name : String
pub name: String,
}
type Model = GetRobot;
@ -20,7 +20,7 @@ impl std::convert::From<robots::Robot> for GetRobot {
fn from(r: robots::Robot) -> GetRobot {
GetRobot {
id: r.id,
name: r.name
name: r.name,
}
}
}
@ -29,31 +29,37 @@ impl std::convert::Into<robots::Robot> for (i64, GetRobot) {
fn into(self) -> robots::Robot {
robots::Robot {
id: self.0,
name: self.1.name
name: self.1.name,
}
}
}
pub async fn add_robot(pool: PgSharedPool, r: GetRobot) -> Result<GetRobot, HandlingError> {
let mut tx = pool.begin()
let mut tx = pool
.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
let mut repo = postgres::RobotsImpl::new(&mut tx);
let id = robots::add(&mut repo, robots::Robot{id: 0, name: r.name}).await?;
let id = robots::add(
&mut repo,
robots::Robot {
id: 0,
name: r.name,
},
)
.await?;
let robot = repo.get_by_id(id).await?;
tx
.commit()
.await
.map_err(|e| HandlingError::DBError(e))?;
tx.commit().await.map_err(|e| HandlingError::DBError(e))?;
Ok(robot.into())
}
pub async fn get_robot(pool: PgSharedPool, id: i64) -> Result<GetRobot, HandlingError> {
let mut tx = pool.begin()
let mut tx = pool
.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
@ -63,28 +69,31 @@ pub async fn get_robot(pool : PgSharedPool, id : i64) -> Result<GetRobot, Handli
}
pub async fn get_all(
pool : PgSharedPool, offset : i64, limit : i64
pool: PgSharedPool,
offset: i64,
limit: i64,
) -> Result<(Vec<Model>, bool), HandlingError> {
let mut tx = pool.begin()
let mut tx = pool
.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
let mut repo = postgres::ModelImpl::new(&mut tx);
let (items, next) = repo.get_all()?
.paginate(offset, limit)
.await?;
let (items, next) = repo.get_all()?.paginate(offset, limit).await?;
let data = items
.into_iter()
.map(|r| r.into())
.collect();
let data = items.into_iter().map(|r| r.into()).collect();
Ok((data, next))
}
pub async fn update_robot(pool : PgSharedPool, id : i64, r : GetRobot) -> Result<GetRobot, HandlingError> {
let mut tx = pool.begin()
pub async fn update_robot(
pool: PgSharedPool,
id: i64,
r: GetRobot,
) -> Result<GetRobot, HandlingError> {
let mut tx = pool
.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
@ -93,16 +102,14 @@ pub async fn update_robot(pool : PgSharedPool, id : i64, r : GetRobot) -> Result
let _: () = robots::update(&mut repo, (id, r).into()).await?;
let robot = repo.get_by_id(id).await?;
tx
.commit()
.await
.map_err(|e| HandlingError::DBError(e))?;
tx.commit().await.map_err(|e| HandlingError::DBError(e))?;
Ok(robot.into())
}
pub async fn delete_robot(pool: PgSharedPool, id: i64) -> Result<(), HandlingError> {
let mut tx = pool.begin()
let mut tx = pool
.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
@ -110,10 +117,7 @@ pub async fn delete_robot(pool : PgSharedPool, id : i64) -> Result<(), HandlingE
let mut repo = postgres::ModelImpl::new(&mut tx);
let _: () = robots::delete(&mut repo, id).await?;
tx
.commit()
.await
.map_err(|e| HandlingError::DBError(e))?;
tx.commit().await.map_err(|e| HandlingError::DBError(e))?;
Ok(())
}

@ -1,18 +1,14 @@
use std::future::Future;
use std::boxed::Box;
use std::pin::Pin;
use crate::simple::handling::HandlingError;
use crate::simple::{HandlingError, TraitFuture};
pub struct Robot {
pub id: i64,
pub name : String
pub name: String,
}
pub trait DefaultRepository {
fn add(&mut self, r : Robot) -> Pin<Box<dyn Future<Output = Result<i64, HandlingError>> + Send + '_>>;
fn update(&mut self, r : Robot) -> Pin<Box<dyn Future<Output = Result<(), HandlingError>> + Send + '_>>;
fn delete(&mut self, id : i64) -> Pin<Box<dyn Future<Output = Result<(), HandlingError>> + Send + '_>>;
fn add(&mut self, r: Robot) -> TraitFuture<Result<i64, HandlingError>>;
fn update(&mut self, r: Robot) -> TraitFuture<Result<(), HandlingError>>;
fn delete(&mut self, id: i64) -> TraitFuture<Result<(), HandlingError>>;
}
pub async fn add(repo: &mut impl DefaultRepository, r: Robot) -> Result<i64, HandlingError> {

@ -1,18 +1,16 @@
use crate::simple::TraitFuture;
use std::boxed::Box;
use std::pin::Pin;
use crate::simple::handling::{HandlingError};
use crate::base::robots;
use crate::simple::HandlingError;
use std::future::Future;
use sqlx::FromRow;
use sqlx::Done;
use sqlx::FromRow;
pub struct Collection<'a> {
tx: &'a mut Box<sqlx::Transaction<'static, sqlx::Postgres>>,
q: sqlx::query::Query<'a, sqlx::Postgres, sqlx::postgres::PgArguments>,
q_id : Option<i64>
q_id: Option<i64>,
}
impl Collection<'_> {
@ -20,12 +18,13 @@ impl Collection<'_> {
Self {
tx: self.tx,
q: self.q,
q_id: Some(id)
q_id: Some(id),
}
}
pub async fn fetch(self, offset: i64, limit: i64) -> Result<Vec<robots::Robot>, HandlingError> {
let result = self.q
let result = self
.q
.bind(self.q_id.is_some())
.bind(self.q_id.unwrap_or_default())
.bind(offset)
@ -37,17 +36,18 @@ impl Collection<'_> {
.into_iter()
.map(|d| {
let (id, name) = <(i64, String)>::from_row(&d)?;
Ok(robots::Robot{
id: id,
name: name
})
Ok(robots::Robot { id: id, name: name })
})
.collect(),
Err(e) => Err(HandlingError::DBError(e))
Err(e) => Err(HandlingError::DBError(e)),
}
}
pub async fn paginate(self, offset : i64, limit : i64) -> Result<(Vec<robots::Robot>, bool), HandlingError> {
pub async fn paginate(
self,
offset: i64,
limit: i64,
) -> Result<(Vec<robots::Robot>, bool), HandlingError> {
let mut items = self.fetch(offset, limit + 1).await?;
if items.len() > (limit as usize) {
let _ = items.pop();
@ -59,7 +59,7 @@ impl Collection<'_> {
}
pub struct RobotsImpl<'a> {
pub tx : &'a mut Box<sqlx::Transaction<'static, sqlx::Postgres>>
pub tx: &'a mut Box<sqlx::Transaction<'static, sqlx::Postgres>>,
}
pub type ModelImpl<'a> = RobotsImpl<'a>;
@ -70,8 +70,7 @@ impl<'a> RobotsImpl<'_> {
}
async fn add(&mut self, r: robots::Robot) -> Result<i64, HandlingError> {
let result = sqlx::query(
"INSERT INTO robots (name) VALUES ($1::TEXT) RETURNING id")
let result = sqlx::query("INSERT INTO robots (name) VALUES ($1::TEXT) RETURNING id")
.bind(r.name)
.fetch_one(self.tx.as_mut())
.await;
@ -79,28 +78,24 @@ impl<'a> RobotsImpl<'_> {
Ok(v) => {
let (id,) = <(i64,)>::from_row(&v)?;
Ok(id)
},
}
Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound),
Err(e) => Err(HandlingError::DBError(e))
Err(e) => Err(HandlingError::DBError(e)),
}
}
pub async fn get_by_id(&mut self, id: i64) -> Result<robots::Robot, HandlingError> {
let result = sqlx::query(
"SELECT * FROM robots WHERE id = $1::INTEGER")
let result = sqlx::query("SELECT * FROM robots WHERE id = $1::INTEGER")
.bind(id)
.fetch_one(self.tx.as_mut())
.await;
match result {
Ok(v) => {
let (id, name) = <(i64, String)>::from_row(&v)?;
Ok(robots::Robot{
id: id,
name: name
})
},
Ok(robots::Robot { id: id, name: name })
}
Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound),
Err(e) => Err(HandlingError::DBError(e))
Err(e) => Err(HandlingError::DBError(e)),
}
}
@ -111,18 +106,17 @@ impl<'a> RobotsImpl<'_> {
WHERE
(CASE WHEN $1::BOOLEAN THEN (id = $2::INTEGER) ELSE TRUE END)
OFFSET $3
LIMIT $4"
LIMIT $4",
);
Ok(Collection {
tx: self.tx,
q: all,
q_id: None
q_id: None,
})
}
async fn update(&mut self, r: robots::Robot) -> Result<(), HandlingError> {
let result = sqlx::query(
"UPDATE robots SET name = $1::TEXT WHERE id = $2::INTEGER")
let result = sqlx::query("UPDATE robots SET name = $1::TEXT WHERE id = $2::INTEGER")
.bind(r.name)
.bind(r.id)
.execute(self.tx.as_mut())
@ -131,18 +125,15 @@ impl<'a> RobotsImpl<'_> {
Ok(v) => match v.rows_affected() {
1 => Ok(()),
0 => Err(HandlingError::NotFound),
_ => Err(HandlingError::InternalError)
_ => Err(HandlingError::InternalError),
},
Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound),
Err(e) => Err(HandlingError::DBError(e))
Err(e) => Err(HandlingError::DBError(e)),
}
}
async fn delete(&mut self, id: i64) -> Result<(), HandlingError> {
let result = sqlx::query(
"DELETE FROM robots WHERE id = $1::INTEGER")
let result = sqlx::query("DELETE FROM robots WHERE id = $1::INTEGER")
.bind(id)
.execute(self.tx.as_mut())
.await;
@ -150,23 +141,23 @@ impl<'a> RobotsImpl<'_> {
Ok(v) => match v.rows_affected() {
1 => Ok(()),
0 => Err(HandlingError::NotFound),
_ => Err(HandlingError::InternalError)
_ => Err(HandlingError::InternalError),
},
Err(e) => Err(HandlingError::DBError(e))
Err(e) => Err(HandlingError::DBError(e)),
}
}
}
impl robots::DefaultRepository for RobotsImpl<'_> {
fn add(&mut self, r : robots::Robot) -> Pin<Box<dyn Future<Output = Result<i64, HandlingError>> + Send + '_>> {
fn add(&mut self, r: robots::Robot) -> TraitFuture<Result<i64, HandlingError>> {
Box::pin(self.add(r))
}
fn update(&mut self, r : robots::Robot) -> Pin<Box<dyn Future<Output = Result<(), HandlingError>> + Send + '_>> {
fn update(&mut self, r: robots::Robot) -> TraitFuture<Result<(), HandlingError>> {
Box::pin(self.update(r))
}
fn delete(&mut self, id : i64) -> Pin<Box<dyn Future<Output = Result<(), HandlingError>> + Send + '_>> {
fn delete(&mut self, id: i64) -> TraitFuture<Result<(), HandlingError>> {
Box::pin(self.delete(id))
}
}

@ -6,8 +6,8 @@ use warp::Reply;
use warp::http::StatusCode;
use crate::simple::handling::HandlingError;
use crate::simple::http::APIBuilder;
use crate::simple::HandlingError;
impl warp::reject::Reject for HandlingError {}
@ -17,16 +17,22 @@ struct APIError {
message: String,
}
// attempts to build a structured response for a handling error
// the response will have a JSON body containing an internal error code
// and a detailed message about the error
fn build_error_response(e: &HandlingError) -> Result<warp::reply::Response, warp::http::Error> {
let (status, code, message) = match e {
// a database access error will be treated as an internal error
HandlingError::DBError(_e) => (
StatusCode::INTERNAL_SERVER_ERROR, 1, format!("Database access error")
),
HandlingError::NotFound => (
StatusCode::NOT_FOUND, 2, format!("Not Found")
StatusCode::INTERNAL_SERVER_ERROR,
1,
format!("Database access error"),
),
HandlingError::NotFound => (StatusCode::NOT_FOUND, 2, format!("Not Found")),
HandlingError::InternalError => (
StatusCode::INTERNAL_SERVER_ERROR, 3, format!("Internal unexpected error")
StatusCode::INTERNAL_SERVER_ERROR,
3,
format!("Internal unexpected error"),
),
};
@ -38,41 +44,50 @@ fn build_error_response(e: &HandlingError) -> Result<warp::reply::Response, warp
Ok(warp::reply::with_status(body, status).into_response())
}
// handles an API request rejection
// when all handlers have rejected a request either because the path didn't match
// if incorrect parameters were passed in the query string or request body
// or a business logic, database or internal error ocurred
// we can turn handle the rejections making an alternative response
async fn handle_rejection(
r: warp::reject::Rejection,
) -> Result<warp::reply::Response, warp::reject::Rejection> {
eprintln!("{:#?}", r);
// right now we only handle our own handling error type
// errors returned by warp filters will be returned as they are
// their response and status code will remain unchanged
r.find::<HandlingError>()
.map(|e| {
build_error_response(e)
.unwrap_or(
StatusCode::INTERNAL_SERVER_ERROR
.into_response()
)
// any failed attempt to handle the error will create a "500 Internal server error"
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR.into_response())
})
// if we fail to identify a known handling error we return the rejection unchanged
.ok_or(r)
}
// will create the server for our HTTP API as a warp server
// here we can import the many filters that compose the services
// we want exposed, setup route structure and plug middlewares
pub async fn listen(
db_pool: crate::simple::database::PgSharedPool,
shared: crate::simple::database::PgSharedPool,
addr: impl Into<std::net::SocketAddr> + 'static,
) {
// build a base Filter for our API so that any request of any kind to "/"
// will return "200 OK" with an empty response body
let base = warp::any()
// match a path termination by a "/" character or lack thereof
// match a path termination "/" character or lack thereof
.and(warp::path::end())
// reply with a "200 OK" with an empty response body
// reply with a "200 OK" and an empty response body
.map(|| warp::reply())
// box the filter so it may be passed be passed around easily
.boxed();
// begin building the API structure from the base Filter
// right now the pool of database connections is the only
// shared resource that we want to passa around when handling requests
let api = APIBuilder::new(base, db_pool)
// mount the "/robots" base prefix for creating
// and listing all robots in the database
// shared resource we'll be needing right now
let api = APIBuilder::new(base, shared)
// mount the "/robots" base prefix for creating and listing all robots in the database
.route(robots::all)
// mount the "/robot/{id}" prefix for retrieving and updating robots information
.route(robots::with_id)
@ -81,6 +96,6 @@ pub async fn listen(
// attach a recover to handle rejections
.recover(handle_rejection);
// run the server returning a Future that will be run indefinitely
// run the server returning a Future that never resolves
warp::serve(api).run(addr).await
}

@ -28,10 +28,7 @@ async fn create(
reply_json(app::robots::add_robot(pool, r).await)
}
async fn get(
pool: PgSharedPool,
id: i64
) -> Result<impl warp::Reply, warp::Rejection> {
async fn get(pool: PgSharedPool, id: i64) -> Result<impl warp::Reply, warp::Rejection> {
reply_json(app::robots::get_robot(pool, id).await)
}
@ -57,19 +54,19 @@ async fn get_all(
)
}
async fn delete(
pool : PgSharedPool,
id : i64
) -> Result<impl warp::Reply, warp::Rejection> {
async fn delete(pool: PgSharedPool, id: i64) -> Result<impl warp::Reply, warp::Rejection> {
match app::robots::delete_robot(pool, id).await {
Ok(_) => Ok(warp::reply::with_status(warp::reply(), StatusCode::NO_CONTENT)),
Err(e) => Err(warp::reject::custom(e))
Ok(_) => Ok(warp::reply::with_status(
warp::reply(),
StatusCode::NO_CONTENT,
)),
Err(e) => Err(warp::reject::custom(e)),
}
}
pub fn all(
base: warp::filters::BoxedFilter<(impl warp::Reply + 'static,)>,
pool: PgSharedPool
pool: PgSharedPool,
) -> warp::filters::BoxedFilter<(impl warp::Reply,)> {
let prefix = warp::path!("robots").map(move || pool.clone());

@ -1,12 +1,30 @@
mod simple;
mod http;
mod app;
mod base;
mod data;
mod http;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let db_pool = simple::database::connect_pool().await?;
// fetch database configuration from environment variables
let db_address = std::env::var("POSTGRES_ADDRESS").expect("No database address specified");
// build a new shared pool of postgres connections
// by wrapping the pool object into an std::Arc
// make it so it can be cloned in a shallow manner
// and shared between many threads
let db_pool = simple::database::make_shared(
sqlx::postgres::PgPoolOptions::new()
// set a maximum of 5 concurrent active connections
// when the limit is reached any additional contexts
// will block until a connection is available
.max_connections(5)
// the address is expected to be a postgres connection string
// in the format postgres://user:password@address:port/database
.connect(db_address.as_str())
.await?,
);
Ok(http::listen(db_pool, ([127, 0, 0, 1], 8000)).await)
}

@ -1,12 +1,7 @@
pub type PgSharedPool = std::sync::Arc<sqlx::Pool<sqlx::Postgres>>;
use std::sync::Arc;
pub async fn connect_pool() -> Result<PgSharedPool, sqlx::Error> {
Ok(
std::sync::Arc::new(
sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect("postgresql://postgres:chapuleta@127.0.0.1:5432")
.await?
)
)
pub type PgSharedPool = Arc<sqlx::Pool<sqlx::Postgres>>;
pub fn make_shared(conn: sqlx::Pool<sqlx::Postgres>) -> PgSharedPool {
Arc::new(conn)
}

@ -1,12 +0,0 @@
#[derive(Debug)]
pub enum HandlingError {
DBError(sqlx::Error),
NotFound,
InternalError
}
impl From<sqlx::Error> for HandlingError {
fn from(e : sqlx::Error) -> HandlingError {
HandlingError::DBError(e)
}
}

@ -1,5 +1,20 @@
use std::future::Future;
use std::pin::Pin;
pub mod database;
pub mod handling;
pub mod init;
pub mod http;
pub type TraitFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
#[derive(Debug)]
pub enum HandlingError {
DBError(sqlx::Error),
NotFound,
InternalError,
}
impl From<sqlx::Error> for HandlingError {
fn from(e: sqlx::Error) -> HandlingError {
HandlingError::DBError(e)
}
}

Loading…
Cancel
Save