Skip to content

Commit

Permalink
Use RwLock in ClientCredentials
Browse files Browse the repository at this point in the history
  • Loading branch information
photino committed Jul 25, 2023
1 parent bdc7e0c commit 009f434
Show file tree
Hide file tree
Showing 22 changed files with 628 additions and 226 deletions.
2 changes: 2 additions & 0 deletions zino-core/benches/criterion_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod base64_simd;
mod box_error;
mod format_duration;
mod hashmap_vec;
mod json_raw_value;
mod serde_map;
mod uuid_simd;

Expand All @@ -11,6 +12,7 @@ criterion::criterion_group!(
box_error::bench,
format_duration::bench,
hashmap_vec::bench,
json_raw_value::bench,
serde_map::bench,
uuid_simd::bench,
);
Expand Down
49 changes: 49 additions & 0 deletions zino-core/benches/json_raw_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use zino_core::{extension::JsonObjectExt, Map, Uuid};

pub fn bench(c: &mut criterion::Criterion) {
c.bench_function("serialize_json_value", |b| {
b.iter(|| {
let mut res = Map::new();
res.upsert("status_code", 200);
res.upsert("request_id", Uuid::new_v4().to_string());

let mut data = Map::new();
data.upsert("name", "alice");
data.upsert("age", 18);
data.upsert("roles", vec!["admin", "worker"]);
res.upsert("data", serde_json::to_value(&data).unwrap());
serde_json::to_vec(&res)
})
});
c.bench_function("serialize_json_raw_value", |b| {
b.iter(|| {
let mut res = Map::new();
res.upsert("status_code", 200);
res.upsert("request_id", Uuid::new_v4().to_string());

let mut data = Map::new();
data.upsert("name", "alice");
data.upsert("age", 18);
data.upsert("roles", vec!["admin", "worker"]);
res.upsert(
"data",
serde_json::value::to_raw_value(&data).unwrap().get(),
);
serde_json::to_vec(&res)
})
});
c.bench_function("serialize_json_object", |b| {
b.iter(|| {
let mut res = Map::new();
res.upsert("status_code", 200);
res.upsert("request_id", Uuid::new_v4().to_string());

let mut data = Map::new();
data.upsert("name", "alice");
data.upsert("age", 18);
data.upsert("roles", vec!["admin", "worker"]);
res.upsert("data", data);
serde_json::to_vec(&res)
})
});
}
2 changes: 1 addition & 1 deletion zino-core/src/auth/authorization_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ use crate::error::Error;
pub trait AuthorizationProvider {
/// Grants an access token for the client credentials.
async fn grant_client_credentials(
client_credentials: &mut ClientCredentials<Self>,
client_credentials: &ClientCredentials<Self>,
) -> Result<(), Error>;
}
31 changes: 16 additions & 15 deletions zino-core/src/auth/client_credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use crate::{
extension::{JsonObjectExt, TomlTableExt},
Map, SharedString,
};
use parking_lot::RwLock;
use std::{marker::PhantomData, time::Duration};
use toml::Table;

