Skip to content

Commit

Permalink
SMQ-2605 - Groups replication with groups events consumer & listing o…
Browse files Browse the repository at this point in the history
…f things and channels (#2639)

Signed-off-by: Arvindh <[email protected]>
  • Loading branch information
arvindh123 authored Jan 20, 2025
1 parent a8b12e4 commit 88d583b
Show file tree
Hide file tree
Showing 66 changed files with 2,643 additions and 1,575 deletions.
1 change: 1 addition & 0 deletions api/http/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
UserKey = "user"
DomainKey = "domain"
ChannelKey = "channel"
ConnTypeKey = "connection_type"
DefPermission = "read_permission"
DefTotal = uint64(100)
DefOffset = 0
Expand Down
108 changes: 78 additions & 30 deletions channels/api/http/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
smqclients "github.com/absmach/supermq/clients"
"github.com/absmach/supermq/clients"
"github.com/absmach/supermq/pkg/errors"
"github.com/go-chi/chi/v5"
)
Expand Down Expand Up @@ -51,58 +51,106 @@ func decodeCreateChannelsReq(_ context.Context, r *http.Request) (interface{}, e
}

func decodeListChannels(_ context.Context, r *http.Request) (interface{}, error) {
s, err := apiutil.ReadStringQuery(r, api.StatusKey, api.DefClientStatus)
name, err := apiutil.ReadStringQuery(r, api.NameKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
o, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset)

tag, err := apiutil.ReadStringQuery(r, api.TagKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
l, err := apiutil.ReadNumQuery[uint64](r, api.LimitKey, api.DefLimit)

s, err := apiutil.ReadStringQuery(r, api.StatusKey, api.DefGroupStatus)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
m, err := apiutil.ReadMetadataQuery(r, api.MetadataKey, nil)
status, err := clients.ToStatus(s)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
n, err := apiutil.ReadStringQuery(r, api.NameKey, "")

meta, err := apiutil.ReadMetadataQuery(r, api.MetadataKey, nil)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
t, err := apiutil.ReadStringQuery(r, api.TagKey, "")

offset, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
id, err := apiutil.ReadStringQuery(r, api.IDOrder, "")
limit, err := apiutil.ReadNumQuery[uint64](r, api.LimitKey, api.DefLimit)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
p, err := apiutil.ReadStringQuery(r, api.PermissionKey, api.DefPermission)

dir, err := apiutil.ReadStringQuery(r, api.DirKey, api.DefDir)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

lp, err := apiutil.ReadBoolQuery(r, api.ListPerms, api.DefListPerms)
order, err := apiutil.ReadStringQuery(r, api.OrderKey, api.DefOrder)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
st, err := smqclients.ToStatus(s)

allActions, err := apiutil.ReadStringQuery(r, api.ActionsKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

actions := []string{}

allActions = strings.TrimSpace(allActions)
if allActions != "" {
actions = strings.Split(allActions, ",")
}
roleID, err := apiutil.ReadStringQuery(r, api.RoleIDKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

roleName, err := apiutil.ReadStringQuery(r, api.RoleNameKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

accessType, err := apiutil.ReadStringQuery(r, api.AccessTypeKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

userID, err := apiutil.ReadStringQuery(r, api.UserKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

groupID, err := apiutil.ReadStringQuery(r, api.GroupKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

clientID, err := apiutil.ReadStringQuery(r, api.ClientKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

req := listChannelsReq{
status: st,
offset: o,
limit: l,
metadata: m,
name: n,
tag: t,
permission: p,
listPerms: lp,
userID: chi.URLParam(r, "userID"),
id: id,
name: name,
tag: tag,
status: status,
metadata: meta,
roleName: roleName,
roleID: roleID,
actions: actions,
accessType: accessType,
order: order,
dir: dir,
offset: offset,
limit: limit,
groupID: groupID,
clientID: clientID,
userID: userID,
}
return req, nil
}
Expand Down
24 changes: 15 additions & 9 deletions channels/api/http/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,21 @@ func listChannelsEndpoint(svc channels.Service) endpoint.Endpoint {
}

pm := channels.PageMetadata{
Status: req.status,
Offset: req.offset,
Limit: req.limit,
Name: req.name,
Tag: req.tag,
Permission: req.permission,
Metadata: req.metadata,
ListPerms: req.listPerms,
Id: req.id,
Offset: req.offset,
Limit: req.limit,
Name: req.name,
Order: req.order,
Dir: req.dir,
Metadata: req.metadata,
Tag: req.tag,
Status: req.status,
Group: req.groupID,
Client: req.clientID,
ConnectionType: req.connType,
RoleName: req.roleName,
RoleID: req.roleID,
Actions: req.actions,
AccessType: req.accessType,
}
page, err := svc.ListChannels(ctx, session, pm)
if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions channels/api/http/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/channels"
smqclients "github.com/absmach/supermq/clients"
"github.com/absmach/supermq/clients"
"github.com/absmach/supermq/pkg/connections"
)

Expand Down Expand Up @@ -64,29 +64,29 @@ func (req viewChannelReq) validate() error {
}

type listChannelsReq struct {
status smqclients.Status
offset uint64
limit uint64
name string
tag string
permission string
visibility string
status clients.Status
metadata clients.Metadata
roleName string
roleID string
actions []string
accessType string
order string
dir string
offset uint64
limit uint64
groupID string
clientID string
connType string
userID string
listPerms bool
metadata smqclients.Metadata
id string
}

func (req listChannelsReq) validate() error {
if req.limit > api.MaxLimitSize || req.limit < 1 {
return apiutil.ErrLimitSize
}
if req.visibility != "" &&
req.visibility != api.AllVisibility &&
req.visibility != api.MyVisibility &&
req.visibility != api.SharedVisibility {
return apiutil.ErrInvalidVisibilityType
}

if len(req.name) > api.MaxNameSize {
return apiutil.ErrNameSize
}
Expand Down
8 changes: 0 additions & 8 deletions channels/api/http/requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,6 @@ func TestListChannelsReqValidation(t *testing.T) {
},
err: apiutil.ErrNameSize,
},
{
desc: "invalid visibility",
req: listChannelsReq{
limit: 10,
visibility: "invalid",
},
err: apiutil.ErrInvalidVisibilityType,
},
}
for _, tc := range cases {
err := tc.req.validate()
Expand Down
68 changes: 41 additions & 27 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,43 @@ type Channel struct {
ParentGroup string `json:"parent_group_id,omitempty"`
Domain string `json:"domain_id,omitempty"`
Metadata clients.Metadata `json:"metadata,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
Status clients.Status `json:"status,omitempty"` // 1 for enabled, 0 for disabled
Permissions []string `json:"permissions,omitempty"` // 1 for enabled, 0 for disabled
Status clients.Status `json:"status,omitempty"` // 1 for enabled, 0 for disabled
// Extended
ParentGroupPath string `json:"parent_group_path"`
RoleID string `json:"role_id"`
RoleName string `json:"role_name"`
Actions []string `json:"actions"`
AccessType string `json:"access_type"`
AccessProviderId string `json:"access_provider_id"`
AccessProviderRoleId string `json:"access_provider_role_id"`
AccessProviderRoleName string `json:"access_provider_role_name"`
AccessProviderRoleActions []string `json:"access_provider_role_actions"`
}

type PageMetadata struct {
Total uint64 `json:"total"`
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Name string `json:"name,omitempty"`
Id string `json:"id,omitempty"`
Order string `json:"order,omitempty"`
Dir string `json:"dir,omitempty"`
Metadata clients.Metadata `json:"metadata,omitempty"`
Domain string `json:"domain,omitempty"`
Tag string `json:"tag,omitempty"`
Permission string `json:"permission,omitempty"`
Status clients.Status `json:"status,omitempty"`
IDs []string `json:"ids,omitempty"`
ListPerms bool `json:"-"`
ClientID string `json:"-"`
Total uint64 `json:"total"`
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Order string `json:"order,omitempty"`
Dir string `json:"dir,omitempty"`
Id string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Metadata clients.Metadata `json:"metadata,omitempty"`
Domain string `json:"domain,omitempty"`
Tag string `json:"tag,omitempty"`
Status clients.Status `json:"status,omitempty"`
Group string `json:"group,omitempty"`
Client string `json:"client,omitempty"`
ConnectionType string `json:"connection_type,omitempty"`
RoleName string `json:"role_name,omitempty"`
RoleID string `json:"role_id,omitempty"`
Actions []string `json:"actions,omitempty"`
AccessType string `json:"access_type,omitempty"`
IDs []string `json:"-"`
}

// ChannelsPage contains page related metadata as well as list of channels that
Expand All @@ -71,15 +85,15 @@ type AuthzReq struct {

//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
type Service interface {
// CreateChannels adds channels to the user identified by the provided key.
// CreateChannels adds channels to the user.
CreateChannels(ctx context.Context, session authn.Session, channels ...Channel) ([]Channel, []roles.RoleProvision, error)

// ViewChannel retrieves data about the channel identified by the provided
// ID, that belongs to the user identified by the provided key.
// ID, that belongs to the user.
ViewChannel(ctx context.Context, session authn.Session, id string) (Channel, error)

// UpdateChannel updates the channel identified by the provided ID, that
// belongs to the user identified by the provided key.
// belongs to the user.
UpdateChannel(ctx context.Context, session authn.Session, channel Channel) (Channel, error)

// UpdateChannelTags updates the channel's tags.
Expand All @@ -89,17 +103,14 @@ type Service interface {

DisableChannel(ctx context.Context, session authn.Session, id string) (Channel, error)

// ListChannels retrieves data about subset of channels that belongs to the
// user identified by the provided key.
// ListChannels retrieves data about subset of channels that belongs to the user.
ListChannels(ctx context.Context, session authn.Session, pm PageMetadata) (Page, error)

// ListChannelsByClient retrieves data about subset of channels that have
// specified client connected or not connected to them and belong to the user identified by
// the provided key.
ListChannelsByClient(ctx context.Context, session authn.Session, id string, pm PageMetadata) (Page, error)
// ListUserChannels retrieves data about subset of channels that belong to the specified user.
ListUserChannels(ctx context.Context, session authn.Session, userID string, pm PageMetadata) (Page, error)

// RemoveChannel removes the client identified by the provided ID, that
// belongs to the user identified by the provided key.
// belongs to the user.
RemoveChannel(ctx context.Context, session authn.Session, id string) error

// Connect adds clients to the channels list of connected clients.
Expand Down Expand Up @@ -131,6 +142,9 @@ type Repository interface {

ChangeStatus(ctx context.Context, channel Channel) (Channel, error)

// RetrieveUserChannels retrieves the channel of given domainID and userID.
RetrieveUserChannels(ctx context.Context, domainID, userID string, pm PageMetadata) (Page, error)

// RetrieveByID retrieves the channel having the provided identifier
RetrieveByID(ctx context.Context, id string) (Channel, error)

Expand Down
Loading

0 comments on commit 88d583b

Please sign in to comment.