Skip to content

Commit

Permalink
flowctl/suggest-schema: it's okay if the docs journal is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Jul 6, 2023
1 parent bcc681f commit 4c00155
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions crates/flowctl/src/raw/suggest_schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@


use doc::{inference::Shape, SchemaIndexBuilder, FailedValidation};
use futures::{TryStreamExt, StreamExt};
use futures::{TryStreamExt, StreamExt, Stream};

use json::schema::build::build_schema;
use models::Schema;
Expand All @@ -13,12 +11,12 @@ use schema_inference::{json_decoder::JsonCodec, inference::infer_shape, shape, s
use tokio::io::BufReader;
use tokio_util::{compat::FuturesAsyncReadCompatExt, codec::FramedRead};

use std::io::ErrorKind;
use std::{io::ErrorKind, pin::Pin};
use url::Url;

use crate::{catalog::{fetch_live_specs, List, SpecTypeSelector, NameSelector, collect_specs}, collection::{CollectionJournalSelector, Partition, read::{journal_reader, ReadArgs}}};

use anyhow::anyhow;
use anyhow::{anyhow, Context};

/// With some of our captures, we have an existing document schema for their collections, but we
/// frequently run into issues with these document schemas: they are sometimes completely wrong
Expand Down Expand Up @@ -95,7 +93,13 @@ pub async fn do_suggest_schema(
}

// Reader for the collection itself
let reader = journal_reader(ctx, &args).await?;
let reader = match journal_reader(ctx, &args).await {
Ok(r) => Some(r),
Err(e) if e.to_string().contains("does not exist or has never been written to") => {
None
},
Err(e) => anyhow::bail!(e)
};

// Reader for the ops log of the task
let ops_collection = "ops.us-central1.v1/logs".to_string();
Expand Down Expand Up @@ -135,7 +139,11 @@ pub async fn do_suggest_schema(

// Chain together the collection document reader and the log_invalid_documents stream so we can
// run schema-inference on both
let mut docs_stream = Box::pin(FramedRead::new(FuturesAsyncReadCompatExt::compat(reader), codec).map_err(to_io_error).chain(log_invalid_documents));
let mut docs_stream: Pin<Box<dyn Stream<Item = Result<serde_json::Value, std::io::Error>>>> = if let Some(reader) = reader {
Box::pin(FramedRead::new(FuturesAsyncReadCompatExt::compat(reader), codec).map_err(to_io_error).chain(log_invalid_documents))
} else {
Box::pin(log_invalid_documents)
};

// The original collection schema to be used as the starting point of schema-inference
let schema_model = collection_def.schema.unwrap();
Expand Down

0 comments on commit 4c00155

Please sign in to comment.