Skip to content

Commit

Permalink
refactor(xds): split MeshContext in 3 to be able to improve caching (#…
Browse files Browse the repository at this point in the history
…8182)

* refactor(xds): split MeshContext in 3 to be able to improve caching

Currently MeshContext contains everything. Which means that
whenever something changes on the mesh we pull a lot of things from
the DB.

In practice there are 3 types of resources:

- global resources zone proxies, meshes...
- zone resources that rarely change policies, externalServices ...
- the rest which changes often dataplanes

We now have 3 different things built independently. This will enable us
to:

- Not pull down policies when a dataplane changes if we make things
  event based.
- Change cache invalidation speed depending on the type (Global stuff
  could be cached longer or we could decide to propagate policy changes
less quickly to improve caching).
- Write inspect APIs without getting the full mesh context (only the
  policies are useful).

In its current implementation we're not expecting any strong perf
improvements as we don't use the new context independently

Also use fnv128 instead of sha256. The probability of collision
is low and using a non crypto hash will be quicker

Signed-off-by: Charly Molter <[email protected]>
  • Loading branch information
lahabana authored Nov 21, 2023
1 parent 99bc030 commit b249dab
Show file tree
Hide file tree
Showing 65 changed files with 1,458 additions and 402 deletions.
13 changes: 3 additions & 10 deletions pkg/api-server/api_server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import (
"github.com/kumahq/kuma/pkg/test"
"github.com/kumahq/kuma/pkg/test/matchers"
test_runtime "github.com/kumahq/kuma/pkg/test/runtime"
test_store "github.com/kumahq/kuma/pkg/test/store"
"github.com/kumahq/kuma/pkg/tokens/builtin"
util_yaml "github.com/kumahq/kuma/pkg/util/yaml"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
"github.com/kumahq/kuma/pkg/xds/server"
)
Expand Down Expand Up @@ -257,7 +257,7 @@ func tryStartApiServer(t *testApiServerConfigurer) (*api_server.ApiServer, kuma_
resManager,
xds_context.NewMeshContextBuilder(
resManager,
server.MeshResourceTypes(server.HashMeshExcludedResources),
server.MeshResourceTypes(),
net.LookupIP,
cfg.Multizone.Zone.Name,
vips.NewPersistence(resManager, config_manager.NewConfigManager(t.store), false),
Expand Down Expand Up @@ -338,14 +338,7 @@ func apiTest(inputResourceFile string, apiServer *api_server.ApiServer, resource
status, err := strconv.Atoi(actions[1])
Expect(err).NotTo(HaveOccurred(), "status is not an int")

rawResources := util_yaml.SplitYAML(string(inputs))
Expect(rawResources).ToNot(BeEmpty())
for i, rawResource := range rawResources {
resource, err := rest.YAML.UnmarshalCore([]byte(rawResource))
Expect(err).ToNot(HaveOccurred())
err = resourceStore.Create(context.Background(), resource, store.CreateByKey(resource.GetMeta().GetName(), resource.GetMeta().GetMesh()))
Expect(err).NotTo(HaveOccurred(), "failed with resource %d %v", i, resource.GetMeta())
}
Expect(test_store.LoadResources(context.Background(), resourceStore, string(inputs))).To(Succeed())

req, err := http.NewRequest("GET", url, nil)
Expect(err).NotTo(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion pkg/api-server/customization/customization_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func createTestApiServer(store store.ResourceStore, config *config_api_server.Ap
resManager,
xds_context.NewMeshContextBuilder(
resManager,
server.MeshResourceTypes(server.HashMeshExcludedResources),
server.MeshResourceTypes(),
net.LookupIP,
cfg.Multizone.Zone.Name,
vips.NewPersistence(resManager, config_manager.NewConfigManager(store), false),
Expand Down
2 changes: 1 addition & 1 deletion pkg/api-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func SetupServer(rt runtime.Runtime) error {
rt.ResourceManager(),
xds_context.NewMeshContextBuilder(
rt.ResourceManager(),
server.MeshResourceTypes(server.HashMeshExcludedResources),
server.MeshResourceTypes(),
net.LookupIP,
cfg.Multizone.Zone.Name,
vips.NewPersistence(rt.ResourceManager(), rt.ConfigManager(), cfg.Experimental.UseTagFirstVirtualOutboundModel),
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func initializeMeshCache(builder *core_runtime.Builder) error {
}
meshContextBuilder := xds_context.NewMeshContextBuilder(
builder.ReadOnlyResourceManager(),
xds_server.MeshResourceTypes(xds_server.HashMeshExcludedResources),
xds_server.MeshResourceTypes(),
builder.LookupIP(),
builder.Config().Multizone.Zone.Name,
vips.NewPersistence(builder.ReadOnlyResourceManager(), builder.ConfigManager(), builder.Config().Experimental.UseTagFirstVirtualOutboundModel),
Expand Down
10 changes: 10 additions & 0 deletions pkg/core/resources/apis/mesh/dataplane_helpers.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package mesh

import (
"hash/fnv"
"net"
"strconv"
"strings"

"google.golang.org/protobuf/proto"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/model"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
)

Expand Down Expand Up @@ -221,3 +223,11 @@ func (d *DataplaneResource) AdminPort(defaultAdminPort uint32) uint32 {
}
return defaultAdminPort
}

func (d *DataplaneResource) Hash() []byte {
hasher := fnv.New128a()
_, _ = hasher.Write(model.HashMeta(d))
_, _ = hasher.Write([]byte(d.Spec.GetNetworking().GetAddress()))
_, _ = hasher.Write([]byte(d.Spec.GetNetworking().GetAdvertisedAddress()))
return hasher.Sum(nil)
}
32 changes: 6 additions & 26 deletions pkg/core/resources/apis/mesh/zone_egress_helpers.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package mesh

import (
"hash/fnv"
"net"
"strconv"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/model"
)

Expand Down Expand Up @@ -45,29 +45,9 @@ func (r *ZoneEgressResource) AdminAddress(defaultAdminPort uint32) string {
return net.JoinHostPort(ip, strconv.FormatUint(uint64(adminPort), 10))
}

func NewZoneEgressOverviews(zoneEgresses ZoneEgressResourceList, insights ZoneEgressInsightResourceList) ZoneEgressOverviewResourceList {
insightsByKey := map[model.ResourceKey]*ZoneEgressInsightResource{}
for _, insight := range insights.Items {
insightsByKey[model.MetaToResourceKey(insight.Meta)] = insight
}

var items []*ZoneEgressOverviewResource
for _, zoneEgress := range zoneEgresses.Items {
overview := ZoneEgressOverviewResource{
Meta: zoneEgress.Meta,
Spec: &mesh_proto.ZoneEgressOverview{
ZoneEgress: zoneEgress.Spec,
ZoneEgressInsight: nil,
},
}
insight, exists := insightsByKey[model.MetaToResourceKey(overview.Meta)]
if exists {
overview.Spec.ZoneEgressInsight = insight.Spec
}
items = append(items, &overview)
}
return ZoneEgressOverviewResourceList{
Pagination: zoneEgresses.Pagination,
Items: items,
}
func (r *ZoneEgressResource) Hash() []byte {
hasher := fnv.New128a()
_, _ = hasher.Write(model.HashMeta(r))
_, _ = hasher.Write([]byte(r.Spec.GetNetworking().GetAddress()))
return hasher.Sum(nil)
}
33 changes: 7 additions & 26 deletions pkg/core/resources/apis/mesh/zone_ingress_helpers.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package mesh

import (
"hash/fnv"
"net"
"strconv"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/model"
)

Expand Down Expand Up @@ -47,29 +47,10 @@ func (r *ZoneIngressResource) AdminAddress(defaultAdminPort uint32) string {
return net.JoinHostPort(ip, strconv.FormatUint(uint64(adminPort), 10))
}

func NewZoneIngressOverviews(zoneIngresses ZoneIngressResourceList, insights ZoneIngressInsightResourceList) ZoneIngressOverviewResourceList {
insightsByKey := map[model.ResourceKey]*ZoneIngressInsightResource{}
for _, insight := range insights.Items {
insightsByKey[model.MetaToResourceKey(insight.Meta)] = insight
}

var items []*ZoneIngressOverviewResource
for _, zoneIngress := range zoneIngresses.Items {
overview := ZoneIngressOverviewResource{
Meta: zoneIngress.Meta,
Spec: &mesh_proto.ZoneIngressOverview{
ZoneIngress: zoneIngress.Spec,
ZoneIngressInsight: nil,
},
}
insight, exists := insightsByKey[model.MetaToResourceKey(overview.Meta)]
if exists {
overview.Spec.ZoneIngressInsight = insight.Spec
}
items = append(items, &overview)
}
return ZoneIngressOverviewResourceList{
Pagination: zoneIngresses.Pagination,
Items: items,
}
func (r *ZoneIngressResource) Hash() []byte {
hasher := fnv.New128a()
_, _ = hasher.Write(model.HashMeta(r))
_, _ = hasher.Write([]byte(r.Spec.GetNetworking().GetAddress()))
_, _ = hasher.Write([]byte(r.Spec.GetNetworking().GetAdvertisedAddress()))
return hasher.Sum(nil)
}
45 changes: 45 additions & 0 deletions pkg/core/resources/model/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package model

import (
"fmt"
"hash/fnv"
"reflect"
"strings"
"time"
Expand Down Expand Up @@ -71,6 +72,27 @@ type Resource interface {
Descriptor() ResourceTypeDescriptor
}

type ResourceHasher interface {
Hash() []byte
}

func Hash(resource Resource) []byte {
if r, ok := resource.(ResourceHasher); ok {
return r.Hash()
}
return HashMeta(resource)
}

func HashMeta(r Resource) []byte {
meta := r.GetMeta()
hasher := fnv.New128a()
_, _ = hasher.Write([]byte(r.Descriptor().Name))
_, _ = hasher.Write([]byte(meta.GetMesh()))
_, _ = hasher.Write([]byte(meta.GetName()))
_, _ = hasher.Write([]byte(meta.GetVersion()))
return hasher.Sum(nil)
}

type ResourceValidator interface {
Validate() error
}
Expand Down Expand Up @@ -374,6 +396,29 @@ func ResourceListToResourceKeys(rl ResourceList) []ResourceKey {
return rkey
}

func ResourceListByMesh(rl ResourceList) (map[string]ResourceList, error) {
res := map[string]ResourceList{}
for _, r := range rl.GetItems() {
mrl, ok := res[r.GetMeta().GetMesh()]
if !ok {
mrl = r.Descriptor().NewList()
res[r.GetMeta().GetMesh()] = mrl
}
if err := mrl.AddItem(r); err != nil {
return nil, err
}
}
return res, nil
}

func ResourceListHash(rl ResourceList) []byte {
hasher := fnv.New128()
for _, entity := range rl.GetItems() {
_, _ = hasher.Write(Hash(entity))
}
return hasher.Sum(nil)
}

type ResourceList interface {
GetItemType() ResourceType
GetItems() []Resource
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugins/policies/core/matchers/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ func MatchedPolicies(rType core_model.ResourceType, dpp *core_mesh.DataplaneReso

fr, err := core_rules.BuildFromRules(matchedPoliciesByInbound)
if err != nil {
return core_xds.TypedMatchingPolicies{}, err
warnings = append(warnings, fmt.Sprintf("couldn't create From rules: %s", err.Error()))
}

tr, err := core_rules.BuildToRules(dpPolicies, resources.ListOrEmpty(meshhttproute_api.MeshHTTPRouteType).GetItems())
if err != nil {
return core_xds.TypedMatchingPolicies{}, err
warnings = append(warnings, fmt.Sprintf("couldn't create To rules: %s", err.Error()))
}

sr, err := core_rules.BuildSingleItemRules(dpPolicies)
if err != nil {
return core_xds.TypedMatchingPolicies{}, err
warnings = append(warnings, fmt.Sprintf("couldn't create top level rules: %s", err.Error()))
}

return core_xds.TypedMatchingPolicies{
Expand Down
5 changes: 5 additions & 0 deletions pkg/plugins/resources/memory/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/pkg/errors"

core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/registry"
"github.com/kumahq/kuma/pkg/core/resources/store"
Expand Down Expand Up @@ -102,6 +104,9 @@ func (c *memoryStore) Create(_ context.Context, r core_model.Resource, fs ...sto

opts := store.NewCreateOptions(fs...)
// Name must be provided via CreateOptions
if opts.Name == "" && opts.Mesh == "" {
return errors.New("you must pass store.CreateBy or store.CreateByKey as a parameter")
}
if _, record := c.findRecord(string(r.Descriptor().Name), opts.Name, opts.Mesh); record != nil {
return store.ErrorResourceAlreadyExists(r.Descriptor().Name, opts.Name, opts.Mesh)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/runtime/gateway/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func MakeGeneratorContext(rt runtime.Runtime, key core_model.ResourceKey) (*xds_

meshCtxBuilder := xds_context.NewMeshContextBuilder(
rt.ReadOnlyResourceManager(),
server.MeshResourceTypes(server.HashMeshExcludedResources),
server.MeshResourceTypes(),
rt.LookupIP(),
rt.Config().Multizone.Zone.Name,
vips.NewPersistence(rt.ReadOnlyResourceManager(), rt.ConfigManager(), false),
Expand Down
4 changes: 3 additions & 1 deletion pkg/test/ginkgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ func EntriesForFolder(folder string) []ginkgo.TableEntry {
var entries []ginkgo.TableEntry
testDir := path.Join("testdata", folder)
files, err := os.ReadDir(testDir)
gomega.Expect(err).ToNot(gomega.HaveOccurred())
if err != nil {
panic(err)
}
for _, f := range files {
if !f.IsDir() && strings.HasSuffix(f.Name(), ".input.yaml") {
input := path.Join(testDir, f.Name())
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func newResourceManager(builder *core_runtime.Builder) core_manager.Customizable
func initializeMeshCache(builder *core_runtime.Builder) error {
meshContextBuilder := xds_context.NewMeshContextBuilder(
builder.ReadOnlyResourceManager(),
xds_server.MeshResourceTypes(xds_server.HashMeshExcludedResources),
xds_server.MeshResourceTypes(),
builder.LookupIP(),
builder.Config().Multizone.Zone.Name,
vips.NewPersistence(builder.ReadOnlyResourceManager(), builder.ConfigManager(), builder.Config().Experimental.UseTagFirstVirtualOutboundModel),
Expand Down
49 changes: 49 additions & 0 deletions pkg/test/store/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package store

import (
"context"
"os"

"github.com/pkg/errors"

"github.com/kumahq/kuma/pkg/core/resources/model/rest"
"github.com/kumahq/kuma/pkg/core/resources/store"
util_yaml "github.com/kumahq/kuma/pkg/util/yaml"
)

func LoadResourcesFromFile(ctx context.Context, rs store.ResourceStore, fileName string) error {
d, err := os.ReadFile(fileName)
if err != nil {
return err
}
return LoadResources(ctx, rs, string(d))
}

func LoadResources(ctx context.Context, rs store.ResourceStore, inputs string) error {
rawResources := util_yaml.SplitYAML(inputs)
for i, rawResource := range rawResources {
resource, err := rest.YAML.UnmarshalCore([]byte(rawResource))
if err != nil {
return errors.Wrapf(err, "failed to parse yaml %d", i)
}
curResource := resource.Descriptor().NewObject()
create := false
if err := rs.Get(ctx, curResource, store.GetByKey(resource.GetMeta().GetName(), resource.GetMeta().GetMesh())); err != nil {
if !store.IsResourceNotFound(err) {
return err
}
create = true
}

if create {
err = rs.Create(ctx, resource, store.CreateByKey(resource.GetMeta().GetName(), resource.GetMeta().GetMesh()))
} else {
_ = curResource.SetSpec(resource.GetSpec())
err = rs.Update(ctx, curResource)
}
if err != nil {
return errors.Wrapf(err, "failed with entry %d meta: %s", i, resource.GetMeta())
}
}
return nil
}
Loading

0 comments on commit b249dab

Please sign in to comment.