From 53e39a1dc60383864f8d4bc543f2251940c68ba7 Mon Sep 17 00:00:00 2001 From: Arvindh <30824765+arvindh123@users.noreply.github.com> Date: Tue, 21 Jan 2025 19:01:29 +0530 Subject: [PATCH] NOISSUE - Add domain events consumer, listing of clients and channels with domains (#2656) Signed-off-by: Arvindh --- channels/channels.go | 18 +- channels/postgres/channels.go | 403 ++++++++++++---------- clients/clients.go | 4 - clients/postgres/clients.go | 459 ++++++++++++++------------ cmd/channels/main.go | 11 + cmd/clients/main.go | 11 + cmd/groups/main.go | 12 + docker/spicedb/schema.zed | 23 +- domains/domains.go | 14 +- domains/mocks/repository.go | 18 +- domains/postgres/domains.go | 31 +- domains/postgres/domains_test.go | 2 +- domains/service.go | 14 +- groups/postgres/groups.go | 252 ++++++++------ groups/postgres/init.go | 7 + pkg/domains/events/consumer/decode.go | 271 +++++++++++++++ pkg/domains/events/consumer/doc.go | 4 + pkg/domains/events/consumer/stream.go | 234 +++++++++++++ pkg/events/events.go | 11 +- pkg/events/nats/subscriber.go | 2 +- pkg/roles/repo/postgres/init.go | 2 +- pkg/roles/repo/postgres/roles.go | 2 +- scripts/tables-copy.sh | 51 +++ 23 files changed, 1316 insertions(+), 540 deletions(-) create mode 100644 pkg/domains/events/consumer/decode.go create mode 100644 pkg/domains/events/consumer/doc.go create mode 100644 pkg/domains/events/consumer/stream.go create mode 100755 scripts/tables-copy.sh diff --git a/channels/channels.go b/channels/channels.go index ecf019edb7..c3570323d3 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -28,15 +28,15 @@ type Channel struct { UpdatedBy string `json:"updated_by,omitempty"` Status clients.Status `json:"status,omitempty"` // 1 for enabled, 0 for disabled // Extended - ParentGroupPath string `json:"parent_group_path"` - RoleID string `json:"role_id"` - RoleName string `json:"role_name"` - Actions []string `json:"actions"` - AccessType string `json:"access_type"` - AccessProviderId string `json:"access_provider_id"` - AccessProviderRoleId string `json:"access_provider_role_id"` - AccessProviderRoleName string `json:"access_provider_role_name"` - AccessProviderRoleActions []string `json:"access_provider_role_actions"` + ParentGroupPath string `json:"parent_group_path,omitempty"` + RoleID string `json:"role_id,omitempty"` + RoleName string `json:"role_name,omitempty"` + Actions []string `json:"actions,omitempty"` + AccessType string `json:"access_type,omitempty"` + AccessProviderId string `json:"access_provider_id,omitempty"` + AccessProviderRoleId string `json:"access_provider_role_id,omitempty"` + AccessProviderRoleName string `json:"access_provider_role_name,omitempty"` + AccessProviderRoleActions []string `json:"access_provider_role_actions,omitempty"` } type PageMetadata struct { diff --git a/channels/postgres/channels.go b/channels/postgres/channels.go index 1264561dbb..840073cfb5 100644 --- a/channels/postgres/channels.go +++ b/channels/postgres/channels.go @@ -325,173 +325,240 @@ func (repo *channelRepository) retrieveClients(ctx context.Context, domainID, us func (repo *channelRepository) userChannelsBaseQuery(domainID, userID string) string { return fmt.Sprintf(` - WITH direct_channels AS ( - select - c.id, - c.name, - c.domain_id, - c.parent_group_id, - c.tags, - c.metadata, - c.created_by, - c.created_at, - c.updated_at, - c.updated_by, - c.status, - text2ltree('') as parent_group_path, - cr.id AS role_id, - cr."name" AS role_name, - array_agg(cra."action") AS actions, - 'direct' as access_type, - '' AS access_provider_id, - '' AS access_provider_role_id, - '' AS access_provider_role_name, - array[]::::text[] AS access_provider_role_actions - FROM - channels_role_members crm - JOIN - channels_role_actions cra ON cra.role_id = crm.role_id - JOIN - channels_roles cr ON cr.id = crm.role_id - JOIN - channels c ON c.id = cr.entity_id - WHERE - crm.member_id = '%s' - AND c.domain_id = '%s' - GROUP BY - cr.entity_id, crm.member_id, cr.id, cr."name", c.id - ), - direct_groups AS ( - SELECT - g.*, - gr.entity_id AS entity_id, - grm.member_id AS member_id, - gr.id AS role_id, - gr."name" AS role_name, - array_agg(gra."action") AS actions - FROM - groups_role_members grm - JOIN - groups_role_actions gra ON gra.role_id = grm.role_id - JOIN - groups_roles gr ON gr.id = grm.role_id - JOIN - "groups" g ON g.id = gr.entity_id - WHERE - grm.member_id = '%s' - AND g.domain_id = '%s' - GROUP BY - gr.entity_id, grm.member_id, gr.id, gr."name", g."path", g.id - ), - direct_groups_with_subgroup AS ( - SELECT - * - FROM direct_groups - WHERE EXISTS ( - SELECT 1 - FROM unnest(direct_groups.actions) AS action - WHERE action LIKE 'subgroup_%%' - ) - ), - indirect_child_groups AS ( - SELECT - DISTINCT indirect_child_groups.id as child_id, - indirect_child_groups.*, - dgws.id as access_provider_id, - dgws.role_id as access_provider_role_id, - dgws.role_name as access_provider_role_name, - dgws.actions as access_provider_role_actions - FROM - direct_groups_with_subgroup dgws - JOIN - groups indirect_child_groups ON indirect_child_groups.path <@ dgws.path - WHERE - indirect_child_groups.domain_id = '%s' - AND NOT EXISTS ( - SELECT 1 - FROM direct_groups_with_subgroup dgws - WHERE dgws.id = indirect_child_groups.id - ) - ), - final_groups AS ( - SELECT - id, - parent_id, - domain_id, - "name", - description, - metadata, - created_at, - updated_at, - updated_by, - status, - "path", - role_id, - role_name, - actions, - 'direct_group' AS access_type, - '' AS access_provider_id, - '' AS access_provider_role_id, - '' AS access_provider_role_name, - array[]::::text[] AS access_provider_role_actions - FROM - direct_groups - UNION - SELECT - id, - parent_id, - domain_id, - "name", - description, - metadata, - created_at, - updated_at, - updated_by, - status, - "path", - '' AS role_id, - '' AS role_name, - array[]::::text[] AS actions, - 'indirect_group' AS access_type, - access_provider_id, - access_provider_role_id, - access_provider_role_name, - access_provider_role_actions - FROM - indirect_child_groups - ), - final_channels AS ( - SELECT - c.id, - c.name, - c.domain_id, - c.parent_group_id, - c.tags, - c.metadata, - c.created_by, - c.created_at, - c.updated_at, - c.updated_by, - c.status, - g.path AS parent_group_path, - g.role_id, - g.role_name, - g.actions, - g.access_type, - g.access_provider_id, - g.access_provider_role_id, - g.access_provider_role_name, - g.access_provider_role_actions - FROM - final_groups g - JOIN - channels c ON c.parent_group_id = g.id - WHERE - c.id NOT IN (SELECT id FROM direct_channels) - UNION - SELECT * FROM direct_channels - ) - `, userID, domainID, userID, domainID, domainID) +WITH direct_channels AS ( + select + c.id, + c.name, + c.domain_id, + c.parent_group_id, + c.tags, + c.metadata, + c.created_by, + c.created_at, + c.updated_at, + c.updated_by, + c.status, + text2ltree('') as parent_group_path, + cr.id AS role_id, + cr."name" AS role_name, + array_agg(cra."action") AS actions, + 'direct' as access_type, + '' AS access_provider_id, + '' AS access_provider_role_id, + '' AS access_provider_role_name, + array[]::::text[] AS access_provider_role_actions + FROM + channels_role_members crm + JOIN + channels_role_actions cra ON cra.role_id = crm.role_id + JOIN + channels_roles cr ON cr.id = crm.role_id + JOIN + channels c ON c.id = cr.entity_id + WHERE + crm.member_id = '%s' + AND c.domain_id = '%s' + GROUP BY + cr.entity_id, crm.member_id, cr.id, cr."name", c.id +), +direct_groups AS ( + SELECT + g.*, + gr.entity_id AS entity_id, + grm.member_id AS member_id, + gr.id AS role_id, + gr."name" AS role_name, + array_agg(gra."action") AS actions + FROM + groups_role_members grm + JOIN + groups_role_actions gra ON gra.role_id = grm.role_id + JOIN + groups_roles gr ON gr.id = grm.role_id + JOIN + "groups" g ON g.id = gr.entity_id + WHERE + grm.member_id = '%s' + AND g.domain_id = '%s' + GROUP BY + gr.entity_id, grm.member_id, gr.id, gr."name", g."path", g.id +), +direct_groups_with_subgroup AS ( + SELECT + * + FROM direct_groups + WHERE EXISTS ( + SELECT 1 + FROM unnest(direct_groups.actions) AS action + WHERE action LIKE 'subgroup_%%' + ) +), +indirect_child_groups AS ( + SELECT + DISTINCT indirect_child_groups.id as child_id, + indirect_child_groups.*, + dgws.id as access_provider_id, + dgws.role_id as access_provider_role_id, + dgws.role_name as access_provider_role_name, + dgws.actions as access_provider_role_actions + FROM + direct_groups_with_subgroup dgws + JOIN + groups indirect_child_groups ON indirect_child_groups.path <@ dgws.path + WHERE + indirect_child_groups.domain_id = '%s' + AND NOT EXISTS ( + SELECT 1 + FROM direct_groups_with_subgroup dgws + WHERE dgws.id = indirect_child_groups.id + ) +), +final_groups AS ( + SELECT + id, + parent_id, + domain_id, + "name", + description, + metadata, + created_at, + updated_at, + updated_by, + status, + "path", + '' AS role_id, + '' AS role_name, + array[]::::text[] AS actions, + 'direct_group' AS access_type, + id AS access_provider_id, + role_id AS access_provider_role_id, + role_name AS access_provider_role_name, + actions AS access_provider_role_actions + FROM + direct_groups + UNION + SELECT + id, + parent_id, + domain_id, + "name", + description, + metadata, + created_at, + updated_at, + updated_by, + status, + "path", + '' AS role_id, + '' AS role_name, + array[]::::text[] AS actions, + 'indirect_group' AS access_type, + access_provider_id, + access_provider_role_id, + access_provider_role_name, + access_provider_role_actions + FROM + indirect_child_groups +), +groups_channels AS ( + SELECT + c.id, + c.name, + c.domain_id, + c.parent_group_id, + c.tags, + c.metadata, + c.created_by, + c.created_at, + c.updated_at, + c.updated_by, + c.status, + g.path AS parent_group_path, + g.role_id, + g.role_name, + g.actions, + g.access_type, + g.access_provider_id, + g.access_provider_role_id, + g.access_provider_role_name, + g.access_provider_role_actions + FROM + final_groups g + JOIN + channels c ON c.parent_group_id = g.id + WHERE + c.id NOT IN (SELECT id FROM direct_channels) + UNION + SELECT * FROM direct_channels +), +final_channels AS ( + SELECT + gc.id, + gc."name", + gc.domain_id, + gc.parent_group_id, + gc.tags, + gc.metadata, + gc.created_by, + gc.created_at, + gc.updated_at, + gc.updated_by, + gc.status, + gc.parent_group_path, + gc.role_id, + gc.role_name, + gc.actions, + gc.access_type, + gc.access_provider_id, + gc.access_provider_role_id, + gc.access_provider_role_name, + gc.access_provider_role_actions + FROM + groups_channels AS gc + UNION + SELECT + dc.id, + dc."name", + dc.domain_id, + dc.parent_group_id, + dc.tags, + dc.metadata, + dc.created_by, + dc.created_at, + dc.updated_at, + dc.updated_by, + dc.status, + text2ltree('') AS parent_group_path, + '' AS role_id, + '' AS role_name, + array[]::::text[] AS actions, + 'domain' AS access_type, + d.id AS access_provider_id, + dr.id AS access_provider_role_id, + dr."name" AS access_provider_role_name, + array_agg(dra."action") as access_provider_role_actions + FROM + domains_role_members drm + JOIN + domains_role_actions dra ON dra.role_id = drm.role_id + JOIN + domains_roles dr ON dr.id = drm.role_id + JOIN + domains d ON d.id = dr.entity_id + JOIN + channels dc ON dc.domain_id = d.id + WHERE + drm.member_id = '%s' -- user_id + AND d.id = '%s' -- domain_id + AND dra."action" LIKE 'channel_%%' + AND NOT EXISTS ( -- Ensures that the direct and indirect channels are not in included. + SELECT 1 FROM groups_channels gc + WHERE gc.id = dc.id + ) + GROUP BY + dc.id, d.id, dr.id +) + `, userID, domainID, userID, domainID, domainID, userID, domainID) } func (cr *channelRepository) Remove(ctx context.Context, ids ...string) error { @@ -880,7 +947,7 @@ func PageQuery(pm channels.PageMetadata) (string, error) { query = append(query, "c.domain_id = :domain_id") } if pm.Group != "" { - query = append(query, "c.parent_group_path @> (SELECT path from groups where id = :group_id) ") + query = append(query, "c.parent_group_path <@ (SELECT path from groups where id = :group_id) ") } if pm.Client != "" { query = append(query, "conn.client_id = :client_id ") diff --git a/clients/clients.go b/clients/clients.go index 5c09f21e36..03323780c2 100644 --- a/clients/clients.go +++ b/clients/clients.go @@ -13,10 +13,6 @@ import ( "github.com/absmach/supermq/pkg/roles" ) -type CtxKey int - -const ListDomainClients CtxKey = iota - type Connection struct { ClientID string ChannelID string diff --git a/clients/postgres/clients.go b/clients/postgres/clients.go index 3fa6719312..0fedd81bfe 100644 --- a/clients/postgres/clients.go +++ b/clients/postgres/clients.go @@ -372,224 +372,244 @@ func (repo *clientRepo) retrieveClients(ctx context.Context, domainID, userID st func (repo *clientRepo) userClientBaseQuery(domainID, userID string) string { return fmt.Sprintf(` - WITH direct_clients AS ( - SELECT - c.id, - c.name, - c.domain_id, - c.parent_group_id, - c.identity, - c.secret, - c.tags, - c.metadata, - c.created_at, - c.updated_at, - c.updated_by, - c.status, - text2ltree('') as parent_group_path, - cr.id AS role_id, - cr."name" AS role_name, - array_agg(cra."action") AS actions, - 'direct' as access_type, - '' AS access_provider_id, - '' AS access_provider_role_id, - '' AS access_provider_role_name, - array[]::::text[] AS access_provider_role_actions - FROM - clients_role_members crm - JOIN - clients_role_actions cra ON cra.role_id = crm.role_id - JOIN - clients_roles cr ON cr.id = crm.role_id - JOIN - clients c ON c.id = cr.entity_id - WHERE - crm.member_id = '%s' - AND c.domain_id = '%s' - GROUP BY - cr.entity_id, crm.member_id, cr.id, cr."name", c.id - ), - direct_groups AS ( - SELECT - g.*, - gr.entity_id AS entity_id, - grm.member_id AS member_id, - gr.id AS role_id, - gr."name" AS role_name, - array_agg(gra."action") AS actions - FROM - groups_role_members grm - JOIN - groups_role_actions gra ON gra.role_id = grm.role_id - JOIN - groups_roles gr ON gr.id = grm.role_id - JOIN - "groups" g ON g.id = gr.entity_id - WHERE - grm.member_id = '%s' - AND g.domain_id = '%s' - GROUP BY - gr.entity_id, grm.member_id, gr.id, gr."name", g."path", g.id - ), - direct_groups_with_subgroup AS ( - SELECT - * - FROM direct_groups - WHERE EXISTS ( - SELECT 1 - FROM unnest(direct_groups.actions) AS action - WHERE action LIKE 'subgroup_%%' - ) - ), - indirect_child_groups AS ( - SELECT - DISTINCT indirect_child_groups.id as child_id, - indirect_child_groups.*, - dgws.id as access_provider_id, - dgws.role_id as access_provider_role_id, - dgws.role_name as access_provider_role_name, - dgws.actions as access_provider_role_actions - FROM - direct_groups_with_subgroup dgws - JOIN - groups indirect_child_groups ON indirect_child_groups.path <@ dgws.path - WHERE - indirect_child_groups.domain_id = '%s' - AND NOT EXISTS ( - SELECT 1 - FROM direct_groups_with_subgroup dgws - WHERE dgws.id = indirect_child_groups.id - ) - ), - final_groups AS ( - SELECT - id, - parent_id, - domain_id, - "name", - description, - metadata, - created_at, - updated_at, - updated_by, - status, - "path", - role_id, - role_name, - actions, - 'direct_group' AS access_type, - '' AS access_provider_id, - '' AS access_provider_role_id, - '' AS access_provider_role_name, - array[]::::text[] AS access_provider_role_actions - FROM - direct_groups - UNION - SELECT - id, - parent_id, - domain_id, - "name", - description, - metadata, - created_at, - updated_at, - updated_by, - status, - "path", - '' AS role_id, - '' AS role_name, - array[]::::text[] AS actions, - 'indirect_group' AS access_type, - access_provider_id, - access_provider_role_id, - access_provider_role_name, - access_provider_role_actions - FROM - indirect_child_groups - ), - group_direct_clients AS ( - SELECT - c.id, - c.name, - c.domain_id, - c.parent_group_id, - c.identity, - c.secret, - c.tags, - c.metadata, - c.created_at, - c.updated_at, - c.updated_by, - c.status, - g.path AS parent_group_path, - g.role_id, - g.role_name, - g.actions, - g.access_type, - g.access_provider_id, - g.access_provider_role_id, - g.access_provider_role_name, - g.access_provider_role_actions - FROM - final_groups g - JOIN - clients c ON c.parent_group_id = g.id - WHERE - c.id NOT IN (SELECT id FROM direct_clients) - UNION - SELECT - dc.id, - dc.name, - dc.domain_id, - dc.parent_group_id, - dc.identity, - dc.secret, - dc.tags, - dc.metadata, - dc.created_at, - dc.updated_at, - dc.updated_by, - dc.status, - dc.parent_group_path, - dc.role_id, - dc.role_name, - dc.actions, - dc.access_type, - dc.access_provider_id, - dc.access_provider_role_id, - dc.access_provider_role_name, - dc.access_provider_role_actions - FROM - direct_clients AS dc - ), - final_clients AS ( - SELECT - gdc.id, - gdc.name, - gdc.domain_id, - gdc.parent_group_id, - gdc.identity, - gdc.secret, - gdc.tags, - gdc.metadata, - gdc.created_at, - gdc.updated_at, - gdc.updated_by, - gdc.status, - gdc.parent_group_path, - gdc.role_id, - gdc.role_name, - gdc.actions, - gdc.access_type, - gdc.access_provider_id, - gdc.access_provider_role_id, - gdc.access_provider_role_name, - gdc.access_provider_role_actions - FROM - group_direct_clients AS gdc - ) - `, userID, domainID, userID, domainID, domainID) + WITH direct_clients AS ( + SELECT + c.id, + c.name, + c.domain_id, + c.parent_group_id, + c.tags, + c.metadata, + c.identity, + c.secret, + c.created_at, + c.updated_at, + c.updated_by, + c.status, + text2ltree('') as parent_group_path, + cr.id AS role_id, + cr."name" AS role_name, + array_agg(cra."action") AS actions, + 'direct' as access_type, + '' AS access_provider_id, + '' AS access_provider_role_id, + '' AS access_provider_role_name, + array[]::::text[] AS access_provider_role_actions + FROM + clients_role_members crm + JOIN + clients_role_actions cra ON cra.role_id = crm.role_id + JOIN + clients_roles cr ON cr.id = crm.role_id + JOIN + clients c ON c.id = cr.entity_id + WHERE + crm.member_id = '%s' + AND c.domain_id = '%s' + GROUP BY + cr.entity_id, crm.member_id, cr.id, cr."name", c.id + ), + direct_groups AS ( + SELECT + g.*, + gr.entity_id AS entity_id, + grm.member_id AS member_id, + gr.id AS role_id, + gr."name" AS role_name, + array_agg(gra."action") AS actions + FROM + groups_role_members grm + JOIN + groups_role_actions gra ON gra.role_id = grm.role_id + JOIN + groups_roles gr ON gr.id = grm.role_id + JOIN + "groups" g ON g.id = gr.entity_id + WHERE + grm.member_id = '%s' + AND g.domain_id = '%s' + GROUP BY + gr.entity_id, grm.member_id, gr.id, gr."name", g."path", g.id + ), + direct_groups_with_subgroup AS ( + SELECT + * + FROM direct_groups + WHERE EXISTS ( + SELECT 1 + FROM unnest(direct_groups.actions) AS action + WHERE action LIKE 'subgroup_%%' + ) + ), + indirect_child_groups AS ( + SELECT + DISTINCT indirect_child_groups.id as child_id, + indirect_child_groups.*, + dgws.id as access_provider_id, + dgws.role_id as access_provider_role_id, + dgws.role_name as access_provider_role_name, + dgws.actions as access_provider_role_actions + FROM + direct_groups_with_subgroup dgws + JOIN + groups indirect_child_groups ON indirect_child_groups.path <@ dgws.path + WHERE + indirect_child_groups.domain_id = '%s' + AND NOT EXISTS ( + SELECT 1 + FROM direct_groups_with_subgroup dgws + WHERE dgws.id = indirect_child_groups.id + ) + ), + final_groups AS ( + SELECT + id, + parent_id, + domain_id, + "name", + description, + metadata, + created_at, + updated_at, + updated_by, + status, + "path", + '' AS role_id, + '' AS role_name, + array[]::::text[] AS actions, + 'direct_group' AS access_type, + id AS access_provider_id, + role_id AS access_provider_role_id, + role_name AS access_provider_role_name, + actions AS access_provider_role_actions + FROM + direct_groups + UNION + SELECT + id, + parent_id, + domain_id, + "name", + description, + metadata, + created_at, + updated_at, + updated_by, + status, + "path", + '' AS role_id, + '' AS role_name, + array[]::::text[] AS actions, + 'indirect_group' AS access_type, + access_provider_id, + access_provider_role_id, + access_provider_role_name, + access_provider_role_actions + FROM + indirect_child_groups + ), + groups_clients AS ( + SELECT + c.id, + c.name, + c.domain_id, + c.parent_group_id, + c.tags, + c.metadata, + c.identity, + c.secret, + c.created_at, + c.updated_at, + c.updated_by, + c.status, + g.path AS parent_group_path, + g.role_id, + g.role_name, + g.actions, + g.access_type, + g.access_provider_id, + g.access_provider_role_id, + g.access_provider_role_name, + g.access_provider_role_actions + FROM + final_groups g + JOIN + clients c ON c.parent_group_id = g.id + WHERE + c.id NOT IN (SELECT id FROM direct_clients) + UNION + SELECT * FROM direct_clients + ), + final_clients AS ( + SELECT + gc.id, + gc."name", + gc.domain_id, + gc.parent_group_id, + gc.tags, + gc.metadata, + gc.identity, + gc.secret, + gc.created_at, + gc.updated_at, + gc.updated_by, + gc.status, + gc.parent_group_path, + gc.role_id, + gc.role_name, + gc.actions, + gc.access_type, + gc.access_provider_id, + gc.access_provider_role_id, + gc.access_provider_role_name, + gc.access_provider_role_actions + FROM + groups_clients AS gc + UNION + SELECT + dc.id, + dc."name", + dc.domain_id, + dc.parent_group_id, + dc.tags, + dc.metadata, + dc.identity, + dc.secret, + dc.created_at, + dc.updated_at, + dc.updated_by, + dc.status, + text2ltree('') AS parent_group_path, + '' AS role_id, + '' AS role_name, + array[]::::text[] AS actions, + 'domain' AS access_type, + d.id AS access_provider_id, + dr.id AS access_provider_role_id, + dr."name" AS access_provider_role_name, + array_agg(dra."action") as access_provider_role_actions + FROM + domains_role_members drm + JOIN + domains_role_actions dra ON dra.role_id = drm.role_id + JOIN + domains_roles dr ON dr.id = drm.role_id + JOIN + domains d ON d.id = dr.entity_id + JOIN + clients dc ON dc.domain_id = d.id + WHERE + drm.member_id = '%s' -- user_id + AND d.id = '%s' -- domain_id + AND dra."action" LIKE 'client_%%' + AND NOT EXISTS ( -- Ensures that the direct and indirect clients are not in included. + SELECT 1 FROM groups_clients gc + WHERE gc.id = dc.id + ) + GROUP BY + dc.id, d.id, dr.id + ) + `, userID, domainID, userID, domainID, domainID, userID, domainID) } func (repo *clientRepo) SearchClients(ctx context.Context, pm clients.Page) (clients.ClientsPage, error) { @@ -785,6 +805,7 @@ func ToClient(t DBClient) (clients.Client, error) { UpdatedAt: updatedAt, UpdatedBy: updatedBy, Status: t.Status, + ParentGroupPath: t.ParentGroupPath, RoleID: t.RoleID, RoleName: t.RoleName, Actions: t.Actions, @@ -876,7 +897,7 @@ func PageQuery(pm clients.Page) (string, error) { query = append(query, "c.domain_id = :domain_id") } if pm.Group != "" { - query = append(query, "c.parent_group_path @> (SELECT path from groups where id = :group_id) ") + query = append(query, "c.parent_group_path <@ (SELECT path from groups where id = :group_id) ") } if pm.Channel != "" { query = append(query, "conn.channel_id = :channel_id ") diff --git a/cmd/channels/main.go b/cmd/channels/main.go index 2afe764680..c4465a2249 100644 --- a/cmd/channels/main.go +++ b/cmd/channels/main.go @@ -25,11 +25,13 @@ import ( "github.com/absmach/supermq/channels/postgres" pChannels "github.com/absmach/supermq/channels/private" "github.com/absmach/supermq/channels/tracing" + dpostgres "github.com/absmach/supermq/domains/postgres" gpostgres "github.com/absmach/supermq/groups/postgres" smqlog "github.com/absmach/supermq/logger" authsvcAuthn "github.com/absmach/supermq/pkg/authn/authsvc" smqauthz "github.com/absmach/supermq/pkg/authz" authsvcAuthz "github.com/absmach/supermq/pkg/authz/authsvc" + dconsumer "github.com/absmach/supermq/pkg/domains/events/consumer" domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient" gconsumer "github.com/absmach/supermq/pkg/groups/events/consumer" "github.com/absmach/supermq/pkg/grpcclient" @@ -227,6 +229,15 @@ func main() { return } + ddatabase := pg.NewDatabase(db, dbConfig, tracer) + drepo := dpostgres.New(ddatabase) + + if err := dconsumer.DomainsEventsSubscribe(ctx, drepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil { + logger.Error(fmt.Sprintf("failed to create domains event store : %s", err)) + exitCode = 1 + return + } + gdatabase := pg.NewDatabase(db, dbConfig, tracer) grepo := gpostgres.New(gdatabase) diff --git a/cmd/clients/main.go b/cmd/clients/main.go index de3b089f39..c33a00dd14 100644 --- a/cmd/clients/main.go +++ b/cmd/clients/main.go @@ -27,12 +27,14 @@ import ( "github.com/absmach/supermq/clients/postgres" pClients "github.com/absmach/supermq/clients/private" "github.com/absmach/supermq/clients/tracing" + dpostgres "github.com/absmach/supermq/domains/postgres" gpostgres "github.com/absmach/supermq/groups/postgres" redisclient "github.com/absmach/supermq/internal/clients/redis" smqlog "github.com/absmach/supermq/logger" authsvcAuthn "github.com/absmach/supermq/pkg/authn/authsvc" smqauthz "github.com/absmach/supermq/pkg/authz" authsvcAuthz "github.com/absmach/supermq/pkg/authz/authsvc" + dconsumer "github.com/absmach/supermq/pkg/domains/events/consumer" domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient" gconsumer "github.com/absmach/supermq/pkg/groups/events/consumer" "github.com/absmach/supermq/pkg/grpcclient" @@ -244,6 +246,15 @@ func main() { return } + ddatabase := pg.NewDatabase(db, dbConfig, tracer) + drepo := dpostgres.New(ddatabase) + + if err := dconsumer.DomainsEventsSubscribe(ctx, drepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil { + logger.Error(fmt.Sprintf("failed to create domains event store : %s", err)) + exitCode = 1 + return + } + gdatabase := pg.NewDatabase(db, dbConfig, tracer) grepo := gpostgres.New(gdatabase) diff --git a/cmd/groups/main.go b/cmd/groups/main.go index 032b41b9e4..a415203859 100644 --- a/cmd/groups/main.go +++ b/cmd/groups/main.go @@ -17,6 +17,7 @@ import ( grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1" grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1" grpcGroupsV1 "github.com/absmach/supermq/api/grpc/groups/v1" + dpostgres "github.com/absmach/supermq/domains/postgres" "github.com/absmach/supermq/groups" gpsvc "github.com/absmach/supermq/groups" grpcapi "github.com/absmach/supermq/groups/api/grpc" @@ -30,6 +31,7 @@ import ( authsvcAuthn "github.com/absmach/supermq/pkg/authn/authsvc" smqauthz "github.com/absmach/supermq/pkg/authz" authsvcAuthz "github.com/absmach/supermq/pkg/authz/authsvc" + dconsumer "github.com/absmach/supermq/pkg/domains/events/consumer" domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient" "github.com/absmach/supermq/pkg/grpcclient" jaegerclient "github.com/absmach/supermq/pkg/jaeger" @@ -77,6 +79,7 @@ type config struct { JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESConsumerName string `env:"SMQ_GROUPS_EVENT_CONSUMER" envDefault:"groups"` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"` SpicedbPort string `env:"SMQ_SPICEDB_PORT" envDefault:"50051"` @@ -226,6 +229,15 @@ func main() { return } + ddatabase := pg.NewDatabase(db, dbConfig, tracer) + drepo := dpostgres.New(ddatabase) + + if err := dconsumer.DomainsEventsSubscribe(ctx, drepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil { + logger.Error(fmt.Sprintf("failed to create domains event store : %s", err)) + exitCode = 1 + return + } + httpServerConfig := server.Config{Port: defSvcHTTPPort} if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err.Error())) diff --git a/docker/spicedb/schema.zed b/docker/spicedb/schema.zed index c52099d1da..cb63ecba75 100644 --- a/docker/spicedb/schema.zed +++ b/docker/spicedb/schema.zed @@ -186,7 +186,7 @@ definition group { permission remove_role_users_permission = remove_role_users + parent_group->subgroup_remove_role_users_permission + domain->group_remove_role_users_permission permission view_role_users_permission = view_role_users + parent_group->subgroup_view_role_users_permission + domain->group_view_role_users_permission - // Subgroup clients permisssion + // Subgroup clients permission permission subgroup_client_update_permission = subgroup_client_update + parent_group->subgroup_client_update_permission permission subgroup_client_read_permission = subgroup_client_read + parent_group->subgroup_client_read_permission permission subgroup_client_delete_permission = subgroup_client_delete + parent_group->subgroup_client_delete_permission @@ -198,7 +198,7 @@ definition group { permission subgroup_client_remove_role_users_permission = subgroup_client_remove_role_users + parent_group->subgroup_client_remove_role_users_permission permission subgroup_client_view_role_users_permission = subgroup_client_view_role_users + parent_group->subgroup_client_view_role_users_permission - // Group clients permisssion + // Group clients permission permission client_create_permission = client_create + parent_group->subgroup_client_create + domain->client_create_permission permission client_update_permission = client_update + parent_group->subgroup_client_update + domain->client_update_permission permission client_read_permission = client_read + parent_group->subgroup_client_read + domain->client_read_permission @@ -211,7 +211,7 @@ definition group { permission client_remove_role_users_permission = client_remove_role_users + parent_group->subgroup_client_remove_role_users + domain->client_remove_role_users_permission permission client_view_role_users_permission = client_view_role_users + parent_group->subgroup_client_view_role_users + domain->client_view_role_users_permission - // Subgroup channels permisssion + // Subgroup channels permission permission subgroup_channel_update_permission = subgroup_channel_update + parent_group->subgroup_channel_update_permission permission subgroup_channel_read_permission = subgroup_channel_read + parent_group->subgroup_channel_read_permission permission subgroup_channel_delete_permission = subgroup_channel_delete + parent_group->subgroup_channel_delete_permission @@ -225,7 +225,7 @@ definition group { permission subgroup_channel_remove_role_users_permission = subgroup_channel_remove_role_users + parent_group->subgroup_channel_remove_role_users_permission permission subgroup_channel_view_role_users_permission = subgroup_channel_view_role_users + parent_group->subgroup_channel_view_role_users_permission - // Group channels permisssion + // Group channels permission permission channel_create_permission = channel_create + parent_group->subgroup_channel_create_permission + domain->channel_create_permission permission channel_update_permission = channel_update + parent_group->subgroup_channel_update + domain->channel_update_permission permission channel_read_permission = channel_read + parent_group->subgroup_channel_read + domain->channel_read_permission @@ -244,7 +244,7 @@ definition group { } definition domain { - //Replace platoform with organization in future + //Replace platform with organization in future relation organization: platform relation team: team @@ -310,7 +310,16 @@ definition domain { permission remove_role_users_permission = remove_role_users + team->domain_remove_role_users + organization->admin permission view_role_users_permission = view_role_users + team->domain_view_role_users + organization->admin - permission membership = read + update + enable + disable + delete + manage_role + add_role_users + remove_role_users + view_role_users + permission membership = read + update + enable + disable + delete + + manage_role + add_role_users + remove_role_users + view_role_users + + client_create + channel_create + group_create + + client_update + client_read + client_delete + client_set_parent_group + client_connect_to_channel + + client_manage_role + client_add_role_users + client_remove_role_users + client_view_role_users + + channel_update + channel_read + channel_delete + channel_set_parent_group + channel_connect_to_client + channel_publish + channel_subscribe + + channel_manage_role + channel_add_role_users + channel_remove_role_users + channel_view_role_users + + group_update + group_membership + group_read + group_delete + group_set_child + group_set_parent + + group_manage_role + group_add_role_users + group_remove_role_users + group_view_role_users + permission admin = read & update & enable & disable & delete & manage_role & add_role_users & remove_role_users & view_role_users permission client_create_permission = client_create + team->client_create + organization->admin @@ -355,7 +364,7 @@ definition domain { } -// Add this realtion and permission in future while adding orgnaization +// Add this relation and permission in future while adding organization definition team { relation organization: organization relation parent_team: team diff --git a/domains/domains.go b/domains/domains.go index a601608491..5df2acf23b 100644 --- a/domains/domains.go +++ b/domains/domains.go @@ -97,11 +97,13 @@ func (s *Status) UnmarshalJSON(data []byte) error { type Metadata map[string]interface{} type DomainReq struct { - Name *string `json:"name,omitempty"` - Metadata *Metadata `json:"metadata,omitempty"` - Tags *[]string `json:"tags,omitempty"` - Alias *string `json:"alias,omitempty"` - Status *Status `json:"status,omitempty"` + Name *string `json:"name,omitempty"` + Metadata *Metadata `json:"metadata,omitempty"` + Tags *[]string `json:"tags,omitempty"` + Alias *string `json:"alias,omitempty"` + Status *Status `json:"status,omitempty"` + UpdatedBy *string `json:"updated_by,omitempty"` + UpdatedAt *time.Time `json:"updated_at,omitempty"` } type Domain struct { ID string `json:"id"` @@ -188,7 +190,7 @@ type Repository interface { RetrieveAllByIDs(ctx context.Context, pm Page) (DomainsPage, error) // Update updates the client name and metadata. - Update(ctx context.Context, id string, userID string, d DomainReq) (Domain, error) + Update(ctx context.Context, id string, d DomainReq) (Domain, error) // Delete Delete(ctx context.Context, id string) error diff --git a/domains/mocks/repository.go b/domains/mocks/repository.go index 54674fe453..4ea469638b 100644 --- a/domains/mocks/repository.go +++ b/domains/mocks/repository.go @@ -611,9 +611,9 @@ func (_m *Repository) Save(ctx context.Context, d domains.Domain) (domains.Domai return r0, r1 } -// Update provides a mock function with given fields: ctx, id, userID, d -func (_m *Repository) Update(ctx context.Context, id string, userID string, d domains.DomainReq) (domains.Domain, error) { - ret := _m.Called(ctx, id, userID, d) +// Update provides a mock function with given fields: ctx, id, d +func (_m *Repository) Update(ctx context.Context, id string, d domains.DomainReq) (domains.Domain, error) { + ret := _m.Called(ctx, id, d) if len(ret) == 0 { panic("no return value specified for Update") @@ -621,17 +621,17 @@ func (_m *Repository) Update(ctx context.Context, id string, userID string, d do var r0 domains.Domain var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, domains.DomainReq) (domains.Domain, error)); ok { - return rf(ctx, id, userID, d) + if rf, ok := ret.Get(0).(func(context.Context, string, domains.DomainReq) (domains.Domain, error)); ok { + return rf(ctx, id, d) } - if rf, ok := ret.Get(0).(func(context.Context, string, string, domains.DomainReq) domains.Domain); ok { - r0 = rf(ctx, id, userID, d) + if rf, ok := ret.Get(0).(func(context.Context, string, domains.DomainReq) domains.Domain); ok { + r0 = rf(ctx, id, d) } else { r0 = ret.Get(0).(domains.Domain) } - if rf, ok := ret.Get(1).(func(context.Context, string, string, domains.DomainReq) error); ok { - r1 = rf(ctx, id, userID, d) + if rf, ok := ret.Get(1).(func(context.Context, string, domains.DomainReq) error); ok { + r1 = rf(ctx, id, d) } else { r1 = ret.Error(1) } diff --git a/domains/postgres/domains.go b/domains/postgres/domains.go index 8610576774..d6cd2be831 100644 --- a/domains/postgres/domains.go +++ b/domains/postgres/domains.go @@ -294,41 +294,46 @@ func (repo domainRepo) ListDomains(ctx context.Context, pm domains.Page) (domain } // Update updates the client name and metadata. -func (repo domainRepo) Update(ctx context.Context, id, userID string, dr domains.DomainReq) (domains.Domain, error) { +func (repo domainRepo) Update(ctx context.Context, id string, dr domains.DomainReq) (domains.Domain, error) { var query []string var upq string - var ws string = "AND status = :status" d := domains.Domain{ID: id} if dr.Name != nil && *dr.Name != "" { - query = append(query, "name = :name, ") + query = append(query, "name = :name") d.Name = *dr.Name } if dr.Metadata != nil { - query = append(query, "metadata = :metadata, ") + query = append(query, "metadata = :metadata") d.Metadata = *dr.Metadata } if dr.Tags != nil { - query = append(query, "tags = :tags, ") + query = append(query, "tags = :tags") d.Tags = *dr.Tags } if dr.Status != nil { - ws = "" - query = append(query, "status = :status, ") + query = append(query, "status = :status") d.Status = *dr.Status } if dr.Alias != nil { - query = append(query, "alias = :alias, ") + query = append(query, "alias = :alias") d.Alias = *dr.Alias } d.UpdatedAt = time.Now() - d.UpdatedBy = userID + if dr.UpdatedAt != nil { + query = append(query, "updated_at = :updated_at") + d.UpdatedAt = *dr.UpdatedAt + } + if dr.UpdatedBy != nil { + query = append(query, "updated_by = :updated_by") + d.UpdatedAt = *dr.UpdatedAt + } if len(query) > 0 { - upq = strings.Join(query, " ") + upq = strings.Join(query, ", ") } - q := fmt.Sprintf(`UPDATE domains SET %s updated_at = :updated_at, updated_by = :updated_by - WHERE id = :id %s + q := fmt.Sprintf(`UPDATE domains SET %s + WHERE id = :id RETURNING id, name, tags, alias, metadata, created_at, updated_at, updated_by, created_by, status;`, - upq, ws) + upq) dbd, err := toDBDomain(d) if err != nil { diff --git a/domains/postgres/domains_test.go b/domains/postgres/domains_test.go index 426fcdc1a1..1c4cc8c229 100644 --- a/domains/postgres/domains_test.go +++ b/domains/postgres/domains_test.go @@ -561,7 +561,7 @@ func TestUpdate(t *testing.T) { for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - d, err := repo.Update(context.Background(), tc.domainID, userID, tc.d) + d, err := repo.Update(context.Background(), tc.domainID, tc.d) d.UpdatedAt = tc.response.UpdatedAt assert.Equal(t, tc.response, d, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.response, d)) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) diff --git a/domains/service.go b/domains/service.go index 1483f048e3..7f6cba1342 100644 --- a/domains/service.go +++ b/domains/service.go @@ -111,7 +111,10 @@ func (svc service) RetrieveDomain(ctx context.Context, session authn.Session, id } func (svc service) UpdateDomain(ctx context.Context, session authn.Session, id string, d DomainReq) (Domain, error) { - dom, err := svc.repo.Update(ctx, id, session.UserID, d) + updatedAt := time.Now() + d.UpdatedAt = &updatedAt + d.UpdatedBy = &session.UserID + dom, err := svc.repo.Update(ctx, id, d) if err != nil { return Domain{}, errors.Wrap(svcerr.ErrUpdateEntity, err) } @@ -120,7 +123,8 @@ func (svc service) UpdateDomain(ctx context.Context, session authn.Session, id s func (svc service) EnableDomain(ctx context.Context, session authn.Session, id string) (Domain, error) { status := EnabledStatus - dom, err := svc.repo.Update(ctx, id, session.UserID, DomainReq{Status: &status}) + updatedAt := time.Now() + dom, err := svc.repo.Update(ctx, id, DomainReq{Status: &status, UpdatedBy: &session.UserID, UpdatedAt: &updatedAt}) if err != nil { return Domain{}, errors.Wrap(svcerr.ErrUpdateEntity, err) } @@ -133,7 +137,8 @@ func (svc service) EnableDomain(ctx context.Context, session authn.Session, id s func (svc service) DisableDomain(ctx context.Context, session authn.Session, id string) (Domain, error) { status := DisabledStatus - dom, err := svc.repo.Update(ctx, id, session.UserID, DomainReq{Status: &status}) + updatedAt := time.Now() + dom, err := svc.repo.Update(ctx, id, DomainReq{Status: &status, UpdatedBy: &session.UserID, UpdatedAt: &updatedAt}) if err != nil { return Domain{}, errors.Wrap(svcerr.ErrUpdateEntity, err) } @@ -147,7 +152,8 @@ func (svc service) DisableDomain(ctx context.Context, session authn.Session, id // Only SuperAdmin can freeze the domain. func (svc service) FreezeDomain(ctx context.Context, session authn.Session, id string) (Domain, error) { status := FreezeStatus - dom, err := svc.repo.Update(ctx, id, session.UserID, DomainReq{Status: &status}) + updatedAt := time.Now() + dom, err := svc.repo.Update(ctx, id, DomainReq{Status: &status, UpdatedBy: &session.UserID, UpdatedAt: &updatedAt}) if err != nil { return Domain{}, errors.Wrap(svcerr.ErrUpdateEntity, err) } diff --git a/groups/postgres/groups.go b/groups/postgres/groups.go index d9155004d5..5202ebf9ef 100644 --- a/groups/postgres/groups.go +++ b/groups/postgres/groups.go @@ -721,106 +721,172 @@ func (repo groupRepository) retrieveGroups(ctx context.Context, domainID, userID func (repo groupRepository) userGroupsBaseQuery(domainID, userID string) string { return fmt.Sprintf(` - WITH direct_groups AS ( +WITH direct_groups AS ( +SELECT + g.*, + gr.entity_id AS entity_id, + grm.member_id AS member_id, + gr.id AS role_id, + gr."name" AS role_name, + array_agg(gra."action") AS actions +FROM + groups_role_members grm +JOIN + groups_role_actions gra ON gra.role_id = grm.role_id +JOIN + groups_roles gr ON gr.id = grm.role_id +JOIN + "groups" g ON g.id = gr.entity_id +WHERE + grm.member_id = '%s' + AND g.domain_id = '%s' +GROUP BY + gr.entity_id, grm.member_id, gr.id, gr."name", g."path", g.id +), +direct_groups_with_subgroup AS ( SELECT - g.*, - gr.entity_id AS entity_id, - grm.member_id AS member_id, - gr.id AS role_id, - gr."name" AS role_name, - array_agg(gra."action") AS actions + * + FROM direct_groups + WHERE EXISTS ( + SELECT 1 + FROM unnest(direct_groups.actions) AS action + WHERE action LIKE 'subgroup_%%' + ) +), +indirect_child_groups AS ( + SELECT + DISTINCT indirect_child_groups.id as child_id, + indirect_child_groups.*, + dgws.id as access_provider_id, + dgws.role_id as access_provider_role_id, + dgws.role_name as access_provider_role_name, + dgws.actions as access_provider_role_actions + FROM + direct_groups_with_subgroup dgws + JOIN + groups indirect_child_groups ON indirect_child_groups.path <@ dgws.path -- Finds all children of entity_id based on ltree path + WHERE + indirect_child_groups.domain_id = '%s' + AND + NOT EXISTS ( -- Ensures that the indirect_child_groups.id is not already in the direct_groups_with_subgroup table + SELECT 1 + FROM direct_groups_with_subgroup dgws + WHERE dgws.id = indirect_child_groups.id + ) +), +direct_indirect_groups as ( + SELECT + id, + parent_id, + domain_id, + "name", + description, + metadata, + created_at, + updated_at, + updated_by, + status, + "path", + role_id, + role_name, + actions, + 'direct' AS access_type, + '' AS access_provider_id, + '' AS access_provider_role_id, + '' AS access_provider_role_name, + array[]::::text[] AS access_provider_role_actions + FROM + direct_groups + UNION + SELECT + id, + parent_id, + domain_id, + "name", + description, + metadata, + created_at, + updated_at, + updated_by, + status, + "path", + '' AS role_id, + '' AS role_name, + array[]::::text[] AS actions, + 'indirect' AS access_type, + access_provider_id, + access_provider_role_id, + access_provider_role_name, + access_provider_role_actions + FROM + indirect_child_groups +), +final_groups AS ( + SELECT + dig.id, + dig.parent_id, + dig.domain_id, + dig."name", + dig.description, + dig.metadata, + dig.created_at, + dig.updated_at, + dig.updated_by, + dig.status, + dig."path", + dig.role_id, + dig.role_name, + dig.actions, + dig.access_type, + dig.access_provider_id, + dig.access_provider_role_id, + dig.access_provider_role_name, + dig.access_provider_role_actions + FROM + direct_indirect_groups as dig + UNION + SELECT + dg.id, + dg.parent_id, + dg.domain_id, + dg."name", + dg.description, + dg.metadata, + dg.created_at, + dg.updated_at, + dg.updated_by, + dg.status, + dg."path", + '' AS role_id, + '' AS role_name, + array[]::::text[] AS actions, + 'domain' AS access_type, + d.id AS access_provider_id, + dr.id AS access_provider_role_id, + dr."name" AS access_provider_role_name, + array_agg(dra."action") as actions FROM - groups_role_members grm + domains_role_members drm + JOIN + domains_role_actions dra ON dra.role_id = drm.role_id JOIN - groups_role_actions gra ON gra.role_id = grm.role_id + domains_roles dr ON dr.id = drm.role_id JOIN - groups_roles gr ON gr.id = grm.role_id + domains d ON d.id = dr.entity_id JOIN - "groups" g ON g.id = gr.entity_id + "groups" dg ON dg.domain_id = d.id WHERE - grm.member_id = '%s' - AND g.domain_id = '%s' - GROUP BY - gr.entity_id, grm.member_id, gr.id, gr."name", g."path", g.id - ), - direct_groups_with_subgroup AS ( - SELECT - * - FROM direct_groups - WHERE EXISTS ( - SELECT 1 - FROM unnest(direct_groups.actions) AS action - WHERE action LIKE 'subgroup_%%' + drm.member_id = '%s' -- user_id + AND d.id = '%s' -- domain_id + AND dra."action" LIKE 'group_%%' + AND NOT EXISTS ( -- Ensures that the direct and indirect groups are not in included. + SELECT 1 FROM direct_indirect_groups dig + WHERE dig.id = dg.id ) - ), - indirect_child_groups AS ( - SELECT - DISTINCT indirect_child_groups.id as child_id, - indirect_child_groups.*, - dgws.id as access_provider_id, - dgws.role_id as access_provider_role_id, - dgws.role_name as access_provider_role_name, - dgws.actions as access_provider_role_actions - FROM - direct_groups_with_subgroup dgws - JOIN - groups indirect_child_groups ON indirect_child_groups.path <@ dgws.path -- Finds all children of entity_id based on ltree path - WHERE - indirect_child_groups.domain_id = '%s' - AND - NOT EXISTS ( -- Ensures that the indirect_child_groups.id is not already in the direct_groups_with_subgroup table - SELECT 1 - FROM direct_groups_with_subgroup dgws - WHERE dgws.id = indirect_child_groups.id - ) - ), - final_groups as ( - SELECT - id, - parent_id, - domain_id, - "name", - description, - metadata, - created_at, - updated_at, - updated_by, - status, - "path", - role_id, - role_name, - actions, - 'direct' AS access_type, - '' AS access_provider_id, - '' AS access_provider_role_id, - '' AS access_provider_role_name, - array[]::::text[] AS access_provider_role_actions - FROM - direct_groups - UNION - SELECT - id, - parent_id, - domain_id, - "name", - description, - metadata, - created_at, - updated_at, - updated_by, - status, - "path", - '' AS role_id, - '' AS role_name, - array[]::::text[] AS actions, - 'indirect' AS access_type, - access_provider_id, - access_provider_role_id, - access_provider_role_name, - access_provider_role_actions - FROM - indirect_child_groups - )`, userID, domainID, domainID) + GROUP BY + dg.id, d.id, dr.id +) + `, userID, domainID, domainID, userID, domainID) } func buildQuery(gm groups.PageMeta, ids ...string) string { diff --git a/groups/postgres/init.go b/groups/postgres/init.go index ed9ed56dd7..9ad9b63dc6 100644 --- a/groups/postgres/init.go +++ b/groups/postgres/init.go @@ -4,6 +4,7 @@ package postgres import ( + dpostgres "github.com/absmach/supermq/domains/postgres" "github.com/absmach/supermq/pkg/errors" repoerr "github.com/absmach/supermq/pkg/errors/repository" rolesPostgres "github.com/absmach/supermq/pkg/roles/repo/postgres" @@ -59,5 +60,11 @@ func Migration() (*migrate.MemoryMigrationSource, error) { groupsMigration.Migrations = append(groupsMigration.Migrations, rolesMigration.Migrations...) + domainsMigrations, err := dpostgres.Migration() + if err != nil { + return nil, err + } + groupsMigration.Migrations = append(groupsMigration.Migrations, domainsMigrations.Migrations...) + return groupsMigration, nil } diff --git a/pkg/domains/events/consumer/decode.go b/pkg/domains/events/consumer/decode.go new file mode 100644 index 0000000000..bc6db48d25 --- /dev/null +++ b/pkg/domains/events/consumer/decode.go @@ -0,0 +1,271 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package consumer + +import ( + "fmt" + "time" + + "github.com/absmach/supermq/domains" + "github.com/absmach/supermq/pkg/errors" + "github.com/absmach/supermq/pkg/roles" + rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer" +) + +const ( + layout = "2006-01-02T15:04:05.999999Z" +) + +var ( + errDecodeCreateDomainEvent = errors.New("failed to decode domain create event") + errDecodeUpdateDomainEvent = errors.New("failed to decode domain update event") + errDecodeEnableDomainEvent = errors.New("failed to decode domain enable event") + errDecodeDisableDomainEvent = errors.New("failed to decode domain disable event") + errDecodeFreezeDomainEvent = errors.New("failed to decode domain freeze event") + errDecodeRemoveDomainsEvent = errors.New("failed to decode domain remove event") + + errID = errors.New("missing or invalid 'id'") + errName = errors.New("missing or invalid 'name'") + errAlias = errors.New("missing or invalid 'alias'") + errTags = errors.New("invalid 'tags'") + errStatus = errors.New("missing or invalid 'status'") + errConvertStatus = errors.New("failed to convert status") + errCreatedBy = errors.New("missing or invalid 'created_by'") + errCreatedAt = errors.New("failed to parse 'created_at' time") + errUpdatedAt = errors.New("failed to parse 'updated_at' time") +) + +func ToDomains(data map[string]interface{}) (domains.Domain, error) { + var d domains.Domain + id, ok := data["id"].(string) + if !ok { + return domains.Domain{}, errID + } + d.ID = id + + name, ok := data["name"].(string) + if !ok { + return domains.Domain{}, errName + } + d.Name = name + + stat, ok := data["status"].(string) + if !ok { + return domains.Domain{}, errStatus + } + st, err := domains.ToStatus(stat) + if err != nil { + return domains.Domain{}, errors.Wrap(errConvertStatus, err) + } + d.Status = st + + alias, ok := data["alias"].(string) + if !ok { + return domains.Domain{}, errAlias + } + d.Alias = alias + + cby, ok := data["created_by"].(string) + if !ok { + return domains.Domain{}, errCreatedBy + } + d.CreatedBy = cby + + cat, ok := data["created_at"].(string) + if !ok { + return domains.Domain{}, errCreatedAt + } + ct, err := time.Parse(layout, cat) + if err != nil { + return domains.Domain{}, errors.Wrap(errCreatedAt, err) + } + d.CreatedAt = ct + + // Following fields of groups are allowed to be empty. + itags, ok := data["tags"].([]interface{}) + if ok { + tags, err := rconsumer.ToStrings(itags) + if err != nil { + return domains.Domain{}, errors.Wrap(errTags, err) + } + d.Tags = tags + } + + meta, ok := data["metadata"].(map[string]interface{}) + if ok { + d.Metadata = meta + } + + uby, ok := data["updated_by"].(string) + if ok { + d.UpdatedBy = uby + } + + uat, ok := data["updated_at"].(string) + if ok { + ut, err := time.Parse(layout, uat) + if err != nil { + return domains.Domain{}, errors.Wrap(errUpdatedAt, err) + } + d.UpdatedAt = ut + } + + return d, nil +} + +func decodeCreateDomainEvent(data map[string]interface{}) (domains.Domain, []roles.RoleProvision, error) { + d, err := ToDomains(data) + if err != nil { + return domains.Domain{}, []roles.RoleProvision{}, errors.Wrap(errDecodeCreateDomainEvent, err) + } + irps, ok := data["roles_provisioned"].([]interface{}) + if !ok { + return domains.Domain{}, []roles.RoleProvision{}, errors.Wrap(errDecodeCreateDomainEvent, errors.New("missing or invalid 'roles_provisioned'")) + } + rps, err := rconsumer.ToRoleProvisions(irps) + if err != nil { + return domains.Domain{}, []roles.RoleProvision{}, errors.Wrap(errDecodeCreateDomainEvent, err) + } + + return d, rps, nil +} + +func decodeUpdateDomainEvent(data map[string]interface{}) (domains.Domain, error) { + var d domains.Domain + + id, ok := data["id"].(string) + if !ok { + return domains.Domain{}, errors.Wrap(errDecodeUpdateDomainEvent, errID) + } + d.ID = id + + name, ok := data["name"].(string) + if ok { + d.Name = name + } + + alias, ok := data["alias"].(string) + if ok { + d.Alias = alias + } + + itags, ok := data["tags"].([]interface{}) + if ok { + tags, err := rconsumer.ToStrings(itags) + if err != nil { + return domains.Domain{}, errors.Wrap(errDecodeUpdateDomainEvent, err) + } + d.Tags = tags + } + + meta, ok := data["metadata"].(map[string]interface{}) + if ok { + d.Metadata = meta + } + + uby, ok := data["updated_by"].(string) + if ok { + d.UpdatedBy = uby + } + + uat, ok := data["updated_at"].(string) + if ok { + ut, err := time.Parse(layout, uat) + if err != nil { + return domains.Domain{}, errors.Wrap(errDecodeUpdateDomainEvent, errors.Wrap(errUpdatedAt, err)) + } + d.UpdatedAt = ut + } + + return d, nil +} + +func decodeEnableDomainEvent(data map[string]interface{}) (domains.Domain, error) { + var d domains.Domain + id, ok := data["id"].(string) + if !ok { + return domains.Domain{}, errors.Wrap(errDecodeEnableDomainEvent, errID) + } + d.ID = id + + uby, ok := data["updated_by"].(string) + if ok { + d.UpdatedBy = uby + } + + uat, ok := data["updated_at"].(string) + if ok { + ut, err := time.Parse(layout, uat) + if err != nil { + return domains.Domain{}, errors.Wrap(errDecodeEnableDomainEvent, errors.Wrap(errUpdatedAt, err)) + } + d.UpdatedAt = ut + } + + return d, nil +} + +func decodeDisableDomainEvent(data map[string]interface{}) (domains.Domain, error) { + var d domains.Domain + id, ok := data["id"].(string) + if !ok { + return domains.Domain{}, errors.Wrap(errDecodeDisableDomainEvent, errID) + } + d.ID = id + + uby, ok := data["updated_by"].(string) + if ok { + d.UpdatedBy = uby + } + + uat, ok := data["updated_at"].(string) + if ok { + ut, err := time.Parse(layout, uat) + if err != nil { + return domains.Domain{}, errors.Wrap(errDecodeDisableDomainEvent, errors.Wrap(errUpdatedAt, err)) + } + d.UpdatedAt = ut + } + + return d, nil +} + +func decodeFreezeDomainEvent(data map[string]interface{}) (domains.Domain, error) { + var d domains.Domain + id, ok := data["id"].(string) + if !ok { + return domains.Domain{}, errors.Wrap(errDecodeFreezeDomainEvent, errID) + } + d.ID = id + + uby, ok := data["updated_by"].(string) + if ok { + d.UpdatedBy = uby + } + + uat, ok := data["updated_at"].(string) + if ok { + ut, err := time.Parse(layout, uat) + if err != nil { + return domains.Domain{}, errors.Wrap(errDecodeFreezeDomainEvent, errors.Wrap(errUpdatedAt, err)) + } + d.UpdatedAt = ut + } + + return d, nil +} + +func decodeUserDeleteDomainEvent(_ map[string]interface{}) (domains.Domain, error) { + return domains.Domain{}, fmt.Errorf("not implemented decode domain user delete event ") +} + +func decodeDeleteDomainEvent(data map[string]interface{}) (domains.Domain, error) { + var d domains.Domain + id, ok := data["id"].(string) + if !ok { + return domains.Domain{}, errors.Wrap(errDecodeRemoveDomainsEvent, errID) + } + d.ID = id + return d, nil +} diff --git a/pkg/domains/events/consumer/doc.go b/pkg/domains/events/consumer/doc.go new file mode 100644 index 0000000000..a99efc936e --- /dev/null +++ b/pkg/domains/events/consumer/doc.go @@ -0,0 +1,4 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package consumer diff --git a/pkg/domains/events/consumer/stream.go b/pkg/domains/events/consumer/stream.go new file mode 100644 index 0000000000..ec89234b40 --- /dev/null +++ b/pkg/domains/events/consumer/stream.go @@ -0,0 +1,234 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package consumer + +import ( + "context" + "fmt" + "log/slog" + + "github.com/absmach/supermq/domains" + "github.com/absmach/supermq/pkg/errors" + "github.com/absmach/supermq/pkg/events" + "github.com/absmach/supermq/pkg/events/store" + "github.com/absmach/supermq/pkg/messaging" + rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer" +) + +const ( + stream = "events.supermq.domains" + + create = "domain.create" + update = "domain.update" + enable = "domain.enable" + disable = "domain.disable" + freeze = "domain.freeze" + delete = "domain.delete" + userDelete = "domain.user_delete" + addRole = "domain.role.add" + removeRole = "domain.role.remove" + updateRole = "domain.role.update" + addRoleActions = "domain.role.actions.add" + removeRoleActions = "domain.role.actions.remove" + removeAllRoleActions = "domain.role.actions.remove_all" + addRoleMembers = "domain.role.members.add" + removeRoleMembers = "domain.role.members.remove" + removeRoleAllMembers = "domain.role.members.remove_all" + removeMemberFromAllRoles = "domain.role.members.remove_from_all_roles" +) + +var ( + errNoOperationKey = errors.New("operation key is not found in event message") + errCreateDomainEvent = errors.New("failed to consume domain create event") + errUpdateDomainEvent = errors.New("failed to consume domain update event") + errEnableDomainGroupEvent = errors.New("failed to consume domain enable event") + errDisableDomainGroupEvent = errors.New("failed to consume domain disable event") + errFreezeDomainGroupEvent = errors.New("failed to consume domain freeze event") + errUserDeleteDomainEvent = errors.New("failed to consume domain user delete event") + errDeleteDomainEvent = errors.New("failed to consume domain delete event") +) + +type eventHandler struct { + repo domains.Repository + rolesEventHandler rconsumer.EventHandler +} + +func DomainsEventsSubscribe(ctx context.Context, repo domains.Repository, esURL, esConsumerName string, logger *slog.Logger) error { + subscriber, err := store.NewSubscriber(ctx, esURL, logger) + if err != nil { + return err + } + + subConfig := events.SubscriberConfig{ + Stream: stream, + Consumer: esConsumerName, + Handler: NewEventHandler(repo), + DeliveryPolicy: messaging.DeliverNewPolicy, + Ordered: true, + } + return subscriber.Subscribe(ctx, subConfig) +} + +// NewEventHandler returns new event store handler. +func NewEventHandler(repo domains.Repository) events.EventHandler { + reh := rconsumer.NewEventHandler("domain", repo) + return &eventHandler{ + repo: repo, + rolesEventHandler: reh, + } +} + +func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { + msg, err := event.Encode() + if err != nil { + return err + } + + op, ok := msg["operation"] + + if !ok { + return errNoOperationKey + } + switch op { + case create: + return es.createDomainHandler(ctx, msg) + case update: + return es.updateDomainHandler(ctx, msg) + case enable: + return es.enableDomainHandler(ctx, msg) + case disable: + return es.disableDomainHandler(ctx, msg) + case freeze: + return es.freezeDomainHandler(ctx, msg) + case userDelete: + return es.userDeleteDomainHandler(ctx, msg) + case delete: + return es.deleteDomainHandler(ctx, msg) + case addRole: + return es.rolesEventHandler.AddEntityRoleHandler(ctx, msg) + case updateRole: + return es.rolesEventHandler.UpdateEntityRoleHandler(ctx, msg) + case removeRole: + return es.rolesEventHandler.RemoveEntityRoleHandler(ctx, msg) + case addRoleActions: + return es.rolesEventHandler.AddEntityRoleActionsHandler(ctx, msg) + case removeRoleActions: + return es.rolesEventHandler.RemoveEntityRoleActionsHandler(ctx, msg) + case removeAllRoleActions: + return es.rolesEventHandler.RemoveAllEntityRoleActionsHandler(ctx, msg) + case addRoleMembers: + return es.rolesEventHandler.AddEntityRoleMembersHandler(ctx, msg) + case removeRoleMembers: + return es.rolesEventHandler.RemoveEntityRoleMembersHandler(ctx, msg) + case removeRoleAllMembers: + return es.rolesEventHandler.RemoveAllEntityRoleMembersHandler(ctx, msg) + case removeMemberFromAllRoles: + return es.rolesEventHandler.RemoveMemberFromAllEntityHandler(ctx, msg) + } + return nil +} + +func (es *eventHandler) createDomainHandler(ctx context.Context, data map[string]interface{}) error { + d, rps, err := decodeCreateDomainEvent(data) + if err != nil { + return errors.Wrap(errCreateDomainEvent, err) + } + + if _, err := es.repo.Save(ctx, d); err != nil { + return errors.Wrap(errCreateDomainEvent, err) + } + if _, err := es.repo.AddRoles(ctx, rps); err != nil { + return errors.Wrap(errCreateDomainEvent, err) + } + + return nil +} + +func (es *eventHandler) updateDomainHandler(ctx context.Context, data map[string]interface{}) error { + d, err := decodeUpdateDomainEvent(data) + if err != nil { + return errors.Wrap(errUpdateDomainEvent, err) + } + + if _, err := es.repo.Update( + ctx, + d.ID, + domains.DomainReq{ + Name: &d.Name, + Metadata: &d.Metadata, + Alias: &d.Alias, + Tags: &d.Tags, + UpdatedBy: &d.UpdatedBy, + UpdatedAt: &d.UpdatedAt, + }, + ); err != nil { + return errors.Wrap(errUpdateDomainEvent, err) + } + + return nil +} + +func (es *eventHandler) enableDomainHandler(ctx context.Context, data map[string]interface{}) error { + d, err := decodeEnableDomainEvent(data) + if err != nil { + return errors.Wrap(errEnableDomainGroupEvent, err) + } + + enabled := domains.EnabledStatus + if _, err := es.repo.Update(ctx, d.ID, domains.DomainReq{Status: &enabled, UpdatedBy: &d.UpdatedBy, UpdatedAt: &d.UpdatedAt}); err != nil { + return errors.Wrap(errEnableDomainGroupEvent, err) + } + + return nil +} + +func (es *eventHandler) disableDomainHandler(ctx context.Context, data map[string]interface{}) error { + d, err := decodeDisableDomainEvent(data) + if err != nil { + return errors.Wrap(errDisableDomainGroupEvent, err) + } + + disabled := domains.DisabledStatus + if _, err := es.repo.Update(ctx, d.ID, domains.DomainReq{Status: &disabled, UpdatedBy: &d.UpdatedBy, UpdatedAt: &d.UpdatedAt}); err != nil { + return errors.Wrap(errDisableDomainGroupEvent, err) + } + + return nil +} + +func (es *eventHandler) freezeDomainHandler(ctx context.Context, data map[string]interface{}) error { + d, err := decodeFreezeDomainEvent(data) + if err != nil { + return errors.Wrap(errFreezeDomainGroupEvent, err) + } + + freeze := domains.FreezeStatus + if _, err := es.repo.Update(ctx, d.ID, domains.DomainReq{Status: &freeze, UpdatedBy: &d.UpdatedBy, UpdatedAt: &d.UpdatedAt}); err != nil { + return errors.Wrap(errFreezeDomainGroupEvent, err) + } + + return nil +} + +func (es *eventHandler) userDeleteDomainHandler(_ context.Context, data map[string]interface{}) error { + _, err := decodeUserDeleteDomainEvent(data) + if err != nil { + return errors.Wrap(errUserDeleteDomainEvent, err) + } + + return fmt.Errorf("not implemented user delete domain handler") +} + +func (es *eventHandler) deleteDomainHandler(ctx context.Context, data map[string]interface{}) error { + d, err := decodeDeleteDomainEvent(data) + if err != nil { + return errors.Wrap(errDeleteDomainEvent, err) + } + + if err := es.repo.Delete(ctx, d.ID); err != nil { + return errors.Wrap(errDeleteDomainEvent, err) + } + + return nil +} diff --git a/pkg/events/events.go b/pkg/events/events.go index e695099733..ccbeb0012b 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -6,6 +6,8 @@ package events import ( "context" "time" + + "github.com/absmach/supermq/pkg/messaging" ) const ( @@ -40,10 +42,11 @@ type EventHandler interface { // SubscriberConfig represents event subscriber configuration. type SubscriberConfig struct { - Consumer string - Stream string - Handler EventHandler - Ordered bool + Consumer string + Stream string + Handler EventHandler + Ordered bool + DeliveryPolicy messaging.DeliveryPolicy } // Subscriber specifies event subscription API. diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go index 2ebb9849f9..72ec9f9131 100644 --- a/pkg/events/nats/subscriber.go +++ b/pkg/events/nats/subscriber.go @@ -92,7 +92,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon ctx: ctx, logger: es.logger, }, - DeliveryPolicy: messaging.DeliverNewPolicy, + DeliveryPolicy: cfg.DeliveryPolicy, Ordered: cfg.Ordered, } diff --git a/pkg/roles/repo/postgres/init.go b/pkg/roles/repo/postgres/init.go index 83af3bdfd3..ebe1a4da43 100644 --- a/pkg/roles/repo/postgres/init.go +++ b/pkg/roles/repo/postgres/init.go @@ -36,7 +36,7 @@ func Migration(rolesTableNamePrefix, entityTableName, entityIDColumnName string) fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s_role_actions ( role_id VARCHAR(254) NOT NULL, action VARCHAR(254) NOT NULL, - CONSTRAINT %s_role_actions_unique_domain_role_action_constraint UNIQUE ( role_id, action), + CONSTRAINT %s_role_actions_unique_role_action_constraint UNIQUE ( role_id, action), CONSTRAINT %s_role_actions_fk_roles_id FOREIGN KEY(role_id) REFERENCES %s_roles(id) ON DELETE CASCADE );`, rolesTableNamePrefix, rolesTableNamePrefix, rolesTableNamePrefix, rolesTableNamePrefix), diff --git a/pkg/roles/repo/postgres/roles.go b/pkg/roles/repo/postgres/roles.go index bef5902efd..0695da91a0 100644 --- a/pkg/roles/repo/postgres/roles.go +++ b/pkg/roles/repo/postgres/roles.go @@ -103,7 +103,7 @@ type dbRoleMember struct { func toDBRoles(role roles.Role) dbRole { var createdBy *string if role.CreatedBy != "" { - createdBy = &role.UpdatedBy + createdBy = &role.CreatedBy } var createdAt sql.NullTime if role.CreatedAt != (time.Time{}) && !role.CreatedAt.IsZero() { diff --git a/scripts/tables-copy.sh b/scripts/tables-copy.sh new file mode 100755 index 0000000000..6483619791 --- /dev/null +++ b/scripts/tables-copy.sh @@ -0,0 +1,51 @@ +#!/bin/bash +# Copyright (c) Abstract Machines +# SPDX-License-Identifier: Apache-2.0 + +#### +## The script helps to copy tables from one database to another database +## This script can be used to synchronize domains and domains roles tables to groups, channels and clients database +## and groups and groups roles to channels and clients database +#### + +set -e +set -o pipefail + + +# Define source and target database connection details +SRC_DB_HOST="localhost" +SRC_DB_PORT="6003" +SRC_DB_USER="supermq" +SRC_DB_PASSWORD="supermq" +SRC_DB_NAME="domains" +TABLENAME_PREFIX=domains + +DEST_DB_HOST="localhost" +DEST_DB_PORT="6005" +DEST_DB_USER="supermq" +DEST_DB_PASSWORD="supermq" +DEST_DB_NAME="channels" + +# List of tables to copy +TABLES=("$TABLENAME_PREFIX" $TABLENAME_PREFIX"_roles" $TABLENAME_PREFIX"_role_actions" $TABLENAME_PREFIX"_role_members" ) + + +# Loop through each table and copy data +for TABLE_NAME in "${TABLES[@]}"; do + echo "Copying data from $SRC_DB_NAME.$TABLE_NAME to $DEST_DB_NAME.$TABLE_NAME..." + + # Set the source password and execute the COPY command + PGPASSWORD="$SRC_DB_PASSWORD" psql -h "$SRC_DB_HOST" -p "$SRC_DB_PORT" -U "$SRC_DB_USER" -d "$SRC_DB_NAME" -c "COPY $TABLE_NAME TO STDOUT" | \ + # Set the target password and execute the TRUNCATE table and COPY commands + PGPASSWORD="$DEST_DB_PASSWORD" psql -h "$DEST_DB_HOST" -p "$DEST_DB_PORT" -U "$DEST_DB_USER" -d "$DEST_DB_NAME" -c "TRUNCATE TABLE $TABLE_NAME CASCADE; COPY $TABLE_NAME FROM STDIN" + + # Check for errors + if [ $? -ne 0 ]; then + echo "Error: Failed copy data for table $TABLE_NAME. Exiting." + exit 1 + fi + + echo "Table $TABLE_NAME successfully copied." +done + +echo "All tables copied successfully!"