/// Credentials for the client authentication.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct ClientCredentials<S: ?Sized> {
/// Client ID.
client_id: SharedString,
Expand All @@ -18,9 +19,9 @@ pub struct ClientCredentials<S: ?Sized> {
/// Client secret.
client_secret: SharedString,
/// Access token.
access_token: String,
access_token: RwLock<String>,
/// Expires time.
expires_at: DateTime,
expires_at: RwLock<DateTime>,
/// Phantom type of authorization server.
phantom: PhantomData<S>,
}
Expand All @@ -33,8 +34,8 @@ impl<S: ?Sized> ClientCredentials<S> {
client_id: client_id.into(),
client_key: "".into(),
client_secret: client_secret.into(),
access_token: String::new(),
expires_at: DateTime::now(),
access_token: RwLock::new(String::new()),
expires_at: RwLock::new(DateTime::now()),
phantom: PhantomData,
}
}
Expand All @@ -53,8 +54,8 @@ impl<S: ?Sized> ClientCredentials<S> {
client_id: client_id.into(),
client_key: client_key.into(),
client_secret: client_secret.into(),
access_token: String::new(),
expires_at: DateTime::now(),
access_token: RwLock::new(String::new()),
expires_at: RwLock::new(DateTime::now()),
phantom: PhantomData,
})
}
Expand All @@ -67,14 +68,14 @@ impl<S: ?Sized> ClientCredentials<S> {

/// Sets the access token.
#[inline]
pub fn set_access_token(&mut self, access_token: impl ToString) {
self.access_token = access_token.to_string();
pub fn set_access_token(&self, access_token: impl ToString) {
*self.access_token.write() = access_token.to_string();
}

/// Sets the expires.
#[inline]
pub fn set_expires(&mut self, expires_in: Duration) {
self.expires_at = DateTime::now() + expires_in
pub fn set_expires(&self, expires_in: Duration) {
*self.expires_at.write() = DateTime::now() + expires_in
}

/// Returns the client ID.
Expand All @@ -97,14 +98,14 @@ impl<S: ?Sized> ClientCredentials<S> {

/// Returns the access token regardless of whether it has been expired.
#[inline]
pub fn access_token(&self) -> &str {
&self.access_token
pub fn access_token(&self) -> String {
self.access_token.read().clone()
}

/// Returns the time the client credentials expire at.
#[inline]
pub fn expires_at(&self) -> DateTime {
self.expires_at
*self.expires_at.read()
}

/// Returns `true` if the access token for the client credentials has been expired.
Expand Down Expand Up @@ -135,7 +136,7 @@ impl<S: ?Sized> ClientCredentials<S> {
impl<S: ?Sized + AuthorizationProvider> ClientCredentials<S> {
/// Requests an access token for the client credentials.
#[inline]
pub async fn request(&mut self) -> Result<&str, Error> {
pub async fn request(&self) -> Result<String, Error> {
if self.is_expired() {
S::grant_client_credentials(self).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion zino-core/src/auth/jwt_claims.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<T> JwtClaims<T> {
dt.as_secs()
.checked_add_signed(-DateTime::current_timestamp())
})
.map(|secs| Duration::from_secs(secs))
.map(Duration::from_secs)
.unwrap_or_default()
}

Expand Down
56 changes: 49 additions & 7 deletions zino-core/src/connector/connector_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct HttpConnector {
headers: Map,
/// Optional request body.
body: Option<Box<RawValue>>,
/// JSON Pointer for looking up a value from the response data.
json_pointer: Option<String>,
}

impl HttpConnector {
Expand All @@ -39,6 +41,7 @@ impl HttpConnector {
base_url: base_url.parse()?,
headers: Map::new(),
body: None,
json_pointer: None,
})
}

Expand All @@ -59,6 +62,10 @@ impl HttpConnector {
let raw_value = serde_json::value::to_raw_value(body)?;
connector.body = Some(raw_value);
}
if let Some(json_pointer) = config.get_str("json-pointer") {
connector.json_pointer = Some(json_pointer.into());
}

Ok(connector)
}

Expand All @@ -68,12 +75,27 @@ impl HttpConnector {
self.headers.upsert(key, value.into());
}

/// Sets the request query.
#[inline]
pub fn set_query<T: Serialize>(&mut self, query: &T) {
if let Ok(query) = serde_qs::to_string(query) {
self.base_url.set_query(Some(&query));
}
}

/// Sets the request body.
#[inline]
pub fn set_body<T: Serialize>(&mut self, body: &T) {
self.body = serde_json::value::to_raw_value(body).ok();
}

/// Sets a JSON Pointer for looking up a value from the response data.
/// It only applies when the response data a JSON object.
#[inline]
pub fn set_json_pointer(&mut self, pointer: impl Into<String>) {
self.json_pointer = Some(pointer.into());
}

/// Makes an HTTP request with the given query and params.
pub async fn fetch(&self, query: &str, params: Option<&Map>) -> Result<Response, Error> {
let mut url = self.base_url.clone();
Expand Down Expand Up @@ -153,14 +175,19 @@ impl Connector for HttpConnector {
let records = match self.fetch_json(query, params).await? {
JsonValue::Array(vec) => vec
.into_iter()
.filter_map(|value| value.into_map_opt())
.filter_map(|v| v.into_map_opt())
.map(|m| m.into_avro_record())
.collect::<Vec<_>>(),
JsonValue::Object(mut map) => {
if let Some(value) = map.remove("data").or_else(|| map.remove("result")) {
let data = if let Some(json_pointer) = &self.json_pointer {
map.lookup(json_pointer).cloned()
} else {
map.remove("data").or_else(|| map.remove("result"))
};
if let Some(value) = data {
if let JsonValue::Array(vec) = value {
vec.into_iter()
.filter_map(|value| value.into_map_opt())
.filter_map(|v| v.into_map_opt())
.map(|m| m.into_avro_record())
.collect::<Vec<_>>()
} else {
Expand All @@ -186,10 +213,15 @@ impl Connector for HttpConnector {
.filter_map(|value| value.into_map_opt())
.collect::<Vec<_>>(),
JsonValue::Object(mut map) => {
if let Some(value) = map.remove("data").or_else(|| map.remove("result")) {
let data = if let Some(json_pointer) = &self.json_pointer {
map.lookup(json_pointer).cloned()
} else {
map.remove("data").or_else(|| map.remove("result"))
};
if let Some(value) = data {
if let JsonValue::Array(vec) = value {
vec.into_iter()
.filter_map(|value| value.into_map_opt())
.filter_map(|v| v.into_map_opt())
.collect::<Vec<_>>()
} else {
vec![Map::from_entry("data", value)]
Expand All @@ -206,7 +238,12 @@ impl Connector for HttpConnector {
async fn query_one(&self, query: &str, params: Option<&Map>) -> Result<Option<Record>, Error> {
let record = match self.fetch_json(query, params).await? {
JsonValue::Object(mut map) => {
if let Some(value) = map.remove("data").or_else(|| map.remove("result")) {
let data = if let Some(json_pointer) = &self.json_pointer {
map.lookup(json_pointer).cloned()
} else {
map.remove("data").or_else(|| map.remove("result"))
};
if let Some(value) = data {
if let JsonValue::Object(data) = value {
data.into_avro_record()
} else {
Expand All @@ -227,7 +264,12 @@ impl Connector for HttpConnector {
params: Option<&Map>,
) -> Result<Option<T>, Error> {
if let JsonValue::Object(mut map) = self.fetch_json(query, params).await? {
if let Some(value) = map.remove("data").or_else(|| map.remove("result")) {
let data = if let Some(json_pointer) = &self.json_pointer {
map.lookup(json_pointer).cloned()
} else {
map.remove("data").or_else(|| map.remove("result"))
};
if let Some(value) = data {
serde_json::from_value(value).map_err(Error::from)
} else {
serde_json::from_value(map.into()).map_err(Error::from)
Expand Down
Loading

0 comments on commit 009f434

Please sign in to comment.