Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Semantic detection PoC #103

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
410 changes: 407 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
members = [
"gen",
"core",
"semdet",
"synth",
"dist/playground"
]
2 changes: 1 addition & 1 deletion core/src/graph/string/faker.rs
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ impl Default for Locale {
#[derive(Clone, Default, Deserialize, Debug, Serialize, PartialEq, Eq)]
pub struct FakerArgs {
#[serde(default)]
locales: Vec<Locale>,
pub locales: Vec<Locale>,
}

type FakerFunction = for<'r> fn(&'r mut dyn RngCore, &FakerArgs) -> String;
41 changes: 41 additions & 0 deletions semdet/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[package]
name = "semantic-detection"
version = "0.1.0"
edition = "2018"
authors = [
"Damien Broka <damien@getsynth.com>",
]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
name = "semantic_detection"
crate-type = ["lib", "dylib"]

[features]
default = [ "dummy" ]
train = [ "pyo3" ]
dummy = [ ]
torch = [ "tch" ]

[dependencies.arrow]
version = "5.1.0"

[dependencies.fake]
version = "2.4.1"
features = ["http"]

[dependencies.pyo3]
version = "0.14.2"
optional = true
features = [ "extension-module" ]

[dependencies.tch]
version = "0.5.0"
optional = true

[dependencies.ndarray]
version = "0.15.3"

[dev-dependencies.rand]
version = "0.8.4"
22 changes: 22 additions & 0 deletions semdet/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::env;
use std::fs;
use std::io::Result;
use std::path::{Path, PathBuf};

/// Build script: stages the pretrained model weights into `OUT_DIR` and
/// exposes their final location to the crate via the `PRETRAINED` env var.
fn main() -> Result<()> {
    // Source of the weights: override with the `PRETRAINED` env var, falling
    // back to the dummy weights checked into the repository under `train/`.
    let pretrained_path = env::var_os("PRETRAINED")
        .map(PathBuf::from)
        .unwrap_or_else(|| Path::new("train").join("dummy.tch"));
    // Cargo guarantees OUT_DIR is set for build scripts, so unwrap is safe there.
    let target_path = PathBuf::from(env::var_os("OUT_DIR").unwrap()).join("pretrained.tch");
    // `Path::display()` never panics on non-UTF-8 paths, unlike the previous
    // `to_str().unwrap()`.
    eprintln!(
        "attempting to copy pretrained weights:\n\t<- {}\n\t-> {}",
        pretrained_path.display(),
        target_path.display()
    );
    fs::copy(&pretrained_path, &target_path)?;
    // Re-export the staged weights' path so `env!("PRETRAINED")` works in-crate.
    println!("cargo:rustc-env=PRETRAINED={}", target_path.display());
    Ok(())
}
102 changes: 102 additions & 0 deletions semdet/src/decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use ndarray::{ArrayView, Ix1};

use std::convert::Infallible;

/// Trait for functions that produce a value from an input [`Array`](ndarray::Array) of prescribed
/// shape.
///
/// The type parameter `D` should probably be a [`Dimension`](ndarray::Dimension) for implementations
/// to be useful.
pub trait Decoder<D> {
    /// The error type returned when decoding fails.
    type Err: std::error::Error + 'static;

    /// The type of values returned.
    type Value;

    /// Compute and return a [`Self::Value`](Decoder::Value) from the input `tensor`.
    ///
    /// Implementations are allowed to panic if `tensor.shape() != self.shape()`.
    fn decode(&self, tensor: ArrayView<f32, D>) -> Result<Self::Value, Self::Err>;

    /// The shape that is required of a valid input of this decoder.
    fn shape(&self) -> D;
}

/// A shared reference to a decoder is itself a decoder: both calls simply
/// forward to the referenced value.
impl<'d, D, Dm> Decoder<Dm> for &'d D
where
    D: Decoder<Dm>,
{
    type Err = D::Err;
    type Value = D::Value;

    fn decode(&self, tensor: ArrayView<f32, Dm>) -> Result<Self::Value, Self::Err> {
        (**self).decode(tensor)
    }

    fn shape(&self) -> Dm {
        (**self).shape()
    }
}

