From b7e4e5e0272c8c51a2a8770b382ef88c43c4683f Mon Sep 17 00:00:00 2001 From: Ali Naqvi Date: Wed, 9 Oct 2024 14:47:17 +0800 Subject: [PATCH 1/3] feat: PPT-1517 Add Azure Storage support --- OPENAPI_DOC.yml | 17 +++- shard.lock | 20 +++-- spec/controllers/uploads_spec.cr | 92 +++++++++++++++++++++ src/placeos-rest-api/controllers/uploads.cr | 43 ++++++++-- 4 files changed, 151 insertions(+), 21 deletions(-) diff --git a/OPENAPI_DOC.yml b/OPENAPI_DOC.yml index 454a6ba2..06848b06 100644 --- a/OPENAPI_DOC.yml +++ b/OPENAPI_DOC.yml @@ -17998,7 +17998,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/NamedTuple_type__Symbol__signature__NamedTuple_verb__String__url__String__headers__Hash_String__String____upload_id__String___Nil_' + $ref: '#/components/schemas/NamedTuple_type__Symbol__signature__NamedTuple_verb__String__url__String__headers__Hash_String__String____upload_id__String___Nil__body__String___Nil_' 409: description: Conflict content: @@ -18227,7 +18227,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/_NamedTuple_ok__Bool____NamedTuple_type__Symbol__signature__NamedTuple_verb__String__url__String__headers__Hash_String__String____upload_id__String___Nil__' + $ref: '#/components/schemas/_NamedTuple_ok__Bool____NamedTuple_type__Symbol__signature__NamedTuple_verb__String__url__String__headers__Hash_String__String____upload_id__String___Nil__body__String___Nil__' 409: description: Conflict content: @@ -26586,7 +26586,7 @@ components: - type - signature - residence - ? NamedTuple_type__Symbol__signature__NamedTuple_verb__String__url__String__headers__Hash_String__String____upload_id__String___Nil_ + ? NamedTuple_type__Symbol__signature__NamedTuple_verb__String__url__String__headers__Hash_String__String____upload_id__String___Nil__body__String___Nil_ : type: object properties: type: @@ -26609,6 +26609,9 @@ components: upload_id: type: string nullable: true + body: + type: string + nullable: true required: - type - signature @@ -26635,6 +26638,9 @@ components: part: type: integer format: Int32 + block_id: + type: string + nullable: true required: - md5 - part @@ -26648,7 +26654,7 @@ components: part_update: type: boolean nullable: true - ? _NamedTuple_ok__Bool____NamedTuple_type__Symbol__signature__NamedTuple_verb__String__url__String__headers__Hash_String__String____upload_id__String___Nil__ + ? _NamedTuple_ok__Bool____NamedTuple_type__Symbol__signature__NamedTuple_verb__String__url__String__headers__Hash_String__String____upload_id__String___Nil__body__String___Nil__ : anyOf: - type: object properties: @@ -26678,6 +26684,9 @@ components: upload_id: type: string nullable: true + body: + type: string + nullable: true required: - type - signature diff --git a/shard.lock b/shard.lock index 2b176ed5..547455f5 100644 --- a/shard.lock +++ b/shard.lock @@ -29,6 +29,10 @@ shards: git: https://github.com/taylorfinnell/awscr-signer.git version: 0.8.2 + azblob: + git: https://github.com/spider-gazelle/azblob.cr.git + version: 0.1.0+git.commit.16b822289fca1703cafc9dc9206733e86656e01c + backtracer: git: https://github.com/sija/backtracer.cr.git version: 1.2.2 @@ -63,7 +67,7 @@ shards: csuuid: git: https://github.com/wyhaines/csuuid.cr.git - version: 1.0.1+git.commit.4cb8656a9214aede9c1840cad4acf8e55e658f2f + version: 1.0.2 db: git: https://github.com/crystal-lang/crystal-db.git @@ -163,7 +167,7 @@ shards: nbchannel: git: https://github.com/wyhaines/nbchannel.cr.git - version: 0.1.0+git.commit.a8f5be6aa198abfa9f1893e1156640b8ea526094 + version: 0.1.0 neuroplastic: git: https://github.com/spider-gazelle/neuroplastic.git @@ -183,7 +187,7 @@ shards: opentelemetry-api: git: https://github.com/wyhaines/opentelemetry-api.cr.git - version: 0.5.0 + version: 0.5.1 opentelemetry-instrumentation: git: https://github.com/wyhaines/opentelemetry-instrumentation.cr.git @@ -191,7 +195,7 @@ shards: opentelemetry-sdk: # Overridden git: https://github.com/wyhaines/opentelemetry-sdk.cr.git - version: 0.6.1+git.commit.addc3c740d5ea8e61ffd9500fe32ebf21210d66c + version: 0.6.3+git.commit.470e34105727b039aee2bb3650e907bf6eefc971 pars: # Overridden git: https://github.com/spider-gazelle/pars.git @@ -235,7 +239,7 @@ shards: placeos-driver: git: https://github.com/placeos/driver.git - version: 7.2.4 + version: 7.2.14 placeos-frontend-loader: git: https://github.com/placeos/frontend-loader.git @@ -335,7 +339,7 @@ shards: time-ext: git: https://github.com/wyhaines/time-ext.cr.git - version: 0.1.0+git.commit.175f658235fb6cdc9c804cb96da510fec27f4cd6 + version: 1.0.1 timecop: git: https://github.com/crystal-community/timecop.cr.git @@ -347,7 +351,7 @@ shards: tracer: git: https://github.com/wyhaines/tracer.cr.git - version: 0.3.1 + version: 0.3.2 ulid: # Overridden git: https://github.com/place-labs/ulid.git @@ -355,7 +359,7 @@ shards: upload-signer: git: https://github.com/spider-gazelle/upload-signer.git - version: 0.1.0+git.commit.4c7baf3fc72ca15035c827e16f2c8a15b5f39246 + version: 0.2.0+git.commit.fdf8a1b2886006777efd01d69450c2028e0cf21e webmock: git: https://github.com/manastech/webmock.cr.git diff --git a/spec/controllers/uploads_spec.cr b/spec/controllers/uploads_spec.cr index a5741747..84860676 100644 --- a/spec/controllers/uploads_spec.cr +++ b/spec/controllers/uploads_spec.cr @@ -172,5 +172,97 @@ module PlaceOS::Api headers: Spec::Authentication.headers(sys_admin: false)) resp.status_code.should eq(403) end + + it "should properly handle azure storage for direct uploads" do + storage = Model::Generator.storage(type: PlaceOS::Model::Storage::Type::Azure) + storage.access_key = "myteststorage" + storage.access_secret = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + storage.save! + params = { + "file_name" => "some_file_name.jpg", + "file_size" => "500", + "file_id" => "some_file_md5_hash", + "file_mime" => "image/jpeg", + "public" => false, + "permissions" => "admin", + } + + resp = client.post(Uploads.base_route, + body: params.to_json, + headers: Spec::Authentication.headers) + + resp.status_code.should eq(200) + info = JSON.parse(resp.body).as_h + info["type"].should eq("direct_upload") + sig = info["signature"].as_h + sig["verb"].as_s.should eq("PUT") + sig["url"].as_s.should_not be_nil + upload = Model::Upload.find?(info["upload_id"].as_s) + upload.should_not be_nil + end + + it "should properly handle azure storage for chunked uploads" do + storage = Model::Generator.storage(type: PlaceOS::Model::Storage::Type::Azure) + storage.access_key = "myteststorage" + storage.access_secret = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + storage.save! + params = { + "file_name" => "some_file_name.jpg", + "file_size" => (258 * 1024 * 1024).to_s, + "file_id" => "some_file_md5_hash", + "file_mime" => "image/jpeg", + "public" => false, + "permissions" => "admin", + } + + resp = client.post(Uploads.base_route, + body: params.to_json, + headers: Spec::Authentication.headers) + + resp.status_code.should eq(200) + info = JSON.parse(resp.body).as_h + info["type"].should eq("chunked_upload") + info["residence"].should eq("AzureStorage") + sig = info["signature"].as_h + sig["verb"].as_s.should eq("PUT") + sig["url"].as_s.size.should eq(0) + upload = Model::Upload.find!(info["upload_id"].as_s) + + params = { + "part" => Base64.strict_encode(UUID.random.to_s), + "file_id" => "some_file_md5_hash", + } + + pinfo = Uploads::PartInfo.new(params["file_id"], 1, params["part"]) + uinfo = Uploads::UpdateInfo.new(params["file_id"], 1, "some-random-resumable-id", [pinfo], [1], false) + + resp = client.patch( + path: "#{Uploads.base_route}/#{upload.id}?#{HTTP::Params.encode(params)}", + body: uinfo.to_json, + headers: Spec::Authentication.headers) + + resp.status_code.should eq(200) + info = JSON.parse(resp.body).as_h + info["type"].should eq("part_upload") + sig = info["signature"].as_h + sig["verb"].as_s.should eq("PUT") + sig["url"].as_s.size.should be > 0 + uri = URI.parse(sig["url"].as_s) + uri.host.should eq(sprintf("%s.blob.core.windows.net", "myteststorage")) + qparams = URI::Params.parse(uri.query || "") + qparams["blockid"].should eq(params["part"]) + + params = { + "part" => "finish", + } + resp = client.get( + path: "#{Uploads.base_route}/#{upload.id}/edit?#{HTTP::Params.encode(params)}", + headers: Spec::Authentication.headers) + + resp.status_code.should eq(200) + info = JSON.parse(resp.body).as_h + info["type"].should eq("finish") + info["body"].should_not be_nil + end end end diff --git a/src/placeos-rest-api/controllers/uploads.cr b/src/placeos-rest-api/controllers/uploads.cr index 75045f03..acff71d4 100644 --- a/src/placeos-rest-api/controllers/uploads.cr +++ b/src/placeos-rest-api/controllers/uploads.cr @@ -2,6 +2,7 @@ require "mime" require "upload-signer" require "placeos-models/storage" require "placeos-models/upload" +require "xml" require "./application" module PlaceOS::Api @@ -40,12 +41,12 @@ module PlaceOS::Api raise Error::NotFound.new(ex.message || "Authority storage configuration not found") end end - @signer = UploadSigner::AmazonS3.new(storage.access_key, storage.decrypt_secret, storage.region, endpoint: storage.endpoint) + @signer = UploadSigner.signer(UploadSigner::StorageType.from_value(storage.storage_type.value), storage.access_key, storage.decrypt_secret, storage.region, endpoint: storage.endpoint) end getter! authority : ::PlaceOS::Model::Authority? getter! storage : ::PlaceOS::Model::Storage? - getter! signer : UploadSigner::AmazonS3? + getter! signer : UploadSigner::Storage? getter! current_upload : ::PlaceOS::Model::Upload? # returns the list of uploads for current domain authority @@ -192,8 +193,9 @@ module PlaceOS::Api end end - s3 = UploadSigner::AmazonS3.new(storage.access_key, storage.decrypt_secret, storage.region, endpoint: storage.endpoint) - object_url = s3.get_object(storage.bucket_name, current_upload.object_key, expiry * 60) + us = UploadSigner.signer(UploadSigner::StorageType.from_value(storage.storage_type.value), storage.access_key, storage.decrypt_secret, storage.region, endpoint: storage.endpoint) + + object_url = us.get_object(storage.bucket_name, current_upload.object_key, expiry * 60) redirect_to object_url, status: :see_other end @@ -211,24 +213,37 @@ module PlaceOS::Api ) : NamedTuple( type: Symbol, signature: NamedTuple(verb: String, url: String, headers: Hash(String, String)), - upload_id: String | Nil) + upload_id: String | Nil, body: String | Nil) if (resumable_id = current_upload.resumable_id) && current_upload.resumable if part.strip == "finish" s3 = signer.commit_file(storage.bucket_name, current_upload.object_key, resumable_id, get_headers(current_upload)) - {type: :finish, signature: s3, upload_id: current_upload.id} + finish_body = nil + if storage.storage_type == PlaceOS::Model::Storage::Type::Azure + if part_data = current_upload.part_data + block_ids = [] of String + parts = part_data.keys.sort! + parts.each do |ppart| + block_ids << part_data[ppart].as_h["block_id"].as_s + end + finish_body = block_list_xml(block_ids) + else + raise AC::Route::Param::ValueError.new("missing part_data information. Required for AzureStorage") + end + end + {type: :finish, signature: s3, upload_id: current_upload.id, body: finish_body} else unless md5 = file_id raise AC::Route::Param::ValueError.new("Missing MD5 hash of file part", "file_id", "required except for the `finish` part") end s3 = signer.set_part(storage.bucket_name, current_upload.object_key, current_upload.file_size, md5, part, resumable_id, get_headers(current_upload)) - {type: :part_upload, signature: s3, upload_id: current_upload.id} + {type: :part_upload, signature: s3, upload_id: current_upload.id, body: nil} end else raise AC::Route::Param::ValueError.new("upload is not resumable, no part available") end end - record PartInfo, md5 : String, part : Int32 do + record PartInfo, md5 : String, part : Int32, block_id : String? do include JSON::Serializable end @@ -248,7 +263,7 @@ module PlaceOS::Api ) : NamedTuple( type: Symbol, signature: NamedTuple(verb: String, url: String, headers: Hash(String, String)), - upload_id: String | Nil) | NamedTuple(ok: Bool) + upload_id: String | Nil, body: String | Nil) | NamedTuple(ok: Bool) raise AC::Route::Param::ValueError.new("upload is not resumable") unless current_upload.resumable if part_list = info.part_list @@ -361,5 +376,15 @@ module PlaceOS::Api end end end + + private def block_list_xml(block_ids) + XML.build(encoding: "UTF-8") do |xml| + xml.element("BlockList") do + block_ids.each do |tag| + xml.element("Latest") { xml.text(tag) } + end + end + end + end end end From 760dba689d3ad71ef272bd146abb565ac4d66d3b Mon Sep 17 00:00:00 2001 From: Ali Naqvi Date: Mon, 14 Oct 2024 16:53:57 +0800 Subject: [PATCH 2/3] feat: PPT-525 Added scoped signals --- spec/controllers/notifications_spec.cr | 109 +++++++++++++++++- spec/controllers/root_spec.cr | 38 +++++- src/placeos-rest-api/controllers/metadata.cr | 18 +-- .../controllers/notifications.cr | 7 +- src/placeos-rest-api/controllers/root.cr | 9 +- src/placeos-rest-api/controllers/systems.cr | 2 +- src/placeos-rest-api/controllers/users.cr | 2 +- src/placeos-rest-api/controllers/zones.cr | 2 +- 8 files changed, 161 insertions(+), 26 deletions(-) diff --git a/spec/controllers/notifications_spec.cr b/spec/controllers/notifications_spec.cr index 09d8e650..db2c29a9 100644 --- a/spec/controllers/notifications_spec.cr +++ b/spec/controllers/notifications_spec.cr @@ -20,7 +20,7 @@ module PlaceOS::Api result.status_code.should eq 202 end - it "should receive valid payload when google sends change notification" do + it "should receive valid payload when google sends change notification to unscoped destination" do PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! subscription_channel = "4ba78bf0-6a47-11e2-bcfd-0800200c9a66/event" @@ -63,6 +63,50 @@ module PlaceOS::Api subs.terminate end end + + it "should receive valid payload when google sends change notification to scoped destination" do + authority = PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! + subscription_channel = "#{authority.id}/4ba78bf0-6a47-11e2-bcfd-0800200c9a66/event" + + channel = Channel(String).new + subs = PlaceOS::Driver::Subscriptions.new + + _subscription = subs.channel subscription_channel do |_, message| + channel.send(message) + end + + result = client.post("#{PushNotifications.base_route}/google", headers: HTTP::Headers{ + "Host" => "localhost", + "Content-Type" => "application/json", + "Content-Length" => "0", + "X-Goog-Channel-ID" => "4ba78bf0-6a47-11e2-bcfd-0800200c9a66", + "X-Goog-Channel-Token" => "398348u3tu83ut8uu38", + "X-Goog-Channel-Expiration" => "Fri, 26 May 2023 01:13:52 GMT", + "X-Goog-Resource-ID" => "ret08u3rv24htgh289g", + "X-Goog-Resource-URI" => "https://www.googleapis.com/calendar/v3/calendars/my_calendar@gmail.com/events", + "X-Goog-Resource-State" => "exists", + "X-Goog-Message-Number" => "1", + }) + result.status_code.should eq 202 + + begin + select + when message = channel.receive + { + "event_type": "updated", + "resource_id": "ret08u3rv24htgh289g", + "resource_uri": "https://www.googleapis.com/calendar/v3/calendars/my_calendar@gmail.com/events", + "subscription_id": "4ba78bf0-6a47-11e2-bcfd-0800200c9a66", + "client_secret": "398348u3tu83ut8uu38", + "expiration_time": 1685063632, + }.to_json.should eq(message) + when timeout 2.seconds + raise "timeout" + end + ensure + subs.terminate + end + end end describe "microsoft push notifications" do @@ -78,7 +122,64 @@ module PlaceOS::Api result.body.should eq(token) end - it "should receive valid payload when microsoft sends change notification" do + it "should receive valid payload when microsoft sends change notification on scoped channel" do + authority = PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! + subscription_channel = "#{authority.id}/f37536ac-b308-4bc7-b239-b2b51cd2ff24/event" + + channel = Channel(String).new + subs = PlaceOS::Driver::Subscriptions.new + + _subscription = subs.channel subscription_channel do |_, message| + channel.send(message) + end + + payload = <<-'JSON' + { + "value": [ + { + "subscriptionId": "f37536ac-b308-4bc7-b239-b2b51cd2ff24", + "subscriptionExpirationDateTime": "2023-05-26T23:29:18.2277768+00:00", + "changeType": "created", + "resource": "Users/2189c720-90d5-44ff-818b-fe585706ee90/Events/AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=", + "resourceData": { + "@odata.type": "#Microsoft.Graph.Event", + "@odata.id": "Users/2189c720-90d5-44ff-818b-fe585706ee90/Events/AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=", + "@odata.etag": "W/\"DwAAABYAAAAXxlVK8zI/TZLFIn9D86hXAAAEB/jr\"", + "id": "AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=" + }, + "clientState": "secretClientState", + "tenantId": "7f1d0cb7-93b9-405a-8dad-c21703b7af18" + } + ] + } + JSON + result = client.post("#{PushNotifications.base_route}/office365", body: payload, headers: HTTP::Headers{ + "Host" => "localhost", + "Content-Type" => "application/json", + }) + + result.status_code.should eq 202 + + begin + select + when message = channel.receive + { + "event_type": "created", + "resource_id": "AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=", + "resource_uri": "Users/2189c720-90d5-44ff-818b-fe585706ee90/Events/AAMkADlhNjJjN2M1LTJiYWUtNGVhMS04ODEzLTRjNDlmYmZkYWMyYQBGAAAAAAA2241OoLZoSZGqNr4MvSZJBwAXxlVK8zI-TZLFIn9D86hXAAAAAAENAAAXxlVK8zI-TZLFIn9D86hXAAAECHE1AAA=", + "subscription_id": "f37536ac-b308-4bc7-b239-b2b51cd2ff24", + "client_secret": "secretClientState", + "expiration_time": 1685143758, + }.to_json.should eq(message) + when timeout 2.seconds + raise "timeout" + end + ensure + subs.terminate + end + end + + it "should receive valid payload when microsoft sends change notification on unscoped channel" do PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! subscription_channel = "f37536ac-b308-4bc7-b239-b2b51cd2ff24/event" @@ -136,8 +237,8 @@ module PlaceOS::Api end it "should receive valid payload when microsoft sends lifecycle notification" do - PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! - subscription_channel = "f37536ac-b308-4bc7-b239-b2b51cd2ff24/event" + authority = PlaceOS::Model::Authority.find_by_domain("localhost").not_nil! + subscription_channel = "#{authority.id}/f37536ac-b308-4bc7-b239-b2b51cd2ff24/event" channel = Channel(String).new subs = PlaceOS::Driver::Subscriptions.new diff --git a/spec/controllers/root_spec.cr b/spec/controllers/root_spec.cr index 291c2c70..b5b14b74 100644 --- a/spec/controllers/root_spec.cr +++ b/spec/controllers/root_spec.cr @@ -52,7 +52,34 @@ module PlaceOS::Api end describe "POST /signal" do - it "writes an arbitrary payload to a redis subscription" do + it "writes an arbitrary payload to a redis subscription on scoped channel" do + user, headers = Spec::Authentication.authenticated + subscription_channel = "#{user.authority.as(Model::Authority).id}/test" + channel = Channel(String).new + subs = PlaceOS::Driver::Subscriptions.new + + _subscription = subs.channel subscription_channel do |_, message| + channel.send(message) + end + + params = HTTP::Params{"channel" => "test"} + result = client.post(File.join(Root.base_route, "signal?#{params}"), body: "hello", headers: headers) + result.status_code.should eq 200 + + begin + select + when message = channel.receive + message.should eq "hello" + when timeout 2.seconds + raise "timeout" + end + ensure + subs.terminate + end + end + + it "writes an arbitrary payload to a redis subscription on un-scoped channel" do + _, headers = Spec::Authentication.authenticated subscription_channel = "test" channel = Channel(String).new subs = PlaceOS::Driver::Subscriptions.new @@ -62,7 +89,7 @@ module PlaceOS::Api end params = HTTP::Params{"channel" => subscription_channel} - result = client.post(File.join(Root.base_route, "signal?#{params}"), body: "hello", headers: Spec::Authentication.headers) + result = client.post(File.join(Root.base_route, "signal?#{params}"), body: "hello", headers: headers) result.status_code.should eq 200 begin @@ -83,19 +110,20 @@ module PlaceOS::Api end context "guest users" do - _, guest_header = Spec::Authentication.authentication(sys_admin: false, support: false, scope: [PlaceOS::Model::UserJWT::Scope::GUEST]) - it "prevented access to non-guest channels " do + _, guest_header = Spec::Authentication.authentication(sys_admin: false, support: false, scope: [PlaceOS::Model::UserJWT::Scope::GUEST]) + result = client.post(File.join(Root.base_route, "signal?channel=dummy"), body: "hello", headers: guest_header) result.status_code.should eq 403 end it "allowed access to guest channels" do + guest, guest_header = Spec::Authentication.authentication(sys_admin: false, support: false, scope: [PlaceOS::Model::UserJWT::Scope::GUEST]) subscription_channel = "/guest/dummy" channel = Channel(String).new subs = PlaceOS::Driver::Subscriptions.new - _subscription = subs.channel subscription_channel do |_, message| + _subscription = subs.channel "#{guest.authority.as(Model::Authority).id}#{subscription_channel}" do |_, message| channel.send(message) end diff --git a/src/placeos-rest-api/controllers/metadata.cr b/src/placeos-rest-api/controllers/metadata.cr index 9a3cc935..2bd2e582 100644 --- a/src/placeos-rest-api/controllers/metadata.cr +++ b/src/placeos-rest-api/controllers/metadata.cr @@ -103,17 +103,21 @@ module PlaceOS::Api mutate(parent_id, meta, merge: false) end - SIGNAL_CHANNEL = "placeos/metadata/changed" + UNSCOPED_SIGNAL_CHANNEL = "placeos/metadata/changed" + SCOPED_SIGNAL_CHANNEL = "placeos/%s/metadata/changed" - protected def self.signal_metadata(action : Symbol, metadata) : Nil + protected def self.signal_metadata(authority : String, action : Symbol, metadata) : Nil payload = { action: action, metadata: metadata, }.to_json - Log.info { "signalling #{SIGNAL_CHANNEL} with #{payload.bytesize} bytes" } + Log.info { "signalling #{UNSCOPED_SIGNAL_CHANNEL} with #{payload.bytesize} bytes" } + ::PlaceOS::Driver::RedisStorage.with_redis &.publish(UNSCOPED_SIGNAL_CHANNEL, payload) - ::PlaceOS::Driver::RedisStorage.with_redis &.publish(SIGNAL_CHANNEL, payload) + signal_channel = sprintf(SCOPED_SIGNAL_CHANNEL, authority) + Log.info { "signalling #{signal_channel} with #{payload.bytesize} bytes" } + ::PlaceOS::Driver::RedisStorage.with_redis &.publish(signal_channel, payload) end # Find (otherwise create) then update (or patch) the Metadata. @@ -126,7 +130,7 @@ module PlaceOS::Api metadata payload = metadata.interface - spawn { self.class.signal_metadata(:update, payload) } + spawn { self.class.signal_metadata(current_authority.not_nil!.id.to_s, :update, payload) } payload end @@ -142,11 +146,11 @@ module PlaceOS::Api spawn do if metadata_name.empty? - self.class.signal_metadata(:destroy_all, { + self.class.signal_metadata(current_authority.not_nil!.id.to_s, :destroy_all, { parent_id: parent_id, }) else - self.class.signal_metadata(:destroy, { + self.class.signal_metadata(current_authority.not_nil!.id.to_s, :destroy, { parent_id: parent_id, name: metadata_name, }) diff --git a/src/placeos-rest-api/controllers/notifications.cr b/src/placeos-rest-api/controllers/notifications.cr index f7e292a0..eef8ed50 100644 --- a/src/placeos-rest-api/controllers/notifications.cr +++ b/src/placeos-rest-api/controllers/notifications.cr @@ -50,10 +50,11 @@ module PlaceOS::Api private def signal(notification) notification.notifications.each do |entry| payload = entry.to_payload - path = "placeos/#{entry.subscription_id}/event" - Log.info { "signalling #{path} with #{payload.bytesize} bytes" } - ::PlaceOS::Driver::RedisStorage.with_redis &.publish(path, payload) + ["placeos/#{entry.subscription_id}/event", "placeos/#{current_authority.not_nil!.id}/#{entry.subscription_id}/event"].each do |path| + Log.info { "signalling #{path} with #{payload.bytesize} bytes" } + ::PlaceOS::Driver::RedisStorage.with_redis &.publish(path, payload) + end end end end diff --git a/src/placeos-rest-api/controllers/root.cr b/src/placeos-rest-api/controllers/root.cr index bb0ccdae..b5536085 100644 --- a/src/placeos-rest-api/controllers/root.cr +++ b/src/placeos-rest-api/controllers/root.cr @@ -208,10 +208,11 @@ module PlaceOS::Api "" end - path = Path["placeos/"].join(channel).to_s - Log.info { "signalling #{path} with #{payload.bytesize} bytes" } - - ::PlaceOS::Driver::RedisStorage.with_redis &.publish(path, payload) + [Path["placeos/"].join(channel).to_s, Path["placeos/#{current_authority.not_nil!.id}/"].join(channel).to_s].each do |path| + # path = Path["placeos/#{current_authority.not_nil!.id}/"].join(channel).to_s + Log.info { "signalling #{path} with #{payload.bytesize} bytes" } + ::PlaceOS::Driver::RedisStorage.with_redis &.publish(path, payload) + end end # maps the database tables to indexes in elasticsearch diff --git a/src/placeos-rest-api/controllers/systems.cr b/src/placeos-rest-api/controllers/systems.cr index 2a7b2252..eb103276 100644 --- a/src/placeos-rest-api/controllers/systems.cr +++ b/src/placeos-rest-api/controllers/systems.cr @@ -309,7 +309,7 @@ module PlaceOS::Api def destroy : Nil cs_id = current_control_system.id current_control_system.destroy - spawn { Api::Metadata.signal_metadata(:destroy_all, {parent_id: cs_id}) } + spawn { Api::Metadata.signal_metadata(current_authority.not_nil!.id.to_s, :destroy_all, {parent_id: cs_id}) } end # Return all zones for this system diff --git a/src/placeos-rest-api/controllers/users.cr b/src/placeos-rest-api/controllers/users.cr index 87b9bad3..d0a50e80 100644 --- a/src/placeos-rest-api/controllers/users.cr +++ b/src/placeos-rest-api/controllers/users.cr @@ -263,7 +263,7 @@ module PlaceOS::Api else user_id = user.id user.destroy - spawn { Api::Metadata.signal_metadata(:destroy_all, {parent_id: user_id}) } + spawn { Api::Metadata.signal_metadata(current_authority.not_nil!.id.to_s, :destroy_all, {parent_id: user_id}) } end end diff --git a/src/placeos-rest-api/controllers/zones.cr b/src/placeos-rest-api/controllers/zones.cr index 83801cfa..ad9c40dd 100644 --- a/src/placeos-rest-api/controllers/zones.cr +++ b/src/placeos-rest-api/controllers/zones.cr @@ -160,7 +160,7 @@ module PlaceOS::Api def destroy : Nil zone_id = current_zone.id current_zone.destroy - spawn { Api::Metadata.signal_metadata(:destroy_all, {parent_id: zone_id}) } + spawn { Api::Metadata.signal_metadata(current_authority.not_nil!.id.to_s, :destroy_all, {parent_id: zone_id}) } end # return metadata associcated with the selected zone From bfc843e3143ff288e318123e9c46ba3ccd4b5f76 Mon Sep 17 00:00:00 2001 From: Ali Naqvi Date: Thu, 17 Oct 2024 11:50:04 +0800 Subject: [PATCH 3/3] fix(systems): PPT-1545 fix signage request --- src/placeos-rest-api/controllers/systems.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/placeos-rest-api/controllers/systems.cr b/src/placeos-rest-api/controllers/systems.cr index eb103276..0afa291a 100644 --- a/src/placeos-rest-api/controllers/systems.cr +++ b/src/placeos-rest-api/controllers/systems.cr @@ -223,7 +223,7 @@ module PlaceOS::Api end # filter by signage - if public + if signage query.should({ "signage" => [true], })