Skip to content

Commit

Permalink
Redesign the CloundEvent type
Browse files Browse the repository at this point in the history
  • Loading branch information
photino committed Dec 16, 2023
1 parent f9f1860 commit 4be2be7
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 86 deletions.
6 changes: 3 additions & 3 deletions zino-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ etag = "4.0.0"
faster-hex = "0.9.0"
fluent = "0.16.0"
futures = "0.3.29"
hkdf = "0.12.3"
hkdf = "0.12.4"
hmac = "0.12.1"
http = "0.2.11"
http-body = "0.4.6"
Expand All @@ -191,7 +191,7 @@ jwt-simple = "0.11.9"
md-5 = "0.10.6"
mime = "0.3.17"
mime_guess = "2.0.4"
multer = "2.1.0"
multer = "3.0.0"
parking_lot = "0.12.1"
percent-encoding = "2.3.1"
rand = "0.8.5"
Expand Down Expand Up @@ -241,7 +241,7 @@ version = "0.21.1"
optional = true

[dependencies.metrics-exporter-prometheus]
version = "0.12.1"
version = "0.12.2"
optional = true

[dependencies.minijinja]
Expand Down
92 changes: 65 additions & 27 deletions zino-core/src/channel/cloud_event.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,83 @@
use crate::{datetime::DateTime, JsonValue, Map};
use crate::{datetime::DateTime, JsonValue, Map, SharedString};
use serde::{Deserialize, Serialize};