pub struct MaxIndexDecoder<S> {
index: Vec<S>,
}

impl<S> MaxIndexDecoder<S> {
/// # Panics
///
/// If `index` is empty.
pub fn from_vec(index: Vec<S>) -> Self {
assert!(
!index.is_empty(),
"passed `index` to `from_values` must not be empty"
);
Self { index }
}
}

impl<S> Decoder<Ix1> for MaxIndexDecoder<S>
where
    S: Clone,
{
    type Err = Infallible;
    type Value = Option<S>;

    /// Pick the position with the highest score (the last one on ties, matching
    /// `Iterator::max_by` semantics) and return the index entry stored there,
    /// or `None` when the winning score does not beat the uniform baseline
    /// `1 / len`.
    ///
    /// Panics if `tensor` is empty, which cannot happen for inputs of
    /// [`shape`](Decoder::shape) since `index` is non-empty by construction.
    fn decode(&self, tensor: ArrayView<f32, Ix1>) -> Result<Self::Value, Self::Err> {
        let mut winner: Option<(usize, f32)> = None;
        for (pos, &score) in tensor.iter().enumerate() {
            let replace = match winner {
                None => true,
                // `total_cmp` is a total order over f32, NaN included; `>=`
                // (not Less) keeps the last of several equal maxima.
                Some((_, best)) => score.total_cmp(&best) != std::cmp::Ordering::Less,
            };
            if replace {
                winner = Some((pos, score));
            }
        }
        let (pos, best) = winner.unwrap();
        let baseline = 1. / tensor.len() as f32;
        if best > baseline {
            Ok(Some(self.index[pos].clone()))
        } else {
            Ok(None)
        }
    }

    fn shape(&self) -> Ix1 {
        Ix1(self.index.len())
    }
}

#[cfg(test)]
pub mod tests {
    use super::{Decoder, MaxIndexDecoder};

    use ndarray::{Array, Ix1};

    /// Decoding a one-hot score vector must recover the index entry at the
    /// hot position.
    #[test]
    fn decoder_max_index() {
        let decoder = MaxIndexDecoder::from_vec((0..10).collect());

        for hot in 0..10 {
            let one_hot = Array::from_shape_fn(Ix1(10), |pos| if pos == hot { 1. } else { 0. });
            let decoded = decoder.decode(one_hot.view()).unwrap();
            assert_eq!(decoded, Some(hot));
        }
    }
}
219 changes: 219 additions & 0 deletions semdet/src/encode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
use arrow::array::{GenericStringArray, StringOffsetSizeTrait};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;

use ndarray::{ArrayViewMut, Axis, Ix1, Slice};

use std::collections::HashSet;
use std::convert::Infallible;

/// Trait for functions that compute an [`Array`](ndarray::Array) of prescribed shape from an input
/// [`RecordBatch`](arrow::record_batch::RecordBatch).
///
/// The type parameter `D` should probably be a [`Dimension`](ndarray::Dimension) for
/// implementations to be useful.
pub trait Encoder<D> {
    /// The error type returned when encoding fails.
    type Err: std::error::Error + 'static;

    /// Compute the values from the `input` and store the result in the initialized mutable `buffer`.
    ///
    /// Implementations are allowed to panic if `buffer.shape() != self.shape()`.
    fn encode<'f>(
        &self,
        input: &RecordBatch,
        buffer: ArrayViewMut<'f, f32, D>,
    ) -> Result<(), Self::Err>;

    /// The shape of the output of this encoder.
    fn shape(&self) -> D;
}

/// A shared reference to an encoder is itself an encoder: both calls simply
/// delegate to the referenced value.
impl<'e, E, Dm> Encoder<Dm> for &'e E
where
    E: Encoder<Dm>,
{
    type Err = E::Err;

    fn encode<'f>(
        &self,
        input: &RecordBatch,
        buffer: ArrayViewMut<'f, f32, Dm>,
    ) -> Result<(), Self::Err> {
        (**self).encode(input, buffer)
    }

    fn shape(&self) -> Dm {
        (**self).shape()
    }
}

