diff --git a/controller/controller.go b/controller/controller.go index b71ad2f..e270b54 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -2,10 +2,16 @@ package entitygraph_controller import ( "context" + "sync" "github.com/aperturerobotics/controllerbus/bus" "github.com/aperturerobotics/controllerbus/controller" "github.com/aperturerobotics/controllerbus/directive" + + "github.com/aperturerobotics/entitygraph" + "github.com/aperturerobotics/entitygraph/entity" + "github.com/aperturerobotics/entitygraph/store" + "github.com/blang/semver" "github.com/sirupsen/logrus" ) @@ -26,6 +32,16 @@ type Controller struct { bus bus.Bus // conf is the configuration conf *Config + // store is the store + store *store.Store + + // mtx guards entities + mtx sync.Mutex + // entities is the map of known entities + // we have to store it here so we can emit when the directive is first made + entities map[store.EntityMapKey]entity.Entity + // observer is the map of entity observers + observers map[*entityObserver]func(val entity.Entity) } // NewController constructs a new entity graph controller. @@ -34,11 +50,15 @@ func NewController( bus bus.Bus, conf *Config, ) *Controller { - return &Controller{ - le: le, - bus: bus, - conf: conf, + c := &Controller{ + le: le, + bus: bus, + conf: conf, + entities: make(map[store.EntityMapKey]entity.Entity), + observers: make(map[*entityObserver]func(val entity.Entity)), } + c.store = store.NewStore(newStoreHandler(c)) + return c } // GetControllerInfo returns information about the controller. @@ -58,9 +78,46 @@ func (c *Controller) HandleDirective( ctx context.Context, inst directive.Instance, ) (directive.Resolver, error) { + dir := inst.GetDirective() + if d, ok := dir.(entitygraph.CollectEntityGraph); ok { + return c.resolveCollectEntityGraph(ctx, inst, d) + } + return nil, nil } +// resolveCollectEntityGraph resolves a CollectEntityGraph directive +func (c *Controller) resolveCollectEntityGraph( + ctx context.Context, + inst directive.Instance, + d entitygraph.CollectEntityGraph, +) (directive.Resolver, error) { + return newEntityObserver(c), nil +} + +// registerObserver registers an observer and returns the initial set +func (c *Controller) registerObserver(obs *entityObserver, cb func(val entity.Entity)) []entity.Entity { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.observers[obs] = cb + initialSet := make([]entity.Entity, len(c.entities)) + i := 0 + for _, ent := range c.entities { + initialSet[i] = ent + i++ + } + return initialSet +} + +// clearObserver removes an observer +func (c *Controller) clearObserver(obs *entityObserver) { + c.mtx.Lock() + defer c.mtx.Unlock() + + delete(c.observers, obs) +} + // Execute executes the given controller. // Returning nil ends execution. // Returning an error triggers a retry with backoff. @@ -68,14 +125,13 @@ func (c *Controller) Execute(ctx context.Context) error { // Register collect entity graph directive di, diRef, err := c.bus.AddDirective( NewCollectEntityGraphDirective(), - newReferenceHandler(c), + newReferenceHandler(c, store), ) if err != nil { return err } defer diRef.Release() - // TODO _ = di c.le.Info("entitygraph aggregation controller running") <-ctx.Done() diff --git a/controller/observer.go b/controller/observer.go new file mode 100644 index 0000000..93f45ee --- /dev/null +++ b/controller/observer.go @@ -0,0 +1,73 @@ +package entitygraph_controller + +import ( + "context" + "sync" + + "github.com/aperturerobotics/entitygraph/entity" + + "github.com/aperturerobotics/controllerbus/directive" +) + +// entityObserver is an entity observer. +type entityObserver struct { + // c is the controller + c *Controller +} + +// newEntityObserver constructs a new entityObserver +func newEntityObserver(c *Controller) *entityObserver { + return &entityObserver{c: c} +} + +// Resolve resolves the values, emitting them to the handler. +// The resolver may be canceled and restarted multiple times. +// Any fatal error resolving the value is returned. +// The resolver will not be retried after returning an error. +// Values will be maintained from the previous call. +func (e *entityObserver) Resolve(ctx context.Context, handler directive.ResolverHandler) error { + // Register the observer with the system and get the initial set + var valueIDS []uint32 + var valueMtx sync.Mutex + var disposed bool + addVal := func(ent entity.Entity) { + if disposed { + return + } + + id, ok := handler.AddValue(ent) + if ok { + valueIDS = append(valueIDS, id) + } + } + + valueMtx.Lock() + initialSet := e.c.registerObserver(e, func(ent entity.Entity) { + valueMtx.Lock() + defer valueMtx.Unlock() + addVal(ent) + }) + for _, is := range initialSet { + addVal(is) + } + valueMtx.Unlock() + + defer func() { + valueMtx.Lock() + disposed = true + vids := valueIDS + valueIDS = nil + valueMtx.Unlock() + + e.c.clearObserver(e) + for _, valID := range vids { + _, _ = handler.RemoveValue(valID) + } + }() + + <-ctx.Err() + return nil +} + +// _ is a type assertion +var _ directive.Resolver = ((*entityObserver)(nil)) diff --git a/controller/ref_handler.go b/controller/ref_handler.go index fbc0589..d7553f6 100644 --- a/controller/ref_handler.go +++ b/controller/ref_handler.go @@ -2,17 +2,21 @@ package entitygraph_controller import ( "github.com/aperturerobotics/controllerbus/directive" + "github.com/aperturerobotics/entitygraph/entity" + "github.com/aperturerobotics/entitygraph/store" ) // referenceHandler handles collect events. type referenceHandler struct { // c is the controller c *Controller + // store is the store + store *store.Store } // newReferenceHandler constructs a new referenceHandler. -func newReferenceHandler(c *Controller) *referenceHandler { - return &referenceHandler{c: c} +func newReferenceHandler(c *Controller, store *store.Store) *referenceHandler { + return &referenceHandler{c: c, store: store} } // HandleValueAdded is called when a value is added to the directive. @@ -20,6 +24,13 @@ func (r *referenceHandler) HandleValueAdded( inst directive.Instance, val directive.Value, ) { + valEnt, valEntOk := val.(entity.Entity) + if !valEntOk { + r.c.le.Warn("ignoring non-entity directive value added") + return + } + + r.store.AddEntityObj(valEnt) } // HandleValueRemoved is called when a value is removed from the directive. @@ -27,11 +38,19 @@ func (r *referenceHandler) HandleValueRemoved( inst directive.Instance, val directive.Value, ) { + valEnt, valEntOk := val.(entity.Entity) + if !valEntOk { + r.c.le.Warn("ignoring non-entity directive value removed") + return + } + + r.store.RemoveEntityObject(valEnt) } // HandleInstanceDisposed is called when a directive instance is disposed. // This will occur if Close() is called on the directive instance. func (r *referenceHandler) HandleInstanceDisposed(inst directive.Instance) { + // noop } var _ directive.ReferenceHandler = ((*referenceHandler)(nil)) diff --git a/controller/store_handler.go b/controller/store_handler.go new file mode 100644 index 0000000..333363a --- /dev/null +++ b/controller/store_handler.go @@ -0,0 +1,30 @@ +package entitygraph_controller + +import ( + "github.com/aperturerobotics/entitygraph/entity" + "github.com/aperturerobotics/entitygraph/store" +) + +// storeHandler handles collect events. +type storeHandler struct { + // c is the controller + c *Controller +} + +// newStoreHandler constructs a new storeHandler. +func newStoreHandler(c *Controller) *storeHandler { + return &storeHandler{c: c} +} + +// HandleEntityAdded handles a new entity being added to the store. +func (h *storeHandler) HandleEntityAdded(ent entity.Entity) { + // TODO: emit to directive +} + +// HandleEntityRemoved handles a entity being removed from the store. +func (h *storeHandler) HandleEntityRemoved(ent entity.Entity) { + // TODO: emit to directive +} + +// _ is a type assertion +var _ store.Handler = ((*storeHandler)(nil)) diff --git a/directive.go b/directive.go index e7bd225..d31c37a 100644 --- a/directive.go +++ b/directive.go @@ -13,3 +13,13 @@ type CollectEntityGraph interface { // noop CollectEntityGraphDirective() } + +// ObserveEntityGraph is a directive to observe the entity graph. +type ObserveEntityGraph interface { + // Directive indicates CollectEntityGraph is a directive. + directive.Directive + + // ObserveEntityGraphDirective is a marker function. + // noop + ObserveEntityGraphDirective() +} diff --git a/directive_obs.go b/directive_obs.go new file mode 100644 index 0000000..9b9c792 --- /dev/null +++ b/directive_obs.go @@ -0,0 +1,46 @@ +package entitygraph + +import ( + "github.com/aperturerobotics/controllerbus/directive" +) + +// observeEntityGraph implements ObserveEntityGraph +type observeEntityGraph struct{} + +// ObserveEntityGraphDirective is a marker function. +func (g *observeEntityGraph) ObserveEntityGraphDirective() { + // noop +} + +// NewObserveEntityGraph constructs a new ObserveEntityGraph directive. +func NewObserveEntityGraph() ObserveEntityGraph { + return &observeEntityGraph{} +} + +// Validate validates the directive. +// This is a cursory validation to see if the values "look correct." +func (g *observeEntityGraph) Validate() error { + return nil +} + +// GetValueOptions returns options relating to value handling. +func (g *observeEntityGraph) GetValueOptions() directive.ValueOptions { + return directive.ValueOptions{} +} + +// IsEquivalent checks if the other directive is equivalent. If two +// directives are equivalent, and the new directive does not superceed the +// old, then the new directive will be merged (de-duplicated) into the old. +func (g *observeEntityGraph) IsEquivalent(other directive.Directive) bool { + _, ok := other.(ObserveEntityGraph) + return ok +} + +// Superceeds checks if the directive overrides another. +// The other directive will be canceled if superceded. +func (g *observeEntityGraph) Superceeds(other directive.Directive) bool { + return false +} + +// _ is a type assertion +var _ ObserveEntityGraph = ((*observeEntityGraph)(nil)) diff --git a/entity/entity.go b/entity/entity.go new file mode 100644 index 0000000..e721989 --- /dev/null +++ b/entity/entity.go @@ -0,0 +1,9 @@ +package entity + +// Entity is a string-ID identified node in the entity graph. +type Entity interface { + // GetEntityID returns the entity identifier. + GetEntityID() string + // GetEntityTypeName returns the entity type name. + GetEntityTypeName() string +} diff --git a/entity/entityref.proto b/entity/entityref.proto new file mode 100644 index 0000000..e69de29 diff --git a/entity/entityref_obj.go b/entity/entityref_obj.go new file mode 100644 index 0000000..0f77d39 --- /dev/null +++ b/entity/entityref_obj.go @@ -0,0 +1,38 @@ +package entity + +// Ref is a reference to another entity. +type Ref interface { + // GetEntityRefId returns the referenced entity ID. + GetEntityRefId() string + // GetEntityRefTypeName returns the referenced entity type name. + GetEntityRefTypeName() string +} + +// entityRef implements Ref +type entityRef struct { + refID string + refTypeName string +} + +// NewEntityRef builds a new EntityRef with an entity handle. +func NewEntityRef(ent Entity) Ref { + return NewEntityRefWithID(ent.GetEntityID(), ent.GetEntityTypeName()) +} + +// NewEntityRefWithID builds a new EntityRef with an id and type name. +func NewEntityRefWithID(refID, refTypeName string) Ref { + return &entityRef{refID: refID, refTypeName: refTypeName} +} + +// GetEntityRefId returns the referenced entity ID. +func (e *entityRef) GetEntityRefId() string { + return e.refID +} + +// GetEntityRefTypeName returns the referenced entity type name. +func (e *entityRef) GetEntityRefTypeName() string { + return e.refTypeName +} + +// _ is a type assertion +var _ Ref = ((*entityRef)(nil)) diff --git a/link/link.go b/link/link.go new file mode 100644 index 0000000..a847404 --- /dev/null +++ b/link/link.go @@ -0,0 +1,14 @@ +package link + +import "github.com/aperturerobotics/entitygraph/entity" + +// Link is an edge/relationship between two nodes. +type Link interface { + // Entity indicates that Link is a subclass of Entity. + entity.Entity + + // GetEdgeFrom returns the reference to the entity this link starts at. + GetEdgeFrom() entity.Ref + // GetEdgeTo returns the reference to the entity this link ends at. + GetEdgeTo() entity.Ref +} diff --git a/store/store.go b/store/store.go new file mode 100644 index 0000000..8425072 --- /dev/null +++ b/store/store.go @@ -0,0 +1,86 @@ +package store + +import ( + "sync" + + "github.com/aperturerobotics/entitygraph/entity" +) + +// Handler handles store events. +type Handler interface { + // HandleEntityAdded handles a new entity being added to the store. + HandleEntityAdded(ent entity.Entity) + // HandleEntityRemoved handles a entity being removed from the store. + HandleEntityRemoved(ent entity.Entity) +} + +// Store tracks, refcounts, and de-duplicates known entities. +type Store struct { + // mtx guards the store + mtx sync.Mutex + // entities is the entities map, keyed by ID + entities map[EntityMapKey]*storeEntity + // handler is the store handler + handler Handler +} + +// NewStore constructs a new Store. +func NewStore(handler Handler) *Store { + return &Store{ + entities: make(map[EntityMapKey]*storeEntity), + handler: handler, + } +} + +// AddEntityObj adds an entity object to the store. +func (s *Store) AddEntityObj(ent entity.Entity) { + key := newEntityMapKey(ent) + s.mtx.Lock() + defer s.mtx.Unlock() + + sent := s.entities[key] + if sent == nil { + sent = newStoreEntity(ent) + s.entities[key] = sent + s.emitStoreEntityAdd(sent) + return + } + + sent.PushEntity(ent) +} + +// RemoveEntityObj removes an entity object from the store. +// Returns if found. +func (s *Store) RemoveEntityObj(ent entity.Entity) bool { + key := newEntityMapKey(ent) + s.mtx.Lock() + defer s.mtx.Unlock() + + sent := s.entities[key] + if sent == nil { + return false + } + + f := sent.DelEntity(ent) + if len(sent.Values) == 0 { + delete(s.entities, key) + s.emitStoreEntityRm(sent) + } + return f +} + +// emitStoreEntityAdd emits a new store entity when added. +// mtx is locked +func (s *Store) emitStoreEntityAdd(storeEnt *storeEntity) { + if s.handler != nil { + s.handler.HandleEntityAdded(storeEnt) + } +} + +// emitStoreEntityRm emits a old store entity when removed. +// mtx is locked +func (s *Store) emitStoreEntityRm(storeEnt *storeEntity) { + if s.handler != nil { + s.handler.HandleEntityRemoved(storeEnt) + } +} diff --git a/store/store_entity.go b/store/store_entity.go new file mode 100644 index 0000000..5e9a5ea --- /dev/null +++ b/store/store_entity.go @@ -0,0 +1,78 @@ +package store + +import "github.com/aperturerobotics/entitygraph/entity" + +// EntityMapKey is the tuple key for the entities map +type EntityMapKey struct { + id string + typeName string +} + +// newEntityMapKey constructs a new EntityMapKey from an Entity +func newEntityMapKey(ent entity.Entity) EntityMapKey { + return EntityMapKey{ + id: ent.GetEntityID(), + typeName: ent.GetEntityTypeName(), + } +} + +// storeEntity is an entity in the store. +type storeEntity struct { + Key EntityMapKey + Values []storeEntityValue +} + +// newStoreEntity constructs a new storeEntity from an Entity. +func newStoreEntity(ent entity.Entity) *storeEntity { + return &storeEntity{ + Key: newEntityMapKey(ent), + Values: []storeEntityValue{ + newStoreEntityValue(ent), + }, + } +} + +// PushEntity pushes an entity to the store value. +func (s *storeEntity) PushEntity(ent entity.Entity) { + val := newStoreEntityValue(ent) + s.Values = append(s.Values, val) +} + +// DelEntity removes an entity from the store value. +// Returns if found. +func (s *storeEntity) DelEntity(ent entity.Entity) bool { + for i := range s.Values { + if s.Values[i].Entity == ent { + s.Values[i] = s.Values[len(s.Values)-1] + s.Values[len(s.Values)-1] = nil + s.Values = s.Values[:len(s.Values)-1] + return true + } + } + + return false +} + +// GetEntityID returns the entity id. +func (s *storeEntity) GetEntityID() string { + return s.Key.id +} + +// GetEntityTypeName returns the entity type name. +func (s *storeEntity) GetEntityTypeName() string { + return s.Key.typeName +} + +// _ is a type assertion +var _ entity.Entity = ((*storeEntity)(nil)) + +// storeEntityValue is a value representing an entity. +type storeEntityValue struct { + // Entity is the entity object. + Entity entity.Entity +} + +// newStoreEntityValue constructs a new storeEntityValue +func newStoreEntityValue(ent entity.Entity) storeEntityValue { + return storeEntityValue{Entity: ent} +}