Skip to content

Commit

Permalink
refactor: simplify query param extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Jan 30, 2025
1 parent 11a6378 commit a016af7
Showing 1 changed file with 23 additions and 26 deletions.
49 changes: 23 additions & 26 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ use crate::{event, stats};
use crate::{metadata, validator};
use actix_web::http::header::{self, HeaderMap};
use actix_web::http::StatusCode;
use actix_web::web::{Json, Path};
use actix_web::{web, HttpRequest, Responder};
use actix_web::web::{Json, Path, Query};
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use arrow_json::reader::infer_json_schema_from_iterator;
use arrow_schema::{Field, Schema};
use bytes::Bytes;
use chrono::Utc;
use http::{HeaderName, HeaderValue};
use itertools::Itertools;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
Expand Down Expand Up @@ -261,10 +262,15 @@ pub async fn get_stats_date(stream_name: &str, date: &str) -> Result<Stats, Stre
Ok(stats)
}

#[derive(Debug, Deserialize)]
pub struct StatsParams {
date: Option<String>,
}

pub async fn get_stats(
req: HttpRequest,
stream_name: Path<String>,
) -> Result<impl Responder, StreamError> {
Query(StatsParams { date }): Query<StatsParams>,
) -> Result<HttpResponse, StreamError> {
let stream_name = stream_name.into_inner();

if !STREAM_INFO.stream_exists(&stream_name) {
Expand All @@ -281,23 +287,9 @@ pub async fn get_stats(
}
}

let query_string = req.query_string();
if !query_string.is_empty() {
let tokens = query_string.split('=').collect::<Vec<&str>>();
let date_key = tokens[0];
let date_value = tokens[1];
if date_key != "date" {
return Err(StreamError::Custom {
msg: "Invalid query parameter".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

if !date_value.is_empty() {
let stats = get_stats_date(&stream_name, date_value).await?;
let stats = serde_json::to_value(stats)?;
return Ok((web::Json(stats), StatusCode::OK));
}
if let Some(date) = date {
let stats = get_stats_date(&stream_name, &date).await?;
return Ok(HttpResponse::build(StatusCode::OK).json(stats));
}

let stats = stats::get_current_stats(&stream_name, "json")
Expand Down Expand Up @@ -362,7 +354,7 @@ pub async fn get_stats(

let stats = serde_json::to_value(stats)?;

Ok((web::Json(stats), StatusCode::OK))
Ok(HttpResponse::build(StatusCode::OK).json(stats))
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -767,9 +759,11 @@ mod tests {
use crate::handlers::http::logstream::get_stats;
use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders;
use actix_web::test::TestRequest;
use actix_web::web;
use actix_web::web::{self, Query};
use anyhow::bail;

use super::StatsParams;

// TODO: Fix this test with routes
// #[actix_web::test]
// #[should_panic]
Expand All @@ -780,9 +774,12 @@ mod tests {

#[actix_web::test]
async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> {
let req = TestRequest::default().to_http_request();

match get_stats(req, web::Path::from("test".to_string())).await {
match get_stats(
web::Path::from("test".to_string()),
Query(StatsParams { date: None }),
)
.await
{
Err(StreamError::StreamNotFound(_)) => Ok(()),
_ => bail!("expected StreamNotFound error"),
}
Expand Down

0 comments on commit a016af7

Please sign in to comment.