diff --git a/apps/transport/lib/S3/aggregates_uploader.ex b/apps/transport/lib/S3/aggregates_uploader.ex
new file mode 100644
index 0000000000..75cf406ff1
--- /dev/null
+++ b/apps/transport/lib/S3/aggregates_uploader.ex
@@ -0,0 +1,95 @@
+defmodule Transport.S3.AggregatesUploader do
+  @moduledoc """
+  Helpers to upload a file to S3, compute its SHA-256 checksum, and maintain a "latest" copy.
+  """
+
+  @doc """
+  Uploads `file` to the `:aggregates` bucket under `remote_file`, together with a
+  `<remote_file>.sha256sum` checksum file, then copies both to their "latest" names.
+
+  Example
+
+      with_tmp_file(fn file ->
+        File.write!(file, "some relevant data")
+        upload_aggregate!(file, "aggregate-20250127193035.csv", "aggregate-latest.csv")
+      end)
+  """
+  @spec upload_aggregate!(Path.t(), String.t(), String.t()) :: :ok
+  def upload_aggregate!(file, remote_file, remote_latest_file) do
+    checksum_file = sha256!(file)
+
+    try do
+      upload_files!(file, checksum_file, remote_file)
+      |> update_latest_files!(remote_latest_file)
+    after
+      :ok = File.rm(checksum_file)
+    end
+  end
+
+  @doc """
+  Calls `cb` with a unique temporary file path, and removes the file afterwards.
+  """
+  def with_tmp_file(cb) do
+    file = mk_tmp_file()
+
+    try do
+      cb.(file)
+    after
+      # Best effort: the callback may not have created the file.
+      File.rm(file)
+    end
+  end
+
+  defp mk_tmp_file, do: System.tmp_dir!() |> Path.join(Ecto.UUID.generate())
+
+  # Computes the hex-encoded SHA-256 of `file` (streamed in chunks) and
+  # writes it to a temporary file, whose path is returned.
+  defp sha256!(file) do
+    hash_state = :crypto.hash_init(:sha256)
+
+    hash =
+      File.stream!(file, 2048)
+      |> Enum.reduce(hash_state, fn chunk, prev_state ->
+        :crypto.hash_update(prev_state, chunk)
+      end)
+      |> :crypto.hash_final()
+      |> Base.encode16()
+      |> String.downcase()
+
+    tmp_file = mk_tmp_file()
+
+    :ok = File.write(tmp_file, hash)
+
+    tmp_file
+  end
+
+  defp upload_files!(file, checksum_file, remote_file) do
+    remote_checksum_file = checksum_filename(remote_file)
+
+    stream_upload!(file, remote_file)
+    stream_upload!(checksum_file, remote_checksum_file)
+
+    {remote_file, remote_checksum_file}
+  end
+
+  defp update_latest_files!({remote_file, remote_checksum_file}, remote_latest_file) do
+    remote_latest_checksum_file = checksum_filename(remote_latest_file)
+
+    copy!(remote_file, remote_latest_file)
+    copy!(remote_checksum_file, remote_latest_checksum_file)
+
+    :ok
+  end
+
+  defp checksum_filename(base_filename) do
+    "#{base_filename}.sha256sum"
+  end
+
+  defp stream_upload!(file, filename) do
+    Transport.S3.stream_to_s3!(:aggregates, file, filename)
+  end
+
+  defp copy!(s3_path, filename) do
+    Transport.S3.remote_copy_file!(:aggregates, s3_path, filename)
+  end
+end
diff --git a/apps/transport/lib/S3/s3.ex b/apps/transport/lib/S3/s3.ex
index 64c17a93af..0a3b5b14ec 100644
--- a/apps/transport/lib/S3/s3.ex
+++ b/apps/transport/lib/S3/s3.ex
@@ -3,7 +3,7 @@ defmodule Transport.S3 do
   This module contains common code related to S3 object storage.
   """
   require Logger
-  @type bucket_feature :: :history | :on_demand_validation | :gtfs_diff | :logos
+  @type bucket_feature :: :history | :on_demand_validation | :gtfs_diff | :logos | :aggregates
 
   @spec bucket_name(bucket_feature()) :: binary()
   def bucket_name(feature) do
@@ -55,4 +55,13 @@ defmodule Transport.S3 do
     |> ExAws.S3.download_file(remote_path, local_path)
     |> Transport.Wrapper.ExAWS.impl().request!()
   end
+
+  @doc "Server-side copy of an object inside the bucket associated to `feature`."
+  @spec remote_copy_file!(bucket_feature(), binary(), binary()) :: any()
+  def remote_copy_file!(feature, remote_path_src, remote_path_dest) do
+    bucket = bucket_name(feature)
+
+    ExAws.S3.put_object_copy(bucket, remote_path_dest, bucket, remote_path_src)
+    |> Transport.Wrapper.ExAWS.impl().request!()
+  end
 end
diff --git a/apps/transport/lib/jobs/stops_registry_snapshot_job.ex b/apps/transport/lib/jobs/stops_registry_snapshot_job.ex
index 79df69089c..2d82739965 100644
--- a/apps/transport/lib/jobs/stops_registry_snapshot_job.ex
+++ b/apps/transport/lib/jobs/stops_registry_snapshot_job.ex
@@ -2,13 +2,26 @@ defmodule Transport.Jobs.StopsRegistrySnapshotJob do
   @moduledoc """
   Job in charge of building a snapshot of the stops registry.
   """
   use Oban.Worker, unique: [period: {1, :days}], tags: ["registry"], max_attempts: 3
   require Logger
+  import Transport.S3.AggregatesUploader
 
   @impl Oban.Worker
-  def perform(_job) do
-    file = "#{System.tmp_dir!()}/registre-arrets.csv"
-
-    Transport.Registry.Engine.execute(file)
+  def perform(%Oban.Job{}) do
+    with_tmp_file(fn file ->
+      :ok = Transport.Registry.Engine.execute(file)
+
+      upload_aggregate!(
+        file,
+        "stops_registry_#{timestamp()}.csv",
+        "stops_registry_latest.csv"
+      )
+    end)
+  end
+
+  # Filename fragment, e.g. 20250127.193035.123456 (UTC).
+  defp timestamp do
+    DateTime.utc_now()
+    |> Calendar.strftime("%Y%m%d.%H%M%S.%f")
   end
 end
diff --git a/apps/transport/test/support/s3_test_utils.ex b/apps/transport/test/support/s3_test_utils.ex
index 7f3bbcec26..65024ea49e 100644
--- a/apps/transport/test/support/s3_test_utils.ex
+++ b/apps/transport/test/support/s3_test_utils.ex
@@ -32,6 +32,19 @@ defmodule Transport.Test.S3TestUtils do
     end)
   end
 
+  def s3_mock_stream_file(path: expected_path, bucket: expected_bucket, acl: expected_acl) do
+    Transport.ExAWS.Mock
+    |> expect(:request!, fn %ExAws.S3.Upload{
+                              src: %File.Stream{},
+                              bucket: ^expected_bucket,
+                              path: ^expected_path,
+                              opts: [acl: ^expected_acl],
+                              service: :s3
+                            } ->
+      :ok
+    end)
+  end
+
   def s3_mocks_delete_object(expected_bucket, expected_path) do
     Transport.ExAWS.Mock
     |> expect(:request!, fn %ExAws.Operation.S3{
@@ -43,4 +56,18 @@ defmodule Transport.Test.S3TestUtils do
       :ok
     end)
   end
+
+  def s3_mocks_remote_copy_file(expected_bucket, expected_src_path, expected_dest_path) do
+    Transport.ExAWS.Mock
+    |> expect(:request!, fn %ExAws.Operation.S3{
+                              bucket: ^expected_bucket,
+                              path: ^expected_dest_path,
+                              http_method: :put,
+                              service: :s3,
+                              headers: headers
+                            } ->
+      assert Map.get(headers, "x-amz-copy-source") =~ "/#{expected_bucket}/#{expected_src_path}"
+      %{body: %{}}
+    end)
+  end
 end
diff --git a/apps/transport/test/transport/S3/aggregates_uploader_test.exs b/apps/transport/test/transport/S3/aggregates_uploader_test.exs
new file mode 100644
index 0000000000..bcea579516
--- /dev/null
+++ b/apps/transport/test/transport/S3/aggregates_uploader_test.exs
@@ -0,0 +1,26 @@
+defmodule Transport.S3.AggregatesUploaderTest do
+  use ExUnit.Case, async: true
+
+  alias Transport.S3.AggregatesUploader
+  alias Transport.Test.S3TestUtils
+
+  test "export to S3" do
+    snapshot = "aggregate-20250127193035.csv"
+    latest = "aggregate-latest.csv"
+    checksum = "#{snapshot}.sha256sum"
+    latest_checksum = "#{latest}.sha256sum"
+
+    bucket_name = Transport.S3.bucket_name(:aggregates)
+
+    S3TestUtils.s3_mock_stream_file(path: snapshot, bucket: bucket_name, acl: :private)
+    S3TestUtils.s3_mock_stream_file(path: checksum, bucket: bucket_name, acl: :private)
+    S3TestUtils.s3_mocks_remote_copy_file(bucket_name, snapshot, latest)
+    S3TestUtils.s3_mocks_remote_copy_file(bucket_name, checksum, latest_checksum)
+
+    AggregatesUploader.with_tmp_file(fn file ->
+      File.write!(file, "some relevant data")
+
+      :ok = AggregatesUploader.upload_aggregate!(file, snapshot, latest)
+    end)
+  end
+end
diff --git a/config/dev.exs b/config/dev.exs
index 8b94631bdf..87b43ab8a1 100644
--- a/config/dev.exs
+++ b/config/dev.exs
@@ -72,7 +72,8 @@ config :transport,
     history: "resource-history-dev",
     on_demand_validation: "on-demand-validation-dev",
     gtfs_diff: "gtfs-diff-dev",
-    logos: "logos-dev"
+    logos: "logos-dev",
+    aggregates: "aggregates-dev"
   }
 
 config :oauth2, Datagouvfr.Authentication,
diff --git a/config/prod.exs b/config/prod.exs
index 86270024ca..ce7cd51bb1 100644
--- a/config/prod.exs
+++ b/config/prod.exs
@@ -9,7 +9,8 @@ config :transport,
     history: "resource-history-prod",
     on_demand_validation: "on-demand-validation-prod",
     gtfs_diff: "gtfs-diff-prod",
-    logos: "logos-prod"
+    logos: "logos-prod",
+    aggregates: "aggregates-prod"
   }
 
 # Configure Sentry for production and staging.
diff --git a/config/runtime.exs b/config/runtime.exs
index a66f63cc89..cee3647d15 100644
--- a/config/runtime.exs
+++ b/config/runtime.exs
@@ -94,7 +94,8 @@ if app_env == :staging do
       history: "resource-history-staging",
       on_demand_validation: "on-demand-validation-staging",
       gtfs_diff: "gtfs-diff-staging",
-      logos: "logos-staging"
+      logos: "logos-staging",
+      aggregates: "aggregates-staging"
     }
 end
diff --git a/config/test.exs b/config/test.exs
index e5ee64d501..6b0a67fef3 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -45,7 +45,8 @@ config :transport,
     history: "resource-history-test",
     on_demand_validation: "on-demand-validation-test",
     gtfs_diff: "gtfs-diff-test",
-    logos: "logos-test"
+    logos: "logos-test",
+    aggregates: "aggregates-test"
   },
   workflow_notifier: Transport.Jobs.Workflow.ProcessNotifier,
   export_secret_key: "fake_export_secret_key",