Skip to content

Commit

Permalink
Starting example rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimirvivien committed Jan 26, 2025
1 parent 7428643 commit 52c7cb0
Show file tree
Hide file tree
Showing 30 changed files with 55 additions and 30 deletions.
6 changes: 3 additions & 3 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
Automi Examples
===============

* [Batch operators](./batch) - examples of the different automi operators.
* [Collectors](./collectors) - examples of different collector components.
* [Custom type](./csutomtype) - shows how to stream values of custom types.
* [Emitters](./emitters) - examples of different emitter compoennts.
* [Error handling](./error) - examples showing how to setup error handling.
* [gRPC streaming](./grpc) - examples using automi to stream from gPRC streaming services.
* [Logging](./logging) - examples of logging stream events at runtime.
* [MD5](./md5) - implementation of the MD5 example from Sameer Ajmani blog on [Concurrency Pattern](https://blog.golang.org/pipelines)
* [Network](/.net) - examples showing streaming data from TCP sockets and HTTP requests.
* [Sinks](./sinks) - examples of built-in sink components.
* [Sources](./sources) - examples of built-in source compoennts.
* [Wordcount](./wordcount) - a simple wordcount example using Automi opertors.
* [Window operators](./windowing) - Window and aggregation examples.
79 changes: 52 additions & 27 deletions examples/customtype/process.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/vladimirvivien/automi/operators/exec"
"github.com/vladimirvivien/automi/sinks"
"github.com/vladimirvivien/automi/sources"
"github.com/vladimirvivien/automi/stream"
)

Expand All @@ -22,35 +26,56 @@ type scientist struct {
// - filter based on selected value
// - write out result to a file
func main() {
stream := stream.New("./data.txt")

// Map csv row to type scientist
stream.Map(func(cs []string) scientist {
yr, _ := strconv.Atoi(cs[3])
return scientist{
FirstName: cs[1],
LastName: cs[0],
Title: cs[2],
BornYear: yr,
}
})

// Filters out scientst born after 1930
stream.Filter(func(cs scientist) bool {
return (cs.BornYear > 1930)
})

// remap value of type scientst to []string
stream.Map(func(cs scientist) []string {
return []string{cs.FirstName, cs.LastName, cs.Title}
})

// stream []string to sink out
stream.Into("./result.txt")

if err := <-stream.Open(); err != nil {
// prepare data source
src, err := os.Open("./data.txt")
if err != nil {
fmt.Println("Unable to open source:", err)
os.Exit(1)
}
defer src.Close()
source := sources.CSV(src)

// prepare data
snk, err := os.Create("./result.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer snk.Close()
sink := sinks.CSV(snk)

// start stream definition
stream := stream.From(source)

stream.Flow(
// map csv row to struct scientist
exec.Map(func(ctx context.Context, cs []string) scientist {
yr, _ := strconv.Atoi(cs[3])
return scientist{
FirstName: cs[1],
LastName: cs[0],
Title: cs[2],
BornYear: yr,
}
}),

// apply data filter
exec.Filter(func(ctx context.Context, cs scientist) bool {
return (cs.BornYear > 1930)
}),

// remap value of type scientst to []string
exec.Map(func(ctx context.Context, cs scientist) []string {
return []string{cs.FirstName, cs.LastName, cs.Title}
}),
)
// stream result into sink
stream.Into(sink)

if err := <-stream.Open(context.TODO()); err != nil {
fmt.Println(err)
os.Exit(1)
}

fmt.Println("wrote result to file result.txt")
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 52c7cb0

Please sign in to comment.