Skip to content

Commit

Permalink
test: Unit tests code coverage improvement (#1781)
Browse files Browse the repository at this point in the history
Signed-off-by: Samhith Kakarla <[email protected]>
  • Loading branch information
samhith-kakarla authored Jul 12, 2024
1 parent b6ab99f commit 41e7878
Show file tree
Hide file tree
Showing 25 changed files with 1,684 additions and 55 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ require (
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
7 changes: 4 additions & 3 deletions pkg/isb/stores/jetstream/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
natsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test"
)

Expand All @@ -44,7 +45,7 @@ func TestJetStreamBufferRead(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

defaultJetStreamClient := natstest.JetStreamClient(t, s)
defaultJetStreamClient := natsclient.NewTestClientWithServer(t, s)
defer defaultJetStreamClient.Close()
js, err := defaultJetStreamClient.JetStreamContext()
assert.NoError(t, err)
Expand Down Expand Up @@ -137,7 +138,7 @@ func TestGetName(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

defaultJetStreamClient := natstest.JetStreamClient(t, s)
defaultJetStreamClient := natsclient.NewTestClientWithServer(t, s)
js, err := defaultJetStreamClient.JetStreamContext()
assert.NoError(t, err)
defer defaultJetStreamClient.Close()
Expand All @@ -162,7 +163,7 @@ func TestClose(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

defaultJetStreamClient := natstest.JetStreamClient(t, s)
defaultJetStreamClient := natsclient.NewTestClientWithServer(t, s)
defer defaultJetStreamClient.Close()
js, err := defaultJetStreamClient.JetStreamContext()
assert.NoError(t, err)
Expand Down
12 changes: 7 additions & 5 deletions pkg/isb/stores/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
natsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test"

"github.com/numaproj/numaflow/pkg/udf/forward"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
Expand Down Expand Up @@ -80,7 +82,7 @@ func TestForwarderJetStreamBuffer(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
defaultJetStreamClient := natstest.JetStreamClient(t, s)
defaultJetStreamClient := natsclient.NewTestClientWithServer(t, s)
defer defaultJetStreamClient.Close()
js, err := defaultJetStreamClient.JetStreamContext()
assert.NoError(t, err)
Expand Down Expand Up @@ -208,7 +210,7 @@ func TestJetStreamBufferWriterBufferFull(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

defaultJetStreamClient := natstest.JetStreamClient(t, s)
defaultJetStreamClient := natsclient.NewTestClientWithServer(t, s)
defer defaultJetStreamClient.Close()
js, err := defaultJetStreamClient.JetStreamContext()
assert.NoError(t, err)
Expand Down Expand Up @@ -265,7 +267,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

defaultJetStreamClient := natstest.JetStreamClient(t, s)
defaultJetStreamClient := natsclient.NewTestClientWithServer(t, s)
defer defaultJetStreamClient.Close()
js, err := defaultJetStreamClient.JetStreamContext()
assert.NoError(t, err)
Expand Down Expand Up @@ -321,7 +323,7 @@ func TestWriteGetName(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

defaultJetStreamClient := natstest.JetStreamClient(t, s)
defaultJetStreamClient := natsclient.NewTestClientWithServer(t, s)
defer defaultJetStreamClient.Close()
js, err := defaultJetStreamClient.JetStreamContext()
assert.NoError(t, err)
Expand All @@ -347,7 +349,7 @@ func TestWriteClose(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

defaultJetStreamClient := natstest.JetStreamClient(t, s)
defaultJetStreamClient := natsclient.NewTestClientWithServer(t, s)
defer defaultJetStreamClient.Close()
js, err := defaultJetStreamClient.JetStreamContext()
assert.NoError(t, err)
Expand Down
62 changes: 62 additions & 0 deletions pkg/shared/clients/nats/client_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package nats

import (
"context"
"os"
"testing"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/stretchr/testify/assert"
)

func TestNewClientPool_Success(t *testing.T) {
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user")
os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password")
ctx := context.Background()
pool, err := NewClientPool(ctx)

assert.NoError(t, err)
assert.NotNil(t, pool)
assert.Equal(t, 3, pool.clients.Len()) // Check if the pool size matches the default clientPoolSize
}

func TestClientPool_NextAvailableClient(t *testing.T) {
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user")
os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password")
ctx := context.Background()
pool, err := NewClientPool(ctx)
assert.NoError(t, err)
assert.NotNil(t, pool)

client1 := pool.NextAvailableClient()
assert.NotNil(t, client1)

client2 := pool.NextAvailableClient()
assert.NotNil(t, client2)

client3 := pool.NextAvailableClient()
assert.NotNil(t, client3)
}

func TestClientPool_CloseAll(t *testing.T) {
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user")
os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password")
ctx := context.Background()
pool, err := NewClientPool(ctx)
assert.NoError(t, err)
assert.NotNil(t, pool)

for e := pool.clients.Front(); e != nil; e = e.Next() {
client := e.Value.(*Client)
assert.False(t, client.nc.IsClosed())
}

pool.CloseAll()
for e := pool.clients.Front(); e != nil; e = e.Next() {
client := e.Value.(*Client)
assert.True(t, client.nc.IsClosed())
}
}
6 changes: 6 additions & 0 deletions pkg/shared/clients/nats/nats_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"go.uber.org/zap"

Expand Down Expand Up @@ -181,3 +182,8 @@ func NewTestClient(t *testing.T, url string) *Client {
}
return &Client{nc: nc}
}

// JetStreamClient is used to get a testing JetStream client instance
func NewTestClientWithServer(t *testing.T, s *server.Server) *Client {
return NewTestClient(t, s.ClientURL())
}
127 changes: 127 additions & 0 deletions pkg/shared/clients/nats/nats_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package nats

import (
"context"
"os"
"testing"

"github.com/nats-io/nats.go"
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test"
)

func TestNewNATSClient(t *testing.T) {
// Setting up environment variables for the test
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user")
os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password")
defer os.Clearenv()

log := zap.NewNop().Sugar()

ctx := logging.WithLogger(context.Background(), log)

client, err := NewNATSClient(ctx)
assert.NoError(t, err)
assert.NotNil(t, client)

// Cleanup
client.Close()
}

func TestNewNATSClient_Failure(t *testing.T) {
// Simulating environment variable absence
os.Clearenv()

log := zap.NewNop().Sugar()
ctx := logging.WithLogger(context.Background(), log)

client, err := NewNATSClient(ctx)
assert.Error(t, err)
assert.Nil(t, client)
}

func TestSubscribe(t *testing.T) {
s := natstest.RunJetStreamServer(t)
defer s.Shutdown()

client := NewTestClient(t, s.ClientURL())
defer client.Close()

// Create a stream
js, err := client.nc.JetStream()
assert.NoError(t, err)
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST_STREAM",
Subjects: []string{"test.subject"},
})
assert.NoError(t, err)

// Subscribe to a subject
sub, err := client.Subscribe("test.subject", "TEST_STREAM")
assert.NoError(t, err)
assert.NotNil(t, sub)

// Test failure case: Invalid stream
_, err = client.Subscribe("balh", "INVALID_STREAM")
assert.Error(t, err)
}

func TestBindKVStore(t *testing.T) {
s := natstest.RunJetStreamServer(t)
defer s.Shutdown()

client := NewTestClient(t, s.ClientURL())
defer client.Close()

// Create a KeyValue store
js, err := client.nc.JetStream()
assert.NoError(t, err)
_, err = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "KV_TEST",
})
assert.NoError(t, err)

// Bind to the KeyValue store
kvStore, err := client.BindKVStore("KV_TEST")
assert.NoError(t, err)
assert.NotNil(t, kvStore)

// Test failure case: Invalid KeyValue store
_, err = client.BindKVStore("INVALID_KV")
assert.Error(t, err)
}

func TestJetStreamContext(t *testing.T) {
s := natstest.RunJetStreamServer(t)
defer s.Shutdown()

client := NewTestClient(t, s.ClientURL())
defer client.Close()

jsCtx, err := client.JetStreamContext()
assert.NoError(t, err)
assert.NotNil(t, jsCtx)
}

func TestNewTestClient(t *testing.T) {
s := natstest.RunJetStreamServer(t)
defer s.Shutdown()

client := NewTestClient(t, s.ClientURL())
assert.NotNil(t, client)
defer client.Close()
}

func TestClose(t *testing.T) {
s := natstest.RunJetStreamServer(t)
defer s.Shutdown()

client := NewTestClient(t, s.ClientURL())
assert.NotNil(t, client)
client.Close()
}
33 changes: 33 additions & 0 deletions pkg/shared/clients/nats/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package nats

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDefaultOptions(t *testing.T) {
opts := defaultOptions()
assert.NotNil(t, opts)
assert.Equal(t, 3, opts.clientPoolSize, "default client pool size should be 3")
}

func TestWithClientPoolSize(t *testing.T) {
opts := defaultOptions()
assert.Equal(t, 3, opts.clientPoolSize, "default client pool size should be 3")

option := WithClientPoolSize(10)
option(opts)

assert.Equal(t, 10, opts.clientPoolSize, "client pool size should be set to 10")
}

func TestCombinedOptions(t *testing.T) {
opts := defaultOptions()
assert.Equal(t, 3, opts.clientPoolSize, "default client pool size should be 3")

option1 := WithClientPoolSize(5)
option1(opts)

assert.Equal(t, 5, opts.clientPoolSize, "client pool size should be set to 5")
}
30 changes: 0 additions & 30 deletions pkg/shared/clients/nats/test/client.go

This file was deleted.

Loading

0 comments on commit 41e7878

Please sign in to comment.