Skip to content

Commit

Permalink
feat: implement entity types and controller
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Stewart <[email protected]>
  • Loading branch information
paralin committed Nov 7, 2018
1 parent e6fcb72 commit 28e48e4
Show file tree
Hide file tree
Showing 12 changed files with 467 additions and 8 deletions.
68 changes: 62 additions & 6 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package entitygraph_controller

import (
"context"
"sync"

"github.com/aperturerobotics/controllerbus/bus"
"github.com/aperturerobotics/controllerbus/controller"
"github.com/aperturerobotics/controllerbus/directive"

"github.com/aperturerobotics/entitygraph"
"github.com/aperturerobotics/entitygraph/entity"
"github.com/aperturerobotics/entitygraph/store"

"github.com/blang/semver"
"github.com/sirupsen/logrus"
)
Expand All @@ -26,6 +32,16 @@ type Controller struct {
bus bus.Bus
// conf is the configuration
conf *Config
// store is the store
store *store.Store

// mtx guards entities
mtx sync.Mutex
// entities is the map of known entities
// we have to store it here so we can emit when the directive is first made
entities map[store.EntityMapKey]entity.Entity
// observer is the map of entity observers
observers map[*entityObserver]func(val entity.Entity)
}

// NewController constructs a new entity graph controller.
Expand All @@ -34,11 +50,15 @@ func NewController(
bus bus.Bus,
conf *Config,
) *Controller {
return &Controller{
le: le,
bus: bus,
conf: conf,
c := &Controller{
le: le,
bus: bus,
conf: conf,
entities: make(map[store.EntityMapKey]entity.Entity),
observers: make(map[*entityObserver]func(val entity.Entity)),
}
c.store = store.NewStore(newStoreHandler(c))
return c
}

// GetControllerInfo returns information about the controller.
Expand All @@ -58,24 +78,60 @@ func (c *Controller) HandleDirective(
ctx context.Context,
inst directive.Instance,
) (directive.Resolver, error) {
dir := inst.GetDirective()
if d, ok := dir.(entitygraph.CollectEntityGraph); ok {
return c.resolveCollectEntityGraph(ctx, inst, d)
}

return nil, nil
}

// resolveCollectEntityGraph resolves a CollectEntityGraph directive
func (c *Controller) resolveCollectEntityGraph(
ctx context.Context,
inst directive.Instance,
d entitygraph.CollectEntityGraph,
) (directive.Resolver, error) {
return newEntityObserver(c), nil
}

// registerObserver registers an observer and returns the initial set
func (c *Controller) registerObserver(obs *entityObserver, cb func(val entity.Entity)) []entity.Entity {
c.mtx.Lock()
defer c.mtx.Unlock()

c.observers[obs] = cb
initialSet := make([]entity.Entity, len(c.entities))
i := 0
for _, ent := range c.entities {
initialSet[i] = ent
i++
}
return initialSet
}

// clearObserver removes an observer
func (c *Controller) clearObserver(obs *entityObserver) {
c.mtx.Lock()
defer c.mtx.Unlock()

delete(c.observers, obs)
}

// Execute executes the given controller.
// Returning nil ends execution.
// Returning an error triggers a retry with backoff.
func (c *Controller) Execute(ctx context.Context) error {
// Register collect entity graph directive
di, diRef, err := c.bus.AddDirective(
NewCollectEntityGraphDirective(),
newReferenceHandler(c),
newReferenceHandler(c, store),
)
if err != nil {
return err
}
defer diRef.Release()

// TODO
_ = di
c.le.Info("entitygraph aggregation controller running")
<-ctx.Done()
Expand Down
73 changes: 73 additions & 0 deletions controller/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package entitygraph_controller

import (
"context"
"sync"

"github.com/aperturerobotics/entitygraph/entity"

"github.com/aperturerobotics/controllerbus/directive"
)

// entityObserver is an entity observer.
type entityObserver struct {
// c is the controller
c *Controller
}

// newEntityObserver constructs a new entityObserver
func newEntityObserver(c *Controller) *entityObserver {
return &entityObserver{c: c}
}

// Resolve resolves the values, emitting them to the handler.
// The resolver may be canceled and restarted multiple times.
// Any fatal error resolving the value is returned.
// The resolver will not be retried after returning an error.
// Values will be maintained from the previous call.
func (e *entityObserver) Resolve(ctx context.Context, handler directive.ResolverHandler) error {
// Register the observer with the system and get the initial set
var valueIDS []uint32
var valueMtx sync.Mutex
var disposed bool
addVal := func(ent entity.Entity) {
if disposed {
return
}

id, ok := handler.AddValue(ent)
if ok {
valueIDS = append(valueIDS, id)
}
}

valueMtx.Lock()
initialSet := e.c.registerObserver(e, func(ent entity.Entity) {
valueMtx.Lock()
defer valueMtx.Unlock()
addVal(ent)
})
for _, is := range initialSet {
addVal(is)
}
valueMtx.Unlock()

defer func() {
valueMtx.Lock()
disposed = true
vids := valueIDS
valueIDS = nil
valueMtx.Unlock()

e.c.clearObserver(e)
for _, valID := range vids {
_, _ = handler.RemoveValue(valID)
}
}()

<-ctx.Err()
return nil
}

// _ is a type assertion
var _ directive.Resolver = ((*entityObserver)(nil))
23 changes: 21 additions & 2 deletions controller/ref_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,55 @@ package entitygraph_controller

import (
"github.com/aperturerobotics/controllerbus/directive"
"github.com/aperturerobotics/entitygraph/entity"
"github.com/aperturerobotics/entitygraph/store"
)

// referenceHandler handles collect events.
type referenceHandler struct {
// c is the controller
c *Controller
// store is the store
store *store.Store
}

// newReferenceHandler constructs a new referenceHandler.
func newReferenceHandler(c *Controller) *referenceHandler {
return &referenceHandler{c: c}
func newReferenceHandler(c *Controller, store *store.Store) *referenceHandler {
return &referenceHandler{c: c, store: store}
}

// HandleValueAdded is called when a value is added to the directive.
func (r *referenceHandler) HandleValueAdded(
inst directive.Instance,
val directive.Value,
) {
valEnt, valEntOk := val.(entity.Entity)
if !valEntOk {
r.c.le.Warn("ignoring non-entity directive value added")
return
}

r.store.AddEntityObj(valEnt)
}

// HandleValueRemoved is called when a value is removed from the directive.
func (r *referenceHandler) HandleValueRemoved(
inst directive.Instance,
val directive.Value,
) {
valEnt, valEntOk := val.(entity.Entity)
if !valEntOk {
r.c.le.Warn("ignoring non-entity directive value removed")
return
}

r.store.RemoveEntityObject(valEnt)
}

// HandleInstanceDisposed is called when a directive instance is disposed.
// This will occur if Close() is called on the directive instance.
func (r *referenceHandler) HandleInstanceDisposed(inst directive.Instance) {
// noop
}

var _ directive.ReferenceHandler = ((*referenceHandler)(nil))
30 changes: 30 additions & 0 deletions controller/store_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package entitygraph_controller

import (
"github.com/aperturerobotics/entitygraph/entity"
"github.com/aperturerobotics/entitygraph/store"
)

// storeHandler handles collect events.
type storeHandler struct {
// c is the controller
c *Controller
}

// newStoreHandler constructs a new storeHandler.
func newStoreHandler(c *Controller) *storeHandler {
return &storeHandler{c: c}
}

// HandleEntityAdded handles a new entity being added to the store.
func (h *storeHandler) HandleEntityAdded(ent entity.Entity) {
// TODO: emit to directive
}

// HandleEntityRemoved handles a entity being removed from the store.
func (h *storeHandler) HandleEntityRemoved(ent entity.Entity) {
// TODO: emit to directive
}

// _ is a type assertion
var _ store.Handler = ((*storeHandler)(nil))
10 changes: 10 additions & 0 deletions directive.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,13 @@ type CollectEntityGraph interface {
// noop
CollectEntityGraphDirective()
}

// ObserveEntityGraph is a directive to observe the entity graph.
type ObserveEntityGraph interface {
// Directive indicates CollectEntityGraph is a directive.
directive.Directive

// ObserveEntityGraphDirective is a marker function.
// noop
ObserveEntityGraphDirective()
}
46 changes: 46 additions & 0 deletions directive_obs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package entitygraph

import (
"github.com/aperturerobotics/controllerbus/directive"
)

// observeEntityGraph implements ObserveEntityGraph
type observeEntityGraph struct{}

// ObserveEntityGraphDirective is a marker function.
func (g *observeEntityGraph) ObserveEntityGraphDirective() {
// noop
}

// NewObserveEntityGraph constructs a new ObserveEntityGraph directive.
func NewObserveEntityGraph() ObserveEntityGraph {
return &observeEntityGraph{}
}

// Validate validates the directive.
// This is a cursory validation to see if the values "look correct."
func (g *observeEntityGraph) Validate() error {
return nil
}

// GetValueOptions returns options relating to value handling.
func (g *observeEntityGraph) GetValueOptions() directive.ValueOptions {
return directive.ValueOptions{}
}

// IsEquivalent checks if the other directive is equivalent. If two
// directives are equivalent, and the new directive does not superceed the
// old, then the new directive will be merged (de-duplicated) into the old.
func (g *observeEntityGraph) IsEquivalent(other directive.Directive) bool {
_, ok := other.(ObserveEntityGraph)
return ok
}

// Superceeds checks if the directive overrides another.
// The other directive will be canceled if superceded.
func (g *observeEntityGraph) Superceeds(other directive.Directive) bool {
return false
}

// _ is a type assertion
var _ ObserveEntityGraph = ((*observeEntityGraph)(nil))
9 changes: 9 additions & 0 deletions entity/entity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package entity

// Entity is a string-ID identified node in the entity graph.
type Entity interface {
// GetEntityID returns the entity identifier.
GetEntityID() string
// GetEntityTypeName returns the entity type name.
GetEntityTypeName() string
}
Empty file added entity/entityref.proto
Empty file.
38 changes: 38 additions & 0 deletions entity/entityref_obj.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package entity

// Ref is a reference to another entity.
type Ref interface {
// GetEntityRefId returns the referenced entity ID.
GetEntityRefId() string
// GetEntityRefTypeName returns the referenced entity type name.
GetEntityRefTypeName() string
}

// entityRef implements Ref
type entityRef struct {
refID string
refTypeName string
}

// NewEntityRef builds a new EntityRef with an entity handle.
func NewEntityRef(ent Entity) Ref {
return NewEntityRefWithID(ent.GetEntityID(), ent.GetEntityTypeName())
}

// NewEntityRefWithID builds a new EntityRef with an id and type name.
func NewEntityRefWithID(refID, refTypeName string) Ref {
return &entityRef{refID: refID, refTypeName: refTypeName}
}

// GetEntityRefId returns the referenced entity ID.
func (e *entityRef) GetEntityRefId() string {
return e.refID
}

// GetEntityRefTypeName returns the referenced entity type name.
func (e *entityRef) GetEntityRefTypeName() string {
return e.refTypeName
}

// _ is a type assertion
var _ Ref = ((*entityRef)(nil))
Loading

0 comments on commit 28e48e4

Please sign in to comment.