/// Cloud event.
/// See [the spec](https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(default)]
pub struct CloudEvent {
pub struct CloudEvent<T = ()> {
/// Spec version.
#[serde(rename = "specversion")]
spec_version: SharedString,
/// Event ID.
id: String,
/// Event source.
source: String,
/// Event topic.
/// Event type.
#[serde(rename = "type")]
topic: String,
/// Response data.
event_type: String,
/// Timestamp.
#[serde(rename = "time")]
timestamp: DateTime,
/// Event data.
#[serde(skip_serializing_if = "JsonValue::is_null")]
data: JsonValue,
/// Session ID.
/// Optional data content type.
#[serde(rename = "datacontenttype")]
data_content_type: Option<SharedString>,
/// Optional data schema.
#[serde(rename = "dataschema")]
data_schema: Option<SharedString>,
/// Optional subject.
#[serde(skip_serializing_if = "Option::is_none")]
subject: Option<SharedString>,
/// Optional session ID.
#[serde(rename = "sessionid")]
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<String>,
/// Timestamp.
#[serde(rename = "time")]
timestamp: DateTime,
/// Version.
#[serde(rename = "specversion")]
version: String,
/// Extensions.
#[serde(flatten)]
extensions: T,
}

impl CloudEvent {
impl<T: Default> CloudEvent<T> {
/// Creates a new instance.
#[inline]
pub fn new(id: String, source: String, topic: String, data: JsonValue) -> Self {
pub fn new(id: String, source: String, event_type: String) -> Self {
Self {
spec_version: "1.0".into(),
id,
source,
topic,
data,
session_id: None,
event_type,
timestamp: DateTime::now(),
version: "1.0".to_owned(),
data: JsonValue::Null,
data_content_type: None,
data_schema: None,
subject: None,
session_id: None,
extensions: T::default(),
}
}
}

/// Sets the session ID.
impl<T> CloudEvent<T> {
/// Sets the event data.
#[inline]
pub fn set_session_id(&mut self, session_id: String) {
self.session_id = Some(session_id);
pub fn set_data(&mut self, data: impl Into<JsonValue>) {
self.data = data.into();
}

/// Returns the session ID.
/// Sets the subject.
#[inline]
pub fn session_id(&self) -> Option<&str> {
self.session_id.as_deref()
pub fn set_subject(&mut self, subject: impl Into<SharedString>) {
self.subject = Some(subject.into());
}

/// Sets the session ID.
#[inline]
pub fn set_session_id(&mut self, session_id: impl ToString) {
self.session_id = Some(session_id.to_string());
}

/// Returns the event ID as a `str`.
Expand All @@ -68,18 +92,32 @@ impl CloudEvent {
self.source.as_str()
}

/// Returns the event topic (a.k.a *event type*) as a `str`.
/// Returns the event type as a `str`.
#[inline]
pub fn event_type(&self) -> &str {
self.event_type.as_str()
}

/// Returns a reference to the optional subject.
#[inline]
pub fn topic(&self) -> &str {
self.topic.as_str()
pub fn subject(&self) -> Option<&str> {
self.subject.as_deref()
}

/// Returns a reference to the optional session ID.
#[inline]
pub fn session_id(&self) -> Option<&str> {
self.session_id.as_deref()
}

/// Stringifies the event data as `String`.
#[inline]
pub fn stringify_data(&self) -> String {
self.data.to_string()
}
}

impl<T: Serialize> CloudEvent<T> {
/// Consumes the event and returns as a json object.
///
/// # Panics
Expand Down
15 changes: 12 additions & 3 deletions zino-core/src/model/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,20 @@ impl Query {
self.filters.remove(key)
}

/// Sets the sort order.
/// Sets the sort with an ascending order.
#[inline]
pub fn set_sort_order(&mut self, field: impl Into<SharedString>, descending: bool) {
pub fn order_by_asc(&mut self, field: impl Into<SharedString>) {
let field = field.into();
self.sort_order.retain(|(s, _)| s != &field);
self.sort_order.push((field, descending));
self.sort_order.push((field, false));
}

/// Sets the sort with an descending order.
#[inline]
pub fn order_by_desc(&mut self, field: impl Into<SharedString>) {
let field = field.into();
self.sort_order.retain(|(s, _)| s != &field);
self.sort_order.push((field, true));
}

/// Sets the query offset.
Expand Down Expand Up @@ -226,6 +234,7 @@ impl Query {
}

/// Returns the sort order.
/// A `true` boolean value represents a descending order.
#[inline]
pub fn sort_order(&self) -> &[(SharedString, bool)] {
&self.sort_order
Expand Down
2 changes: 1 addition & 1 deletion zino-core/src/orm/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ where
query.allow_fields(Self::fields());
query.deny_fields(&ignored_fields);
query.add_filter("status", Map::from_entry("$ne", "Deleted"));
query.set_sort_order("updated_at", true);
query.order_by_desc("updated_at");
query
}

Expand Down
38 changes: 13 additions & 25 deletions zino-core/src/orm/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,18 +793,18 @@ pub trait Schema: 'static + Send + Sync + ModelHooks {

/// Populates the related data in the corresponding `columns` for `Vec<Map>` using
/// a merged select on the primary key, which solves the `N+1` problem.
async fn populate<const N: usize>(
async fn populate(
query: &mut Query,
data: &mut Vec<Map>,
columns: [&str; N],
columns: &[&str],
) -> Result<u64, Error> {
let pool = Self::acquire_reader().await?.pool();
Self::before_query(query).await?;

let primary_key_name = Self::PRIMARY_KEY_NAME;
let mut values = Vec::new();
for row in data.iter() {
for col in columns {
for &col in columns {
if let Some(value) = row.get(col).cloned() {
if let JsonValue::Array(mut vec) = value {
values.append(&mut vec);
Expand Down Expand Up @@ -849,7 +849,7 @@ pub trait Schema: 'static + Send + Sync + ModelHooks {
Self::after_query(&ctx).await?;

for row in data {
for col in columns {
for &col in columns {
if let Some(vec) = row.get_array(col)
&& !vec.is_empty()
{
Expand Down Expand Up @@ -884,17 +884,17 @@ pub trait Schema: 'static + Send + Sync + ModelHooks {

/// Populates the related data in the corresponding `columns` for `Map` using
/// a merged select on the primary key, which solves the `N+1` problem.
async fn populate_one<const N: usize>(
async fn populate_one(
query: &mut Query,
data: &mut Map,
columns: [&str; N],
columns: &[&str],
) -> Result<(), Error> {
let pool = Self::acquire_reader().await?.pool();
Self::before_query(query).await?;

let primary_key_name = Self::PRIMARY_KEY_NAME;
let mut values = Vec::new();
for col in columns {
for &col in columns {
if let Some(value) = data.get(col).cloned() {
if let JsonValue::Array(mut vec) = value {
values.append(&mut vec);
Expand Down Expand Up @@ -935,7 +935,7 @@ pub trait Schema: 'static + Send + Sync + ModelHooks {
Self::after_scan(&ctx).await?;
Self::after_query(&ctx).await?;

for col in columns {
for &col in columns {
if let Some(vec) = data.get_array(col)
&& !vec.is_empty()
{
Expand Down Expand Up @@ -969,11 +969,7 @@ pub trait Schema: 'static + Send + Sync + ModelHooks {

/// Performs a left outer join to another table to filter rows in the joined table,
/// and decodes it as `Vec<T>`.
async fn lookup<M, T, const N: usize>(
query: &Query,
left_columns: [&str; N],
right_columns: [&str; N],
) -> Result<Vec<T>, Error>
async fn lookup<M, T>(query: &Query, columns: &[(&str, &str)]) -> Result<Vec<T>, Error>
where
M: Schema,
T: DecodeRow<DatabaseRow, Error = Error>,
Expand All @@ -989,9 +985,8 @@ pub trait Schema: 'static + Send + Sync + ModelHooks {
let filters = query.format_filters::<Self>();
let sort = query.format_sort();
let pagination = query.format_pagination();
let on_expressions = left_columns
let on_expressions = columns
.iter()
.zip(right_columns.iter())
.map(|(left_col, right_col)| {
let left_col = format!("{model_name}.{left_col}");
let right_col = format!("{other_model_name}.{right_col}");
Expand Down Expand Up @@ -1022,16 +1017,12 @@ pub trait Schema: 'static + Send + Sync + ModelHooks {

/// Performs a left outer join to another table to filter rows in the "joined" table,
/// and parses it as `Vec<T>`.
async fn lookup_as<M, T, const N: usize>(
query: &Query,
left_columns: [&str; N],
right_columns: [&str; N],
) -> Result<Vec<T>, Error>
async fn lookup_as<M, T>(query: &Query, columns: &[(&str, &str)]) -> Result<Vec<T>, Error>
where
M: Schema,
T: DeserializeOwned,
{
let mut data = Self::lookup::<M, Map, N>(query, left_columns, right_columns).await?;
let mut data = Self::lookup::<M, Map>(query, columns).await?;
let translate_enabled = query.translate_enabled();
for model in data.iter_mut() {
Self::after_decode(model).await?;
Expand Down Expand Up @@ -1403,10 +1394,7 @@ pub trait Schema: 'static + Send + Sync + ModelHooks {
}

/// Returns `true` if the model is unique on the column values.
async fn is_unique_on<const N: usize>(
&self,
columns: [(&str, JsonValue); N],
) -> Result<bool, Error> {
async fn is_unique_on(&self, columns: Vec<(&str, JsonValue)>) -> Result<bool, Error> {
let primary_key_name = Self::PRIMARY_KEY_NAME;
let mut query = Query::default();
let mut fields = vec![primary_key_name];
Expand Down
9 changes: 5 additions & 4 deletions zino-core/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ pub trait RequestContext {
}
}

/// Creates a new subscription instance.
/// Constructs a new subscription instance.
fn subscription(&self) -> Subscription {
let mut subscription = self.parse_query::<Subscription>().unwrap_or_default();
if subscription.session_id().is_none()
Expand All @@ -775,14 +775,15 @@ pub trait RequestContext {
subscription
}

/// Creates a new cloud event instance.
fn cloud_event(&self, topic: String, data: JsonValue) -> CloudEvent {
/// Constructs a new cloud event instance.
fn cloud_event(&self, event_type: String, data: JsonValue) -> CloudEvent {
let id = self.request_id().to_string();
let source = self.instance();
let mut event = CloudEvent::new(id, source, topic, data);
let mut event = CloudEvent::new(id, source, event_type);
if let Some(session_id) = self.session_id() {
event.set_session_id(session_id);
}
event.set_data(data);
event
}
}
Loading

0 comments on commit 4be2be7

Please sign in to comment.