Skip to content

Commit

Permalink
Merge branch 'main' of github.com:numaproj/numaflow into flatmap
Browse files Browse the repository at this point in the history
  • Loading branch information
yhl25 committed Jan 15, 2025
2 parents 19b1a2b + fd4a0aa commit 96807ae
Show file tree
Hide file tree
Showing 17 changed files with 348 additions and 124 deletions.
27 changes: 15 additions & 12 deletions rust/numaflow-core/src/mapper/map.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
use std::sync::Arc;
use std::time::Duration;

use numaflow_pb::clients::map::map_client::MapClient;
use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::transport::Channel;

use crate::config::pipeline::map::MapMode;
use crate::error;
use crate::error::Error;
Expand All @@ -6,14 +16,6 @@ use crate::mapper::map::user_defined::{
};
use crate::message::Message;
use crate::tracker::TrackerHandle;
use numaflow_pb::clients::map::map_client::MapClient;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
pub(super) mod user_defined;

/// UnaryActorMessage is a message that is sent to the UnaryMapperActor.
Expand Down Expand Up @@ -481,19 +483,20 @@ impl MapHandle {

#[cfg(test)]
mod tests {
use super::*;
use crate::Result;
use std::time::Duration;

use crate::message::{MessageID, Offset, StringOffset};
use crate::shared::grpc::create_rpc_channel;
use numaflow::mapstream;
use numaflow::{batchmap, map};
use numaflow_pb::clients::map::map_client::MapClient;
use tempfile::TempDir;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;

use super::*;
use crate::message::{MessageID, Offset, StringOffset};
use crate::shared::grpc::create_rpc_channel;
use crate::Result;

struct SimpleMapper;

#[tonic::async_trait]
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/mapper/map/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,12 @@ impl UserDefinedStreamMap {

#[cfg(test)]
mod tests {
use numaflow::mapstream;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;

use numaflow::batchmap::Server;
use numaflow::mapstream;
use numaflow::{batchmap, map};
use numaflow_pb::clients::map::map_client::MapClient;
use tempfile::TempDir;
Expand Down
5 changes: 3 additions & 2 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,15 @@ impl TryFrom<Bytes> for Message {

#[cfg(test)]
mod tests {
use crate::error::Result;
use std::collections::HashMap;

use chrono::TimeZone;
use numaflow_pb::objects::isb::{
Body, Header, Message as ProtoMessage, MessageId, MessageInfo,
};
use std::collections::HashMap;

use super::*;
use crate::error::Result;

#[test]
fn test_offset_display() {
Expand Down
Loading

0 comments on commit 96807ae

Please sign in to comment.