Skip to content

Commit

Permalink
Merge pull request #1937 from keboola/k6-stream-benchmark
Browse files Browse the repository at this point in the history
feat: Add benchmark of stream API using k6 provisioning to k8s
  • Loading branch information
jachym-tousek-keboola authored Sep 4, 2024
2 parents 1cfc89b + 6d873a7 commit 406df2c
Show file tree
Hide file tree
Showing 81 changed files with 1,166 additions and 535 deletions.
9 changes: 9 additions & 0 deletions cmd/stream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"
"os"
"syscall"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -104,6 +105,14 @@ func run(ctx context.Context, cfg config.Config, posArgs []string) error {
return err
}

// Check max opened files limit
var limit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit); err != nil {
logger.Warnf(ctx, `cannot get opened file descriptors limit value: %s`, err)
} else if limit.Cur < 10000 {
logger.Warnf(ctx, `opened file descriptors limit is too small: %d`, limit.Cur)
}

// Create dependencies
d, err := dependencies.NewServiceScope(ctx, cfg, proc, logger, tel, os.Stdout, os.Stderr) //nolint:forbidigo
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ services:
volumes:
- ./:/code:z
- cache:/tmp/cache
ulimits:
nofile:
soft: 50000
hard: 50000
environment:
# For all
- TEST_KBC_PROJECTS_FILE=/code/projects.json
Expand All @@ -24,7 +28,6 @@ services:
# For Stream Service
- STREAM_NODE_ID=my-node
- STREAM_HOSTNAME=localhost
- STREAM_API_STORAGE_API_HOST=connection.keboola.com
- STREAM_API_LISTEN=0.0.0.0:8001
- STREAM_API_PUBLIC_URL=http://localhost:8001
- STREAM_SOURCE_HTTP_LISTEN=0.0.0.0:8010
Expand Down Expand Up @@ -103,6 +106,10 @@ services:
- ./scripts:/scripts
image: grafana/k6
network_mode: host
ulimits:
nofile:
soft: 50000
hard: 50000
environment:
- API_USE_HTTPS
- API_HOST
Expand Down
2 changes: 1 addition & 1 deletion docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ API development uses [Goa code generator](https://goa.design/).

To run the API locally:
1. Start `bash` in the dev container, run `docker compose run --rm -u "$UID:$GID" --service-ports dev bash`.
2. Set env var with Keboola stack: `export TEMPLATES_STORAGE_API_HOST=connection.keboola.com` (or `STREAM_API_STORAGE_API_HOST`)
2. Set env var with Keboola stack: `export TEMPLATES_STORAGE_API_HOST=connection.keboola.com` (or `STREAM_STORAGE_API_HOST`)
3. In the container run `make run-templates-api` (or `make run-stream-service`)
- The API is exposed to `http://localhost:8000/` (or `http://localhost:8001/`
- When the code changes, the API recompiles and restarts.
Expand Down
10 changes: 6 additions & 4 deletions docs/stream/benchmarks.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
```sh
export STREAM_COMPONENTS="api http-source storage-writer storage-reader storage-coordinator"
export STREAM_ETCD_NAMESPACE=stream-bench-001
export STREAM_DEBUG_LOG=false
export STREAM_NODE_ID=my-node
export STREAM_HOSTNAME=localhost
export STREAM_API_STORAGE_API_HOST=connection.keboola.com
export STREAM_STORAGE_API_HOST=connection.keboola.com
export STREAM_API_LISTEN=0.0.0.0:10000
export STREAM_API_PUBLIC_URL=http://localhost:10000
export STREAM_SOURCE_HTTP_LISTEN=0.0.0.0:10001
Expand All @@ -17,23 +18,24 @@ export STREAM_PPROF_ENABLED=false
export STREAM_PPROF_LISTEN="0.0.0.0:4000"
mkdir -p "$STREAM_STORAGE_VOLUMES_PATH/hdd/001"
docker compose run \
--rm \
-u "$UID:$GID" \
-p 4000:4000 \
-p 10000:10000 \
-p 10001:10001 \
-v "$STREAM_STORAGE_VOLUMES_PATH:$STREAM_STORAGE_VOLUMES_PATH" \
-e STREAM_ETCD_NAMESPACE \
-e STREAM_DEBUG_LOG \
-e STREAM_NODE_ID \
-e STREAM_HOSTNAME \
-e STREAM_API_STORAGE_API_HOST \
-e STREAM_STORAGE_API_HOST \
-e STREAM_API_LISTEN \
-e STREAM_API_PUBLIC_URL \
-e STREAM_SOURCE_HTTP_LISTEN \
-e STREAM_SOURCE_HTTP_PUBLIC_URL \
-e STREAM_STORAGE_VOLUMES_PATH \
-e STREAM_PPROF_ENABLED \
-e STREAM_PPROF_LISTEN \
--rm \
dev bash -c "go run ./cmd/stream/main.go -- $STREAM_COMPONENTS | jl"
```

Expand All @@ -51,7 +53,7 @@ dev bash -c "go run ./cmd/stream/main.go -- $STREAM_COMPONENTS | jl"
```sh
export API_TOKEN=<token>
export API_HOST=$STREAM_API_PUBLIC_URL
docker compose run -u "$UID:$GID" k6 run /scripts/k6/stream-api/<name>
docker compose run --rm -u "$UID:$GID" k6 run /scripts/k6/stream-api/<name>
```

Where `<name>` is one of the following benchmark names:
Expand Down
36 changes: 36 additions & 0 deletions internal/pkg/service/common/etcdop/watch_mirror_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type MirrorMap[T any, K comparable, V any] struct {
onUpdate []func(update MirrorUpdate)
onChanges []func(changes MirrorUpdateChanges[K, V])

updatedLock sync.RWMutex
updated chan struct{}

mapLock sync.RWMutex
mapData map[K]V
revisionLock sync.RWMutex
Expand Down Expand Up @@ -64,6 +67,7 @@ func (s MirrorMapSetup[T, K, V]) BuildMirror() *MirrorMap[T, K, V] {
mapData: make(map[K]V),
onUpdate: s.onUpdate,
onChanges: s.onChanges,
updated: make(chan struct{}),
}
}

Expand Down Expand Up @@ -137,8 +141,15 @@ func (m *MirrorMap[T, K, V]) StartMirroring(ctx context.Context, wg *sync.WaitGr
m.revisionLock.Lock()
m.revision = header.Revision
m.revisionLock.Unlock()

logger.Debugf(ctx, `watch stream mirror synced to revision %d`, header.Revision)

// Unblock WaitForRevision loops
m.updatedLock.Lock()
close(m.updated)
m.updated = make(chan struct{})
m.updatedLock.Unlock()

// Call callbacks
for _, fn := range m.onUpdate {
go fn(update)
Expand Down Expand Up @@ -184,6 +195,31 @@ func (m *MirrorMap[T, K, V]) Revision() int64 {
return m.revision
}

func (m *MirrorMap[T, K, V]) WaitForRevision(ctx context.Context, expected int64) error {
for {
m.revisionLock.RLock()
actual := m.revision
m.revisionLock.RUnlock()

// Is the condition already met?
if actual >= expected {
return nil
}

// Get update notifier
m.updatedLock.RLock()
notifier := m.updated
m.updatedLock.RUnlock()

select {
case <-ctx.Done():
return ctx.Err()
case <-notifier:
// try again
}
}
}

func (m *MirrorMap[T, K, V]) Len() int {
m.mapLock.RLock()
defer m.mapLock.RUnlock()
Expand Down
37 changes: 28 additions & 9 deletions internal/pkg/service/common/etcdop/watch_mirror_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
func TestMirrorMap(t *testing.T) {
t.Parallel()

wg := &sync.WaitGroup{}
defer wg.Wait()

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

wg := &sync.WaitGroup{}
defer wg.Wait()

// Create a typed prefix with some keys
client := etcdhelper.ClientForTest(t, etcdhelper.TmpNamespace(t))
pfx := NewTypedPrefix[testUser]("my/prefix", serde.NewJSON(serde.NoValidation))
Expand Down Expand Up @@ -103,17 +103,36 @@ func TestMirrorMap(t *testing.T) {
"Jacob Brown": 15,
"Luke Blue": 30,
}, mirror.CloneMap())

// WaitForRevision - in the past
assert.NoError(t, mirror.WaitForRevision(ctx, header.Revision-1))
assert.NoError(t, mirror.WaitForRevision(ctx, header.Revision))

// WaitForRevision - in the future
revInFuture := header.Revision + 1
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, mirror.WaitForRevision(ctx, revInFuture))
assert.Equal(t, map[testUserFullName]int{
"Jacob Brown": 16, // <<<<<<<<
"Luke Blue": 30,
}, mirror.CloneMap())
}()
time.Sleep(50 * time.Millisecond)
header, err = pfx.Key("key1").Put(client, testUser{FirstName: "Jacob", LastName: "Brown", Age: 16}).Do(ctx).HeaderOrErr()
require.NoError(t, err)
}

func TestMirror_WithOnUpdate(t *testing.T) {
t.Parallel()

wg := &sync.WaitGroup{}
defer wg.Wait()

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

wg := &sync.WaitGroup{}
defer wg.Wait()

// Create a typed prefix with some keys
client := etcdhelper.ClientForTest(t, etcdhelper.TmpNamespace(t))
pfx := NewTypedPrefix[testUser]("my/prefix", serde.NewJSON(serde.NoValidation))
Expand Down Expand Up @@ -199,12 +218,12 @@ func TestMirror_WithOnUpdate(t *testing.T) {
func TestMirrorMap_WithOnChanges(t *testing.T) {
t.Parallel()

wg := &sync.WaitGroup{}
defer wg.Wait()

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

wg := &sync.WaitGroup{}
defer wg.Wait()

// Create a typed prefix with some keys
client := etcdhelper.ClientForTest(t, etcdhelper.TmpNamespace(t))
pfx := NewTypedPrefix[testUser]("my/prefix", serde.NewJSON(serde.NoValidation))
Expand Down
36 changes: 36 additions & 0 deletions internal/pkg/service/common/etcdop/watch_mirror_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type MirrorTree[T any, V any] struct {
onUpdate []func(update MirrorUpdate)
onChanges []func(changes MirrorUpdateChanges[string, V])

updatedLock sync.RWMutex
updated chan struct{}

tree *prefixtree.AtomicTree[V]
revisionLock sync.RWMutex
revision int64
Expand Down Expand Up @@ -78,6 +81,7 @@ func (s MirrorTreeSetup[T, V]) BuildMirror() *MirrorTree[T, V] {
tree: prefixtree.New[V](),
onUpdate: s.onUpdate,
onChanges: s.onChanges,
updated: make(chan struct{}),
}
}

Expand Down Expand Up @@ -147,7 +151,14 @@ func (m *MirrorTree[T, V]) StartMirroring(ctx context.Context, wg *sync.WaitGrou
m.revisionLock.Lock()
m.revision = header.Revision
m.revisionLock.Unlock()

logger.Debugf(ctx, `watch stream mirror synced to revision %d`, header.Revision)

// Unblock WaitForRevision loops
m.updatedLock.Lock()
close(m.updated)
m.updated = make(chan struct{})
m.updatedLock.Unlock()
})

// Call callbacks
Expand Down Expand Up @@ -195,6 +206,31 @@ func (m *MirrorTree[T, V]) Revision() int64 {
return m.revision
}

func (m *MirrorTree[T, V]) WaitForRevision(ctx context.Context, expected int64) error {
for {
m.revisionLock.RLock()
actual := m.revision
m.revisionLock.RUnlock()

// Is the condition already met?
if actual >= expected {
return nil
}

// Get update notifier
m.updatedLock.RLock()
notifier := m.updated
m.updatedLock.RUnlock()

select {
case <-ctx.Done():
return ctx.Err()
case <-notifier:
// try again
}
}
}

func (m *MirrorTree[T, V]) Atomic(do func(t prefixtree.TreeReadOnly[V])) {
m.tree.AtomicReadOnly(do)
}
Expand Down
Loading

0 comments on commit 406df2c

Please sign in to comment.