/// An [`Encoder`](Encoder) that simply counts how many rows in the input are present in a dictionary.
#[derive(Debug)]
pub struct Dictionary {
    // Owned copies of the dictionary entries, hashed for O(1) membership tests.
    dict: HashSet<String>,
}

impl Dictionary {
    /// Create a new dictionary from a collection of [`&str`](str).
    pub fn new<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a str>,
    {
        let dict = values.into_iter().map(str::to_string).collect();
        Self { dict }
    }

    /// Count the non-null entries of `data` that are members of the dictionary.
    fn count<T: StringOffsetSizeTrait>(&self, data: &GenericStringArray<T>) -> u64 {
        let mut hits = 0u64;
        for entry in data.into_iter() {
            if let Some(text) = entry {
                if self.dict.contains(text) {
                    hits += 1;
                }
            }
        }
        hits
    }
}

impl Encoder<Ix1> for Dictionary {
    type Err = Infallible;

    /// In the first column of `input`, count the number of rows which match an
    /// element of the dictionary, writing the count into `buffer[0]`.
    ///
    /// Columns that are not `Utf8`/`LargeUtf8`, and empty columns, encode as NaN.
    fn encode<'f>(
        &self,
        input: &RecordBatch,
        mut buffer: ArrayViewMut<'f, f32, Ix1>,
    ) -> Result<(), Self::Err> {
        let column = input.column(0);
        // The downcasts are infallible here because the data type was just checked.
        let matches = match column.data_type() {
            DataType::Utf8 => {
                let strings = column
                    .as_any()
                    .downcast_ref::<GenericStringArray<i32>>()
                    .unwrap();
                Some(self.count(strings))
            }
            DataType::LargeUtf8 => {
                let strings = column
                    .as_any()
                    .downcast_ref::<GenericStringArray<i64>>()
                    .unwrap();
                Some(self.count(strings))
            }
            _ => None,
        };
        let encoded = match matches {
            Some(hits) if column.len() != 0 => hits as f32,
            _ => f32::NAN,
        };
        buffer[0] = encoded;
        Ok(())
    }

    fn shape(&self) -> Ix1 {
        Ix1(1)
    }
}

/// An [`Encoder`](Encoder) that horizontally stacks the output of a collection of [`Encoder`](Encoder)s.
pub struct StackedEncoder<D> {
    stack: Vec<D>,
    // Total output width, cached at construction so `shape` needn't re-sum.
    shape: Ix1,
}

impl<D> StackedEncoder<D>
where
    D: Encoder<Ix1>,
{
    /// Construct a new [`StackedEncoder`](StackedEncoder) from a collection of [`Encoder`](Encoder)s.
    pub fn from_vec(stack: Vec<D>) -> Self {
        let mut total = 0;
        for encoder in &stack {
            total += encoder.shape()[0];
        }
        Self {
            stack,
            shape: Ix1(total),
        }
    }
}

impl<D> Encoder<Ix1> for StackedEncoder<D>
where
    D: Encoder<Ix1>,
{
    type Err = D::Err;

    /// Run every stacked encoder on `input`, writing each encoder's output into
    /// its own contiguous window of `buffer`, in stack order.
    fn encode<'f>(
        &self,
        input: &RecordBatch,
        mut buffer: ArrayViewMut<'f, f32, Ix1>,
    ) -> Result<(), Self::Err> {
        let mut offset = 0usize;
        for encoder in &self.stack {
            let width = encoder.shape()[0];
            let window = buffer.slice_axis_mut(Axis(0), Slice::from(offset..offset + width));
            encoder.encode(input, window)?;
            offset += width;
        }
        Ok(())
    }

    fn shape(&self) -> Ix1 {
        self.shape
    }
}

