Skip to content

Commit

Permalink
Merge branch 'master' into revert/105
Browse files Browse the repository at this point in the history
  • Loading branch information
neoaggelos committed Jul 5, 2024
2 parents 26a86da + 0a4b791 commit d1a6e1d
Show file tree
Hide file tree
Showing 32 changed files with 726 additions and 654 deletions.
3 changes: 2 additions & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
### Thank you for making MicroK8s better
### Thank you for making K8s-dqlite better

Please reference the issue this PR is fixing, or provide a description of the problem addressed.

*Also verify you have:*

* [ ] Read the [contributions](https://github.com/ubuntu/microk8s/blob/master/CONTRIBUTING.md) page.
* [ ] Submitted the [CLA form](https://ubuntu.com/legal/contributors/agreement), if you are a first time contributor.
5 changes: 3 additions & 2 deletions .github/workflows/trivy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ jobs:
strategy:
matrix:
branch: [master]

permissions:
security-events: write
steps:
- name: Checking out repo
uses: actions/checkout@v4
with:
ref: ${{ matrix.branch }}
- name: Run Trivy vulnerability scanner in repo mode
uses: aquasecurity/trivy-action@master
uses: aquasecurity/trivy-action@0.23.0
with:
scan-type: "fs"
ignore-unfixed: true
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ go.vet:
$(DQLITE_BUILD_SCRIPTS_DIR)/static-go-vet.sh ./...

go.test:
go test -tags=libsqlite3 -v ./test
go test -v -p 1 ./...

go.test.dqlite:
$(DQLITE_BUILD_SCRIPTS_DIR)/static-go-test.sh -v ./test
$(DQLITE_BUILD_SCRIPTS_DIR)/static-go-test.sh -v ./...

go.bench:
go test -tags=libsqlite3 -v ./test -run "^$$" -bench "Benchmark" -benchmem
go test -tags=libsqlite3 -v ./... -run "^$$" -bench "Benchmark" -benchmem

go.bench.dqlite:
$(DQLITE_BUILD_SCRIPTS_DIR)/static-go-test.sh -v ./test -run "^$$" -bench "Benchmark" -benchmem
$(DQLITE_BUILD_SCRIPTS_DIR)/static-go-test.sh -v ./... -run "^$$" -bench "Benchmark" -benchmem

## Static Builds
static: bin/static/k8s-dqlite bin/static/dqlite
Expand Down
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Prepare a directory containing:
- `cluster.crt` and `cluster.key` pair.

Here is a script to build you can use:

```
mkdir -p /var/data/
IP="127.0.0.1"
Expand All @@ -52,6 +53,7 @@ chmod -R o-rwX /var/data
```

You are now ready to start `k8s-dqlite` with:

```
k8s-dqlite --storage-dir=/var/data/
```
Expand All @@ -60,13 +62,15 @@ The `--listen` option allows you to set the endpoint where dqlite should listen
By default `k8s-dqlite` will be listening for connections at `tcp://127.0.0.1:12379`.

The snap package takes care of this installation step so you may want to use the snap you build above:

```
sudo snap install ./k8s-dqlite_latest_amd64.snap --classic --dangerous
```

## Configuring the API server

To point the API server to `k8s-dqlite` use the following arguments:

```
# Endpoint k8s-dqlite listens at
--etcd-servers="http://<IP>:<PORT>"
Expand All @@ -80,6 +84,7 @@ To point the API server to `k8s-dqlite` use the following arguments:
#--etcd-certfile=/var/snap/k8s-dqlite/current/var/data/cluster.crt
#--etcd-keyfile=/var/snap/k8s-dqlite/current/var/data/cluster.key
```

## Highly Available Dqlite

The following steps allows one to setup a highly available dqlite backend.
Expand All @@ -99,6 +104,7 @@ Steps:
- <the ip of the main node>:29001
Address: <ip of the joining node>:29001
```
6. Start `k8s-dqlite` process

```shell
Expand Down Expand Up @@ -157,4 +163,9 @@ Steps:
--listen unix:///var/snap/microk8s/current/var/kubernetes/backend/kine.sock:12379
```

7. While developing and making changes to `k8s-dqlite`, just restart k8s-dqlite
7. While developing and making changes to `k8s-dqlite`, just restart k8s-dqlite

Note: When develop k8s-dqlite against Canonical Kubernetes use the following flags:

- `--storage-dir /var/snap/k8s/common/var/lib/k8s-dqlite`
- `--listen unix:///var/snap/k8s/common/var/lib/k8s-dqlite/k8s-dqlite.sock`
5 changes: 5 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,9 @@ func init() {
// TODO(MK-1408): This value is highly dependend on underlying hardware, thus making the default value a bit useless. The linked card will implement a dynamic way to set this value.
rootCmd.Flags().Int64Var(&rootCmdOpts.acpLimitMaxConcurrentTxn, "admission-control-policy-limit-max-concurrent-transactions", 300, "Maximum number of transactions that are allowed to run concurrently. Transactions will not be admitted after the limit is reached.")
rootCmd.Flags().BoolVar(&rootCmdOpts.acpOnlyWriteQueries, "admission-control-only-for-write-queries", false, "If set, admission control will only be applied to write queries.")

rootCmd.AddCommand(&cobra.Command{
Use: "version",
RunE: func(cmd *cobra.Command, args []string) error { return printVersions() },
})
}
22 changes: 6 additions & 16 deletions cmd/version.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//go:build dqlite

package cmd

import (
"fmt"
"runtime"

"github.com/spf13/cobra"
)

/*
Expand Down Expand Up @@ -48,18 +48,8 @@ void print_dqlite_library_versions() {
*/
import "C"

var (
versionCmd = &cobra.Command{
Use: "version",
RunE: func(cmd *cobra.Command, args []string) error {
fmt.Println("go:", runtime.Version())
C.print_dqlite_library_versions()

return nil
},
}
)

func init() {
rootCmd.AddCommand(versionCmd)
func printVersions() error {
fmt.Println("go:", runtime.Version())
C.print_dqlite_library_versions()
return nil
}
9 changes: 9 additions & 0 deletions cmd/version_no_dqlite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//go:build !dqlite

package cmd

import "fmt"

func printVersions() error {
return fmt.Errorf("dqlite is not supported, compile with \"-tags dqlite\"")
}
63 changes: 30 additions & 33 deletions pkg/kine/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,35 @@ type Broadcaster struct {
subs map[chan interface{}]struct{}
}

func (b *Broadcaster) Subscribe(ctx context.Context, connect ConnectFunc) (<-chan interface{}, error) {
func (b *Broadcaster) Subscribe(ctx context.Context) (<-chan interface{}, error) {
b.Lock()
defer b.Unlock()

if !b.running {
if err := b.start(connect); err != nil {
return nil, err
}
}

sub := make(chan interface{}, 100)
if b.subs == nil {
b.subs = map[chan interface{}]struct{}{}
}
b.subs[sub] = struct{}{}
go func() {
<-ctx.Done()
b.unsub(sub, true)
}()
context.AfterFunc(ctx, func() {
b.Lock()
defer b.Unlock()
b.unsub(sub)
})

return sub, nil
}

func (b *Broadcaster) unsub(sub chan interface{}, lock bool) {
if lock {
b.Lock()
}
func (b *Broadcaster) unsub(sub chan interface{}) {
if _, ok := b.subs[sub]; ok {
close(sub)
delete(b.subs, sub)
}
if lock {
b.Unlock()
}
}

func (b *Broadcaster) start(connect ConnectFunc) error {
func (b *Broadcaster) Start(connect ConnectFunc) error {
b.Lock()
defer b.Unlock()

c, err := connect()
if err != nil {
return err
Expand All @@ -60,24 +52,29 @@ func (b *Broadcaster) start(connect ConnectFunc) error {
return nil
}

func (b *Broadcaster) stream(input chan interface{}) {
for item := range input {
b.Lock()
for sub := range b.subs {
select {
case sub <- item:
default:
// Slow consumer, drop
go b.unsub(sub, true)
}
}
b.Unlock()
func (b *Broadcaster) stream(ch chan interface{}) {
for item := range ch {
b.publish(item)
}

b.Lock()
defer b.Unlock()
for sub := range b.subs {
b.unsub(sub, false)
b.unsub(sub)
}
b.running = false
b.Unlock()
}

func (b *Broadcaster) publish(item interface{}) {
b.Lock()
defer b.Unlock()

for sub := range b.subs {
select {
case sub <- item:
default:
// Slow consumer, drop
b.unsub(sub)
}
}
}
12 changes: 9 additions & 3 deletions pkg/kine/drivers/dqlite/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/driver"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/sqlite"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
"github.com/canonical/k8s-dqlite/pkg/kine/tls"
Expand All @@ -26,15 +27,20 @@ func init() {
}

func New(ctx context.Context, datasourceName string, tlsInfo tls.Config) (server.Backend, error) {
backend, _, err := NewVariant(ctx, datasourceName)
return backend, err
}

func NewVariant(ctx context.Context, datasourceName string) (server.Backend, *generic.Generic, error) {
logrus.Printf("New kine for dqlite")

// Driver name will be extracted from query parameters
backend, generic, err := sqlite.NewVariant(ctx, "", datasourceName)
if err != nil {
return nil, errors.Wrap(err, "sqlite client")
return nil, nil, errors.Wrap(err, "sqlite client")
}
if err := migrate(ctx, generic.DB); err != nil {
return nil, errors.Wrap(err, "failed to migrate DB from sqlite")
return nil, nil, errors.Wrap(err, "failed to migrate DB from sqlite")
}
generic.LockWrites = true
generic.Retry = func(err error) bool {
Expand Down Expand Up @@ -74,7 +80,7 @@ func New(ctx context.Context, datasourceName string, tlsInfo tls.Config) (server
return err
}

return backend, nil
return backend, generic, nil
}

func migrate(ctx context.Context, newDB *sql.DB) (exitErr error) {
Expand Down
44 changes: 44 additions & 0 deletions pkg/kine/drivers/dqlite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//go:build dqlite

package drivers_test

import (
"context"
"fmt"
"testing"

"github.com/canonical/go-dqlite/app"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/dqlite"
"github.com/canonical/k8s-dqlite/pkg/kine/drivers/generic"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
)

func TestDqliteCompaction(t *testing.T) {
testCompaction(t, newDqliteBackend)
}

func BenchmarkDqliteCompaction(b *testing.B) {
benchmarkCompaction(b, newDqliteBackend)
}

var (
nextIdx int
)

func newDqliteBackend(ctx context.Context, tb testing.TB) (server.Backend, *generic.Generic, error) {
nextIdx++

dir := tb.TempDir()
app, err := app.New(dir, app.WithAddress(fmt.Sprintf("127.0.0.1:%d", 59090+nextIdx)))
if err != nil {
panic(fmt.Errorf("failed to create dqlite app: %w", err))
}
if err := app.Ready(ctx); err != nil {
panic(fmt.Errorf("failed to initialize dqlite: %w", err))
}
tb.Cleanup(func() {
app.Close()
})

return dqlite.NewVariant(ctx, fmt.Sprintf("dqlite://k8s-%d?driver-name=%s", nextIdx, app.Driver()))
}
Loading

0 comments on commit d1a6e1d

Please sign in to comment.