Skip to content

Commit

Permalink
NOISSUE - Fix Listing of connections and groups for clients and chann…
Browse files Browse the repository at this point in the history
…els (#2660)

Signed-off-by: Arvindh <[email protected]>
  • Loading branch information
arvindh123 authored Jan 23, 2025
1 parent 104c5b3 commit d77d005
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 74 deletions.
12 changes: 10 additions & 2 deletions channels/api/http/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,17 @@ func listChannelsEndpoint(svc channels.Service) endpoint.Endpoint {
Actions: req.actions,
AccessType: req.accessType,
}
page, err := svc.ListChannels(ctx, session, pm)

var page channels.Page
var err error
switch req.userID != "" {
case true:
page, err = svc.ListUserChannels(ctx, session, req.userID, pm)
default:
page, err = svc.ListChannels(ctx, session, pm)
}
if err != nil {
return nil, err
return channelsPageRes{}, err
}

res := channelsPageRes{
Expand Down
19 changes: 10 additions & 9 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ type Channel struct {
UpdatedBy string `json:"updated_by,omitempty"`
Status clients.Status `json:"status,omitempty"` // 1 for enabled, 0 for disabled
// Extended
ParentGroupPath string `json:"parent_group_path,omitempty"`
RoleID string `json:"role_id,omitempty"`
RoleName string `json:"role_name,omitempty"`
Actions []string `json:"actions,omitempty"`
AccessType string `json:"access_type,omitempty"`
AccessProviderId string `json:"access_provider_id,omitempty"`
AccessProviderRoleId string `json:"access_provider_role_id,omitempty"`
AccessProviderRoleName string `json:"access_provider_role_name,omitempty"`
AccessProviderRoleActions []string `json:"access_provider_role_actions,omitempty"`
ParentGroupPath string `json:"parent_group_path,omitempty"`
RoleID string `json:"role_id,omitempty"`
RoleName string `json:"role_name,omitempty"`
Actions []string `json:"actions,omitempty"`
AccessType string `json:"access_type,omitempty"`
AccessProviderId string `json:"access_provider_id,omitempty"`
AccessProviderRoleId string `json:"access_provider_role_id,omitempty"`
AccessProviderRoleName string `json:"access_provider_role_name,omitempty"`
AccessProviderRoleActions []string `json:"access_provider_role_actions,omitempty"`
ConnectionTypes []connections.ConnType `json:"connection_types,omitempty"`
}

type PageMetadata struct {
Expand Down
156 changes: 128 additions & 28 deletions channels/postgres/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,60 @@ func (cr *channelRepository) RetrieveByID(ctx context.Context, id string) (chann
}

func (cr *channelRepository) RetrieveAll(ctx context.Context, pm channels.PageMetadata) (channels.Page, error) {
query, err := PageQuery(pm)
pageQuery, err := PageQuery(pm)
if err != nil {
return channels.Page{}, errors.Wrap(repoerr.ErrViewEntity, err)
}
query = applyOrdering(query, pm)

q := fmt.Sprintf(`SELECT c.id, c.name, c.tags, c.metadata, COALESCE(c.domain_id, '') AS domain_id, COALESCE(parent_group_id, '') AS parent_group_id, c.status,
c.created_by, c.created_at, c.updated_at, COALESCE(c.updated_by, '') AS updated_by FROM channels c %s LIMIT :limit OFFSET :offset;`, query)
connJoinQuery := `
FROM
channels c
`

if pm.Client != "" {
connJoinQuery = `
,conn.connection_types
FROM
channels c
LEFT JOIN (
SELECT
conn.client_id,
conn.channel_id,
array_agg(conn."type") AS connection_types
FROM
connections AS conn
GROUP BY
conn.client_id, conn.channel_id
) conn ON c.id = conn.channel_id
`
}

comQuery := fmt.Sprintf(`WITH channels AS (
SELECT
c.id,
c.name,
c.tags,
c.metadata,
COALESCE(c.domain_id, '') AS domain_id,
COALESCE(parent_group_id, '') AS parent_group_id,
COALESCE((SELECT path FROM groups WHERE id = c.parent_group_id), ''::::ltree) AS parent_group_path,
c.status,
c.created_by,
c.created_at,
c.updated_at,
COALESCE(c.updated_by, '') AS updated_by
FROM
channels c
)
SELECT
*
%s
%s
`, connJoinQuery, pageQuery)

q := applyOrdering(comQuery, pm)

q = applyLimitOffset(q)

dbPage, err := toDBChannelsPage(pm)
if err != nil {
Expand All @@ -179,7 +225,11 @@ func (cr *channelRepository) RetrieveAll(ctx context.Context, pm channels.PageMe

items = append(items, ch)
}
cq := fmt.Sprintf(`SELECT COUNT(*) FROM channels c %s;`, query)
cq := fmt.Sprintf(`SELECT COUNT(*) AS total_count
FROM (
%s
) AS sub_query;
`, comQuery)

total, err := postgres.Total(ctx, cr.db, cq, dbPage)
if err != nil {
Expand Down Expand Up @@ -209,9 +259,27 @@ func (repo *channelRepository) retrieveClients(ctx context.Context, domainID, us

bq := repo.userChannelsBaseQuery(domainID, userID)

connJoinQuery := ""
connJoinQuery := `
FROM
final_channels c
`

if pm.Client != "" {
connJoinQuery = "JOIN connection conn ON conn.channel_id = c.id"
connJoinQuery = `
,conn.connection_types
FROM
final_channels c
LEFT JOIN (
SELECT
conn.client_id,
conn.channel_id,
array_agg(conn."type") AS connection_types
FROM
connections AS conn
GROUP BY
conn.client_id, conn.channel_id
) conn ON c.id = conn.channel_id
`
}

q := fmt.Sprintf(`
Expand All @@ -237,14 +305,14 @@ func (repo *channelRepository) retrieveClients(ctx context.Context, domainID, us
c.access_provider_role_id,
c.access_provider_role_name,
c.access_provider_role_actions
FROM
final_channels c
%s
%s
`, bq, connJoinQuery, pageQuery)

q = applyOrdering(q, pm)

q = applyLimitOffset(q)

dbPage, err := toDBChannelsPage(pm)
if err != nil {
return channels.Page{}, errors.Wrap(repoerr.ErrViewEntity, err)
Expand All @@ -271,10 +339,6 @@ func (repo *channelRepository) retrieveClients(ctx context.Context, domainID, us
items = append(items, c)
}

chJoinQuery := ""
if pm.Client != "" {
chJoinQuery = "JOIN connection conn ON conn.channel_id = c.id"
}
cq := fmt.Sprintf(`%s
SELECT COUNT(*) AS total_count
FROM (
Expand All @@ -299,12 +363,10 @@ func (repo *channelRepository) retrieveClients(ctx context.Context, domainID, us
c.access_provider_role_id,
c.access_provider_role_name,
c.access_provider_role_actions
FROM
final_channels c
%s
%s
) AS subquery;
`, bq, chJoinQuery, pageQuery)
`, bq, connJoinQuery, pageQuery)

total, err := postgres.Total(ctx, repo.db, cq, dbPage)
if err != nil {
Expand Down Expand Up @@ -338,7 +400,7 @@ WITH direct_channels AS (
c.updated_at,
c.updated_by,
c.status,
text2ltree('') as parent_group_path,
COALESCE((SELECT path FROM groups WHERE id = c.parent_group_id), ''::::ltree) AS parent_group_path,
cr.id AS role_id,
cr."name" AS role_name,
array_agg(cra."action") AS actions,
Expand Down Expand Up @@ -368,7 +430,7 @@ direct_groups AS (
grm.member_id AS member_id,
gr.id AS role_id,
gr."name" AS role_name,
array_agg(gra."action") AS actions
array_agg(DISTINCT all_actions."action") AS actions
FROM
groups_role_members grm
JOIN
Expand All @@ -377,21 +439,39 @@ direct_groups AS (
groups_roles gr ON gr.id = grm.role_id
JOIN
"groups" g ON g.id = gr.entity_id
JOIN
groups_role_actions all_actions ON all_actions.role_id = grm.role_id
WHERE
grm.member_id = '%s'
AND g.domain_id = '%s'
AND gra."action" LIKE 'channel%%'
GROUP BY
gr.entity_id, grm.member_id, gr.id, gr."name", g."path", g.id
),
direct_groups_with_subgroup AS (
SELECT
*
FROM direct_groups
WHERE EXISTS (
SELECT 1
FROM unnest(direct_groups.actions) AS action
WHERE action LIKE 'subgroup_%%'
)
g.*,
gr.entity_id AS entity_id,
grm.member_id AS member_id,
gr.id AS role_id,
gr."name" AS role_name,
array_agg(DISTINCT all_actions."action") AS actions
FROM
groups_role_members grm
JOIN
groups_role_actions gra ON gra.role_id = grm.role_id
JOIN
groups_roles gr ON gr.id = grm.role_id
JOIN
"groups" g ON g.id = gr.entity_id
JOIN
groups_role_actions all_actions ON all_actions.role_id = grm.role_id
WHERE
grm.member_id = '%s'
AND g.domain_id = '%s'
AND gra."action" LIKE 'subgroup_channel%%'
GROUP BY
gr.entity_id, grm.member_id, gr.id, gr."name", g."path", g.id
),
indirect_child_groups AS (
SELECT
Expand All @@ -409,8 +489,12 @@ indirect_child_groups AS (
indirect_child_groups.domain_id = '%s'
AND NOT EXISTS (
SELECT 1
FROM direct_groups_with_subgroup dgws
WHERE dgws.id = indirect_child_groups.id
FROM (
SELECT id FROM direct_groups_with_subgroup
UNION ALL
SELECT id FROM direct_groups
) excluded
WHERE excluded.id = indirect_child_groups.id
)
),
final_groups AS (
Expand Down Expand Up @@ -558,7 +642,7 @@ final_channels AS (
GROUP BY
dc.id, d.id, dr.id
)
`, userID, domainID, userID, domainID, domainID, userID, domainID)
`, userID, domainID, userID, domainID, userID, domainID, domainID, userID, domainID)
}

func (cr *channelRepository) Remove(ctx context.Context, ids ...string) error {
Expand Down Expand Up @@ -805,6 +889,7 @@ type dbChannel struct {
AccessProviderRoleId string `db:"access_provider_role_id,omitempty"`
AccessProviderRoleName string `db:"access_provider_role_name,omitempty"`
AccessProviderRoleActions pq.StringArray `db:"access_provider_role_actions,omitempty"`
ConnectionTypes pq.Int32Array `db:"connection_types,omitempty"`
}

func toDBChannel(ch channels.Channel) (dbChannel, error) {
Expand Down Expand Up @@ -889,6 +974,15 @@ func toChannel(ch dbChannel) (channels.Channel, error) {
updatedAt = ch.UpdatedAt.Time
}

connTypes := []connections.ConnType{}
for _, ct := range ch.ConnectionTypes {
connType, err := connections.NewType(uint(ct))
if err != nil {
return channels.Channel{}, err
}
connTypes = append(connTypes, connType)
}

newCh := channels.Channel{
ID: ch.ID,
Name: ch.Name,
Expand All @@ -910,6 +1004,7 @@ func toChannel(ch dbChannel) (channels.Channel, error) {
AccessProviderRoleId: ch.AccessProviderRoleId,
AccessProviderRoleName: ch.AccessProviderRoleName,
AccessProviderRoleActions: ch.AccessProviderRoleActions,
ConnectionTypes: connTypes,
}

return newCh, nil
Expand Down Expand Up @@ -989,6 +1084,11 @@ func applyOrdering(emq string, pm channels.PageMetadata) string {
return emq
}

func applyLimitOffset(query string) string {
return fmt.Sprintf(`%s
LIMIT :limit OFFSET :offset`, query)
}

func toDBChannelsPage(pm channels.PageMetadata) (dbChannelsPage, error) {
_, data, err := postgres.CreateMetadataQuery("", pm.Metadata)
if err != nil {
Expand Down
19 changes: 10 additions & 9 deletions clients/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,16 @@ type Client struct {
Status Status `json:"status,omitempty"` // 1 for enabled, 0 for disabled
Identity string `json:"identity,omitempty"`
// Extended
ParentGroupPath string `json:"parent_group_path,omitempty"`
RoleID string `json:"role_id,omitempty"`
RoleName string `json:"role_name,omitempty"`
Actions []string `json:"actions,omitempty"`
AccessType string `json:"access_type,omitempty"`
AccessProviderId string `json:"access_provider_id,omitempty"`
AccessProviderRoleId string `json:"access_provider_role_id,omitempty"`
AccessProviderRoleName string `json:"access_provider_role_name,omitempty"`
AccessProviderRoleActions []string `json:"access_provider_role_actions,omitempty"`
ParentGroupPath string `json:"parent_group_path,omitempty"`
RoleID string `json:"role_id,omitempty"`
RoleName string `json:"role_name,omitempty"`
Actions []string `json:"actions,omitempty"`
AccessType string `json:"access_type,omitempty"`
AccessProviderId string `json:"access_provider_id,omitempty"`
AccessProviderRoleId string `json:"access_provider_role_id,omitempty"`
AccessProviderRoleName string `json:"access_provider_role_name,omitempty"`
AccessProviderRoleActions []string `json:"access_provider_role_actions,omitempty"`
ConnectionTypes []connections.ConnType `json:"connection_types,omitempty"`
}

// ClientsPage contains page related metadata as well as list.
Expand Down
Loading

0 comments on commit d77d005

Please sign in to comment.