#[cfg(test)]
pub mod tests {
    use ndarray::{array, Array, Ix2};

    use std::iter::once;

    use super::{Dictionary, Encoder, StackedEncoder};
    use crate::tests::*;

    // Runs `$encoder` against `$input`: the 3-arg form writes into a
    // caller-supplied buffer, the 2-arg form allocates a fresh zeroed output
    // of the encoder's shape and returns it.
    macro_rules! encode {
        ($encoder:ident, $input:ident, $output:expr) => {{
            $encoder.encode(&$input, $output).unwrap();
        }};
        ($encoder:ident, $input:ident) => {{
            let mut output = Array::zeros($encoder.shape());
            encode!($encoder, $input, output.view_mut());
            output
        }};
    }

    // NOTE(review): the exact expected counts below (511, 21, 500, ...) depend
    // on the deterministic RNG seeded in `crate::tests::rng` and on the `fake`
    // crate's sampling order; they will change if either changes.
    #[test]
    fn encoder_dictionary() {
        let last_names = <fake::locales::EN as fake::locales::Data>::NAME_LAST_NAME;
        let encoder = Dictionary::new(last_names.iter().copied());
        let names_array = string_array_of(fake::faker::name::en::LastName(), 1000);
        let input = record_batch_of(once(names_array));
        let output = encode!(encoder, input);
        assert_eq!(output, array![511.]);
    }

    // Stacks three dictionaries and encodes three matching inputs; the
    // diagonal of the output carries the high match counts, off-diagonal
    // entries count incidental vocabulary overlap between the data sets.
    #[test]
    fn encoder_stacked() {
        let data = [
            <fake::locales::EN as fake::locales::Data>::NAME_LAST_NAME,
            <fake::locales::EN as fake::locales::Data>::ADDRESS_COUNTRY_CODE,
            <fake::locales::EN as fake::locales::Data>::JOB_FIELD,
        ];
        let encoder = StackedEncoder::from_vec(
            data.iter()
                .map(|values| Dictionary::new(values.iter().copied()))
                .collect(),
        );

        let mut output = Array::zeros(Ix2(encoder.stack.len(), encoder.shape()[0]));
        let slices = output.outer_iter_mut();

        let inputs = vec![
            string_array_of(fake::faker::name::en::LastName(), 1000),
            string_array_of(fake::faker::address::en::CountryCode(), 1000),
            string_array_of(fake::faker::job::en::Field(), 1000),
        ];
        for (slice, array) in slices.zip(inputs) {
            let input = record_batch_of(vec![array]);
            encode!(encoder, input, slice);
        }

        assert_eq!(
            output,
            array![[511.0, 0.0, 0.0], [0.0, 511.0, 1.0], [0.0, 21.0, 500.0]]
        );
    }
}
53 changes: 53 additions & 0 deletions semdet/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use arrow::error::ArrowError;
use ndarray::ShapeError;

/// Aggregate error type for the semantic-detection pipeline.
#[derive(Debug)]
pub enum Error {
    /// Error propagated from the arrow data layer.
    Arrow(ArrowError),
    /// Tensor shape mismatch propagated from ndarray.
    Shape(ShapeError),
    /// Failure reported by an encoder stage.
    Encoder(Box<dyn std::error::Error + 'static>),
    /// Failure reported by the model stage.
    Model(Box<dyn std::error::Error + 'static>),
    /// Failure reported by a decoder stage.
    Decoder(Box<dyn std::error::Error + 'static>),
    /// Implementation-level error carrying a free-form message.
    Implementation(String),
}

impl Error {
    // Box an arbitrary concrete error for storage in a variant.
    fn boxed<E: std::error::Error + 'static>(err: E) -> Box<dyn std::error::Error + 'static> {
        Box::new(err)
    }

    /// Wrap an error raised by an encoder stage.
    pub fn encoder<E: std::error::Error + 'static>(err: E) -> Self {
        Self::Encoder(Self::boxed(err))
    }

    /// Wrap an error raised by the model stage.
    pub fn model<E: std::error::Error + 'static>(err: E) -> Self {
        Self::Model(Self::boxed(err))
    }

    /// Wrap an error raised by a decoder stage.
    pub fn decoder<E: std::error::Error + 'static>(err: E) -> Self {
        Self::Decoder(Self::boxed(err))
    }
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant renders as "<stage> error: <cause>", so pick the pair
        // first and format once.
        let (stage, cause): (&str, &dyn std::fmt::Display) = match self {
            Self::Arrow(err) => ("arrow", err),
            Self::Shape(err) => ("shape", err),
            Self::Encoder(err) => ("encoder", err),
            Self::Model(err) => ("model", err),
            Self::Decoder(err) => ("decoder", err),
            Self::Implementation(msg) => ("implementation", msg),
        };
        write!(f, "{} error: {}", stage, cause)
    }
}

