Base structure

develop
cahe 5 years ago
commit 0bec2a0e8c

2
.gitignore vendored

@ -0,0 +1,2 @@
/target
**/*.rs.bk

@ -0,0 +1,14 @@
[package]
name = "simple"
version = "0.1.0"
authors = ["cahe"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
warp = "0.2"
tokio = { version = "0.2", features = ["macros"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
sqlx = { version = "0.4.0-beta.1", default-features = false, features = [ "postgres", "runtime-tokio", "macros" ] }

@ -0,0 +1 @@
pub mod robots;

@ -0,0 +1,119 @@
use serde::{Serialize, Deserialize};
use crate::simple::database::PgSharedPool;
use crate::base::robots;
use crate::data::postgres;
use crate::simple::handling::HandlingError;
#[derive(Serialize, Deserialize)]
pub struct GetRobot {
#[serde(skip_deserializing)]
pub id : i64,
pub name : String
}
type Model = GetRobot;
impl std::convert::From<robots::Robot> for GetRobot {
fn from(r : robots::Robot) -> GetRobot {
GetRobot{
id: r.id,
name: r.name
}
}
}
impl std::convert::Into<robots::Robot> for (i64, GetRobot) {
fn into(self) -> robots::Robot {
robots::Robot{
id: self.0,
name: self.1.name
}
}
}
pub async fn add_robot(pool : PgSharedPool, r : GetRobot) -> Result<GetRobot, HandlingError> {
let mut tx = pool.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
let mut repo = postgres::RobotsImpl::new(&mut tx);
let id = robots::add(&mut repo, robots::Robot{id: 0, name: r.name}).await?;
let robot = repo.get_by_id(id).await?;
tx
.commit()
.await
.map_err(|e| HandlingError::DBError(e))?;
Ok(robot.into())
}
pub async fn get_robot(pool : PgSharedPool, id : i64) -> Result<GetRobot, HandlingError> {
let mut tx = pool.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
let mut repo = postgres::RobotsImpl::new(&mut tx);
Ok(repo.get_by_id(id).await?.into())
}
pub async fn get_all(
pool : PgSharedPool, offset : i64, limit : i64
) -> Result<(Vec<Model>, bool), HandlingError> {
let mut tx = pool.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
let mut repo = postgres::ModelImpl::new(&mut tx);
let (items, next) = repo.get_all()?
.paginate(offset, limit)
.await?;
let data = items
.into_iter()
.map(|r| r.into())
.collect();
Ok((data, next))
}
pub async fn update_robot(pool : PgSharedPool, id : i64, r : GetRobot) -> Result<GetRobot, HandlingError> {
let mut tx = pool.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
let mut repo = postgres::RobotsImpl::new(&mut tx);
let _ : () = robots::update(&mut repo, (id, r).into()).await?;
let robot = repo.get_by_id(id).await?;
tx
.commit()
.await
.map_err(|e| HandlingError::DBError(e))?;
Ok(robot.into())
}
pub async fn delete_robot(pool : PgSharedPool, id : i64) -> Result<(), HandlingError> {
let mut tx = pool.begin()
.await
.map(|c| Box::new(c))
.map_err(|e| HandlingError::DBError(e))?;
let mut repo = postgres::ModelImpl::new(&mut tx);
let _ : () = robots::delete(&mut repo, id).await?;
tx
.commit()
.await
.map_err(|e| HandlingError::DBError(e))?;
Ok(())
}

@ -0,0 +1 @@
pub mod robots;

@ -0,0 +1,28 @@
use std::future::Future;
use std::boxed::Box;
use std::pin::Pin;
use crate::simple::handling::HandlingError;
pub struct Robot {
pub id : i64,
pub name : String
}
pub trait DefaultRepository {
fn add(&mut self, r : Robot) -> Pin<Box<dyn Future<Output = Result<i64, HandlingError>> + Send + '_>>;
fn update(&mut self, r : Robot) -> Pin<Box<dyn Future<Output = Result<(), HandlingError>> + Send + '_>>;
fn delete(&mut self, id : i64) -> Pin<Box<dyn Future<Output = Result<(), HandlingError>> + Send + '_>>;
}
pub async fn add(repo : &mut impl DefaultRepository, r : Robot) -> Result<i64, HandlingError> {
repo.add(r).await
}
pub async fn update(repo : &mut impl DefaultRepository, r : Robot) -> Result<(), HandlingError> {
repo.update(r).await
}
pub async fn delete(repo : &mut impl DefaultRepository, id : i64) -> Result<(), HandlingError> {
repo.delete(id).await
}

@ -0,0 +1 @@
pub mod postgres;

@ -0,0 +1,172 @@
use std::boxed::Box;
use std::pin::Pin;
use crate::simple::handling::{HandlingError};
use crate::base::robots;
use std::future::Future;
use sqlx::FromRow;
use sqlx::Done;
pub struct Collection<'a> {
tx : &'a mut Box<sqlx::Transaction<'static, sqlx::Postgres>>,
q : sqlx::query::Query<'a, sqlx::Postgres, sqlx::postgres::PgArguments>,
q_id : Option<i64>
}
impl Collection<'_> {
pub fn by_id(self, id : i64) -> Self {
Self {
tx: self.tx,
q: self.q,
q_id: Some(id)
}
}
pub async fn fetch(self, offset : i64, limit : i64) -> Result<Vec<robots::Robot>, HandlingError> {
let result = self.q
.bind(self.q_id.is_some())
.bind(self.q_id.unwrap_or_default())
.bind(offset)
.bind(limit)
.fetch_all(self.tx.as_mut())
.await;
match result {
Ok(v) => v
.into_iter()
.map(|d| {
let (id, name) = <(i64, String)>::from_row(&d)?;
Ok(robots::Robot{
id: id,
name: name
})
})
.collect(),
Err(e) => Err(HandlingError::DBError(e))
}
}
pub async fn paginate(self, offset : i64, limit : i64) -> Result<(Vec<robots::Robot>, bool), HandlingError> {
let mut items = self.fetch(offset, limit+1).await?;
if items.len() > (limit as usize) {
let _ = items.pop();
Ok((items, true))
} else {
Ok((items, false))
}
}
}
pub struct RobotsImpl<'a> {
pub tx : &'a mut Box<sqlx::Transaction<'static, sqlx::Postgres>>
}
pub type ModelImpl<'a> = RobotsImpl<'a>;
impl<'a> RobotsImpl<'_> {
pub fn new<'b>(tx : &'b mut Box<sqlx::Transaction<'static, sqlx::Postgres>>) -> RobotsImpl<'b> {
RobotsImpl{tx: tx}
}
async fn add(&mut self, r : robots::Robot) -> Result<i64, HandlingError> {
let result = sqlx::query(
"INSERT INTO robots (name) VALUES ($1::TEXT) RETURNING id")
.bind(r.name)
.fetch_one(self.tx.as_mut())
.await;
match result {
Ok(v) => {
let (id,) = <(i64,)>::from_row(&v)?;
Ok(id)
},
Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound),
Err(e) => Err(HandlingError::DBError(e))
}
}
pub async fn get_by_id(&mut self, id : i64) -> Result<robots::Robot, HandlingError> {
let result = sqlx::query(
"SELECT * FROM robots WHERE id = $1::INTEGER")
.bind(id)
.fetch_one(self.tx.as_mut())
.await;
match result {
Ok(v) => {
let (id, name) = <(i64, String)>::from_row(&v)?;
Ok(robots::Robot{
id: id,
name: name
})
},
Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound),
Err(e) => Err(HandlingError::DBError(e))
}
}
pub fn get_all(&'a mut self) -> Result<Collection<'a>, HandlingError> {
let all = sqlx::query::<sqlx::Postgres>(
"SELECT id, name
FROM robots
WHERE
(CASE WHEN $1::BOOLEAN THEN (id = $2::INTEGER) ELSE TRUE END)
OFFSET $3
LIMIT $4"
);
Ok(Collection{
tx: self.tx,
q: all,
q_id: None
})
}
async fn update(&mut self, r : robots::Robot) -> Result<(), HandlingError> {
let result = sqlx::query(
"UPDATE robots SET name = $1::TEXT WHERE id = $2::INTEGER")
.bind(r.name)
.bind(r.id)
.execute(self.tx.as_mut())
.await;
match result {
Ok(v) => match v.rows_affected() {
1 => Ok(()),
0 => Err(HandlingError::NotFound),
_ => Err(HandlingError::InternalError)
},
Err(sqlx::Error::RowNotFound) => Err(HandlingError::NotFound),
Err(e) => Err(HandlingError::DBError(e))
}
}
async fn delete(&mut self, id : i64) -> Result<(), HandlingError> {
let result = sqlx::query(
"DELETE FROM robots WHERE id = $1::INTEGER")
.bind(id)
.execute(self.tx.as_mut())
.await;
match result {
Ok(v) => match v.rows_affected() {
1 => Ok(()),
0 => Err(HandlingError::NotFound),
_ => Err(HandlingError::InternalError)
},
Err(e) => Err(HandlingError::DBError(e))
}
}
}
impl robots::DefaultRepository for RobotsImpl<'_> {
fn add(&mut self, r : robots::Robot) -> Pin<Box<dyn Future<Output = Result<i64, HandlingError>> + Send + '_>> {
Box::pin(self.add(r))
}
fn update(&mut self, r : robots::Robot) -> Pin<Box<dyn Future<Output = Result<(), HandlingError>> + Send + '_>> {
Box::pin(self.update(r))
}
fn delete(&mut self, id : i64) -> Pin<Box<dyn Future<Output = Result<(), HandlingError>> + Send + '_>> {
Box::pin(self.delete(id))
}
}

@ -0,0 +1,86 @@
pub mod robots;
use serde::Serialize;
use warp::Filter;
use warp::Reply;
use warp::http::StatusCode;
use crate::simple::handling::HandlingError;
use crate::simple::http::APIBuilder;
impl warp::reject::Reject for HandlingError {}
#[derive(Serialize)]
struct APIError {
code: i64,
message: String,
}
fn build_error_response(e: &HandlingError) -> Result<warp::reply::Response, warp::http::Error> {
let (status, code, message) = match e {
HandlingError::DBError(_e) => (
StatusCode::INTERNAL_SERVER_ERROR, 1, format!("Database access error")
),
HandlingError::NotFound => (
StatusCode::NOT_FOUND, 2, format!("Not Found")
),
HandlingError::InternalError => (
StatusCode::INTERNAL_SERVER_ERROR, 3, format!("Internal unexpected error")
),
};
let body = warp::reply::json(&APIError {
code: code,
message: message,
});
Ok(warp::reply::with_status(body, status).into_response())
}
async fn handle_rejection(
r: warp::reject::Rejection,
) -> Result<warp::reply::Response, warp::reject::Rejection> {
eprintln!("{:#?}", r);
r.find::<HandlingError>()
.map(|e| {
build_error_response(e)
.unwrap_or(
StatusCode::INTERNAL_SERVER_ERROR
.into_response()
)
})
.ok_or(r)
}
pub async fn listen(
db_pool: crate::simple::database::PgSharedPool,
addr: impl Into<std::net::SocketAddr> + 'static,
) {
// build a base Filter for our API so that any request of any kind to "/"
// will return "200 OK" with an empty response body
let base = warp::any()
// match a path termination by a "/" character or lack thereof
.and(warp::path::end())
// reply with a "200 OK" with an empty response body
.map(|| warp::reply())
// box the filter so it may be passed be passed around easily
.boxed();
// begin building the API structure from the base Filter
// right now the pool of database connections is the only
// shared resource that we want to passa around when handling requests
let api = APIBuilder::new(base, db_pool)
// mount the "/robots" base prefix for creating
// and listing all robots in the database
.route(robots::all)
// mount the "/robot/{id}" prefix for retrieving and updating robots information
.route(robots::with_id)
// build the final filter that we'll be served
.build()
// attach a recover to handle rejections
.recover(handle_rejection);
// run the server returning a Future that will be run indefinitely
warp::serve(api).run(addr).await
}

@ -0,0 +1,111 @@
use warp::Filter;
use crate::simple::database::PgSharedPool;
use crate::simple::http::reply_json;
use serde::{Deserialize, Serialize};
use warp::http::StatusCode;
use crate::app;
#[derive(Deserialize)]
struct PaginationOptions {
pub offset: i64,
pub limit: i64,
}
#[derive(Serialize)]
struct Pagination<T> {
pub data: Vec<T>,
pub next: bool,
}
async fn create(
pool: PgSharedPool,
r: app::robots::GetRobot,
) -> Result<impl warp::Reply, warp::Rejection> {
reply_json(app::robots::add_robot(pool, r).await)
}
async fn get(
pool: PgSharedPool,
id: i64
) -> Result<impl warp::Reply, warp::Rejection> {
reply_json(app::robots::get_robot(pool, id).await)
}
async fn update(
pool: PgSharedPool,
id: i64,
r: app::robots::GetRobot,
) -> Result<impl warp::Reply, warp::Rejection> {
reply_json(app::robots::update_robot(pool, id, r).await)
}
async fn get_all(
pool: PgSharedPool,
p: PaginationOptions,
) -> Result<impl warp::Reply, warp::Rejection> {
reply_json(
app::robots::get_all(pool, p.offset, p.limit)
.await
.map(|(r, next)| Pagination {
data: r,
next: next,
}),
)
}
async fn delete(
pool : PgSharedPool,
id : i64
) -> Result<impl warp::Reply, warp::Rejection> {
match app::robots::delete_robot(pool, id).await {
Ok(_) => Ok(warp::reply::with_status(warp::reply(), StatusCode::NO_CONTENT)),
Err(e) => Err(warp::reject::custom(e))
}
}
pub fn all(
base: warp::filters::BoxedFilter<(impl warp::Reply + 'static,)>,
pool: PgSharedPool
) -> warp::filters::BoxedFilter<(impl warp::Reply,)> {
let prefix = warp::path!("robots").map(move || pool.clone());
base.or(prefix
.clone()
.and(warp::filters::method::post())
.and(warp::body::json())
.and_then(create))
.or(prefix
.clone()
.and(warp::filters::method::get())
.and(warp::filters::query::query())
.and_then(get_all))
.boxed()
}
pub fn with_id(
base: warp::filters::BoxedFilter<(impl warp::Reply + 'static,)>,
pool: PgSharedPool,
) -> warp::filters::BoxedFilter<(impl warp::Reply,)> {
let prefix = warp::path!("robot" / i64)
.map(move |id| (pool.clone(), id))
.untuple_one();
base.or(prefix
.clone()
.and(warp::filters::method::get())
.and_then(get))
.or(prefix
.clone()
.and(warp::filters::method::put())
.and(warp::body::json())
.and_then(update))
.or(prefix
.clone()
.and(warp::filters::method::delete())
.and_then(delete))
.boxed()
}

@ -0,0 +1,12 @@
mod simple;
mod http;
mod app;
mod base;
mod data;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let db_pool = simple::database::connect_pool().await?;
Ok(http::listen(db_pool, ([127, 0, 0, 1], 8000)).await)
}

@ -0,0 +1,12 @@
pub type PgSharedPool = std::sync::Arc<sqlx::Pool<sqlx::Postgres>>;
pub async fn connect_pool() -> Result<PgSharedPool, sqlx::Error> {
Ok(
std::sync::Arc::new(
sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect("postgresql://postgres:chapuleta@127.0.0.1:5432")
.await?
)
)
}

@ -0,0 +1,12 @@
#[derive(Debug)]
pub enum HandlingError {
DBError(sqlx::Error),
NotFound,
InternalError
}
impl From<sqlx::Error> for HandlingError {
fn from(e : sqlx::Error) -> HandlingError {
HandlingError::DBError(e)
}
}

@ -0,0 +1,44 @@
use warp::Filter;
pub struct APIBuilder<T, S>
where
T: Filter,
S: Clone,
{
routes: T,
shared: S,
}
impl<T, S> APIBuilder<T, S>
where
T: Filter,
S: Clone,
{
pub fn new(base: T, shared: S) -> Self {
APIBuilder {
shared: shared,
routes: base,
}
}
pub fn route<U, F>(self, build: F) -> APIBuilder<U, S>
where
U: Filter,
F: FnOnce(T, S) -> U,
{
APIBuilder::new(build(self.routes, self.shared.clone()), self.shared)
}
pub fn build(self) -> T {
self.routes
}
}
pub fn reply_json<T, E>(r: Result<T, E>) -> Result<impl warp::Reply, warp::Rejection>
where
T: serde::Serialize,
E: warp::reject::Reject,
{
r.map(|s| warp::reply::json(&s))
.map_err(warp::reject::custom)
}

@ -0,0 +1,5 @@
pub mod database;
pub mod handling;
pub mod init;
pub mod http;
Loading…
Cancel
Save