// `Debug` + `Display` are implemented above, which is all the blanket
// requirements of `std::error::Error`.
impl std::error::Error for Error {}

/// Enable `?` conversion from arrow errors.
impl From<ArrowError> for Error {
    fn from(err: ArrowError) -> Self {
        Self::Arrow(err)
    }
}

/// Enable `?` conversion from ndarray shape errors.
impl From<ShapeError> for Error {
    fn from(err: ShapeError) -> Self {
        Self::Shape(err)
    }
}
74 changes: 74 additions & 0 deletions semdet/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#![feature(total_cmp)]

pub mod encode;
pub use encode::Encoder;

pub mod decode;
pub use decode::Decoder;

pub mod module;
pub use module::Module;

pub mod error;
pub use error::Error;

// When the `train` feature is enabled, expose the crate to Python as the
// `semantic_detection` extension module. The bindings built by
// `module::python_bindings::bind` are registered as a submodule (bound here
// as `dummy` — presumably the dummy model's bindings; confirm against
// `module::python_bindings`).
#[cfg(feature = "train")]
#[pyo3::proc_macro::pymodule]
fn semantic_detection(py: pyo3::Python, m: &pyo3::types::PyModule) -> pyo3::PyResult<()> {
    let dummy = module::python_bindings::bind(py)?;
    m.add_submodule(dummy)?;
    Ok(())
}

#[cfg(test)]
pub mod tests {
    use fake::{Dummy, Fake};
    use rand::{rngs::StdRng, SeedableRng};

    use arrow::array::{ArrayRef, StringArray, StringBuilder};
    use arrow::record_batch::RecordBatch;

    use std::sync::Arc;

    /// A deterministically seeded RNG so test expectations are reproducible.
    pub fn rng() -> StdRng {
        <StdRng as SeedableRng>::seed_from_u64(0xAAAAAAAAAAAAAAAA)
    }

    /// Build a [`StringArray`] of `len` entries drawn from the faker `f`.
    ///
    /// Entries are nullable: whenever the faker yields `None`, a null slot is
    /// appended instead of a string.
    pub fn string_array_of<F>(f: F, len: usize) -> StringArray
    where
        String: Dummy<F>,
    {
        let mut builder = StringBuilder::new(len);
        let mut rng = rng();
        // A `Range` is already an iterator; the previous `.into_iter()` call
        // was a no-op (clippy: useless_conversion).
        (0..len)
            .try_for_each(|_| builder.append_option(f.fake_with_rng::<Option<String>, _>(&mut rng)))
            .unwrap();
        builder.finish()
    }

    /// Assemble a [`RecordBatch`] from `(column name, array)` pairs.
    pub fn record_batch_of_with_names<S, A, I>(iter: I) -> RecordBatch
    where
        I: IntoIterator<Item = (S, A)>,
        S: AsRef<str>,
        A: arrow::array::Array + 'static,
    {
        RecordBatch::try_from_iter(
            iter.into_iter()
                .map(|(idx, array)| (idx.as_ref().to_string(), Arc::new(array) as ArrayRef)),
        )
        .unwrap()
    }

    /// Assemble a [`RecordBatch`] from bare arrays, naming the columns by
    /// their position: "0", "1", ….
    pub fn record_batch_of<A, I>(iter: I) -> RecordBatch
    where
        I: IntoIterator<Item = A>,
        A: arrow::array::Array + 'static,
    {
        record_batch_of_with_names(
            iter.into_iter()
                .enumerate()
                .map(|(k, v)| (k.to_string(), v)),
        )
    }
}
426 changes: 426 additions & 0 deletions semdet/src/module.rs

Large diffs are not rendered by default.

Binary file added semdet/train/dummy.tch
Binary file not shown.
3 changes: 3 additions & 0 deletions synth/Cargo.toml
Original file line number Diff line number Diff line change
@@ -53,6 +53,9 @@ ctrlc = { version = "3.0", features = ["termination"] }
synth-core = { path = "../core" }
synth-gen = { path = "../gen" }

arrow = "5.1.0"
semantic-detection = { path = "../semdet" }

rust_decimal = "1.10.3"
indicatif = "0.15.0"

57 changes: 55 additions & 2 deletions synth/src/cli/import_utils.rs
Original file line number Diff line number Diff line change
@@ -9,9 +9,14 @@ use std::str::FromStr;
use synth_core::schema::content::number_content::U64;
use synth_core::schema::{
ArrayContent, FieldRef, Id, NumberContent, ObjectContent, OptionalMergeStrategy, RangeStep,
SameAsContent,
SameAsContent, StringContent, FakerContent
};
use synth_core::graph::string::{FakerArgs, Locale};
use synth_core::{Content, Name, Namespace};
use arrow::record_batch::RecordBatch;
use arrow::array::{StringArray, ArrayRef};
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug)]
pub(crate) struct Collection {
@@ -56,8 +61,56 @@ fn populate_namespace_collections<T: DataSource + RelationalDataSource>(

namespace.put_collection(
&Name::from_str(table_name)?,
Collection::try_from((datasource, column_infos))?.collection,
Collection::try_from((datasource, column_infos.clone()))?.collection,
)?;

let module = semantic_detection::module::dummy::module();
let values = task::block_on(datasource.get_deterministic_samples(table_name))?;
let mut pivoted = HashMap::new();
for value in values.iter() {
let row = value.as_object().unwrap();
for (column, field) in row.iter() {
if let Some(content) = field.as_str() {
pivoted
.entry(column.to_string())
.or_insert_with(Vec::new)
.push(Some(content));
}

if field.is_null() {
if let Some(values) = pivoted.get_mut(column) {
values.push(None);
}
}
}
}

let column_infos = column_infos
.into_iter()
.map(|ci| (ci.column_name.to_string(), ci))
.collect::<HashMap<_, _>>();
let pivoted = pivoted.into_iter().map(|(k, v)| (k, Arc::new(StringArray::from(v)) as ArrayRef));
let record_batch = RecordBatch::try_from_iter(pivoted).unwrap();
let target = module.forward(&record_batch).unwrap();
for (column, generator) in target {
let column_meta = column_infos.get(&column).unwrap();
if let Some(generator) = generator {
let field_ref = FieldRef::new(& if column_meta.is_nullable {
format!("{}.content.{}.0", table_name, &column_meta.column_name)
} else {
format!("{}.content.{}", table_name, &column_meta.column_name)
})?;
if let Content::String(string_content) = namespace.get_s_node_mut(&field_ref)? {
*string_content = StringContent::Faker(FakerContent {
generator: generator.to_string(),
locales: vec![],
args: FakerArgs {
locales: vec![Locale::EN]
}
})
}
}
}
}

Ok(())
2 changes: 1 addition & 1 deletion synth/src/datasource/relational_datasource.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use synth_core::Content;

const DEFAULT_INSERT_BATCH_SIZE: usize = 1000;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ColumnInfo {
pub(crate) column_name: String,
pub(crate) ordinal_position: i32,