refactor(gate): use stream during artifact upload to s3 (#841)

#### Migration notes

...

- [ ] The change comes with new or modified tests
- [ ] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change
destifo authored Sep 16, 2024
1 parent 060cd80 commit 70a4d49
Showing 8 changed files with 463 additions and 236 deletions.
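In short, `SharedArtifactPersistence.save` no longer buffers the whole artifact into a temporary file before a single `putObject` call: it hashes the stream on the fly, hands it directly to `Upload` from `@aws-sdk/lib-storage` under a temporary key, and then renames the object to its content hash. A minimal, self-contained sketch of that pattern (illustrative only: the `saveArtifact` name and the `artifacts/` key prefix are assumptions, and the real code uses the repo's `HashTransformStream` and `resolveS3Key` helpers):

```ts
import { createHash } from "node:crypto";
import { S3 } from "aws-sdk/client-s3";
import { Upload } from "aws-sdk/lib-storage";

export async function saveArtifact(
  s3: S3,
  bucket: string,
  stream: ReadableStream<Uint8Array>,
  size: number,
): Promise<string> {
  const hasher = createHash("sha256");

  // Hash each chunk as it flows through; the body is never buffered in memory.
  const hashed = stream.pipeThrough(
    new TransformStream<Uint8Array, Uint8Array>({
      transform(chunk, controller) {
        hasher.update(chunk);
        controller.enqueue(chunk);
      },
    }),
  );

  // The final key is the content hash, which is only known once the stream has
  // been fully consumed, so the upload first targets a temporary key.
  const tempKey = `tmp/${crypto.randomUUID()}`;
  await new Upload({
    client: s3,
    params: { Bucket: bucket, Key: tempKey, Body: hashed, ContentLength: size },
  }).done();

  const hash = hasher.digest("hex");

  // Server-side rename: copy to the content-addressed key, drop the temp object.
  await s3.copyObject({
    Bucket: bucket,
    CopySource: `${bucket}/${tempKey}`,
    Key: `artifacts/${hash}`,
  });
  await s3.deleteObject({ Bucket: bucket, Key: tempKey });

  return hash;
}
```

The `size` argument that callers now pass (`meta.sizeInBytes` in `ArtifactService`) becomes the upload's `ContentLength`, since the length of a stream cannot be inferred up front.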
419 changes: 228 additions & 191 deletions .ghjk/lock.json

Large diffs are not rendered by default.

194 changes: 194 additions & 0 deletions deno.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions import_map.json
@@ -14,8 +14,9 @@
"@std/path": "jsr:@std/path@^1.0.2",
"@std/path/": "jsr:/@std/path@^1.0.2/",
"@std/uuid": "jsr:@std/uuid@^1.0.1",
"aws-sdk/client-s3": "https://esm.sh/@aws-sdk/[email protected]?pin=v131",
"aws-sdk/s3-request-presigner": "https://esm.sh/@aws-sdk/[email protected]?pin=v131",
"aws-sdk/client-s3": "https://esm.sh/@aws-sdk/[email protected]?pin=v135",
"aws-sdk/lib-storage": "https://esm.sh/@aws-sdk/[email protected]?pin=v135",
"aws-sdk/s3-request-presigner": "https://esm.sh/@aws-sdk/[email protected]?pin=v135",
"dispose": "https://deno.land/x/[email protected]/mod.ts",
"graphql": "npm:[email protected]",
"jwt": "https://deno.land/x/[email protected]/mod.ts",
1 change: 0 additions & 1 deletion src/typegate/src/runtimes/s3.ts
@@ -4,7 +4,6 @@
import { Runtime } from "./Runtime.ts";
import type { ComputeStage } from "../engine/query_engine.ts";
import type { RuntimeInitParams } from "../types.ts";
- // import { iterParentStages, JSONValue } from "../utils.ts";
import {
GetObjectCommand,
type GetObjectCommandInput,
2 changes: 1 addition & 1 deletion src/typegate/src/services/artifact_service.ts
@@ -121,7 +121,7 @@ export class ArtifactService {
}

// TODO key?
- const hash = await this.store.persistence.save(stream);
+ const hash = await this.store.persistence.save(stream, meta.sizeInBytes);
if (hash !== meta.hash) {
await this.store.persistence.delete(hash);
logger.warn("hash mismatch: {} {}", hash, meta.hash);
2 changes: 1 addition & 1 deletion src/typegate/src/typegate/artifacts/local.ts
@@ -60,7 +60,7 @@ export class LocalArtifactPersistence implements ArtifactPersistence {
await Deno.remove(this.dirs.artifacts, { recursive: true });
}

- async save(stream: ReadableStream): Promise<string> {
+ async save(stream: ReadableStream, size: number): Promise<string> {
const tmpFile = await Deno.makeTempFile({ dir: this.dirs.temp });
const file = await Deno.open(tmpFile, { write: true, truncate: true });
const hasher = createHash("sha256");
2 changes: 1 addition & 1 deletion src/typegate/src/typegate/artifacts/mod.ts
@@ -81,7 +81,7 @@ export type ArtifactMeta = z.infer<typeof artifactMetaSchema>;

export interface ArtifactPersistence extends AsyncDisposable {
dirs: Dirs;
- save(stream: ReadableStream): Promise<string>;
+ save(stream: ReadableStream, size: number): Promise<string>;
delete(hash: string): Promise<void>;
has(hash: string): Promise<boolean>;
/** Fetch the artifact to local file system and returns the path */
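Callers of `ArtifactPersistence.save` now have to supply the artifact size alongside the stream. A hypothetical caller (not part of this diff) persisting a file from disk could look like the following, assuming it sits next to `mod.ts` so the relative import resolves:

```ts
import type { ArtifactPersistence } from "./mod.ts";

// Hypothetical helper: stream a file from disk into the artifact store.
// The size comes from the file's stat, mirroring how the typegate forwards
// meta.sizeInBytes from the upload metadata.
async function persistFile(
  persistence: ArtifactPersistence,
  path: string,
): Promise<string> {
  const { size } = await Deno.stat(path);
  const file = await Deno.open(path, { read: true });
  // file.readable is a ReadableStream<Uint8Array>; the file handle is closed
  // automatically once the stream has been fully consumed.
  return await persistence.save(file.readable, size);
}
```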
74 changes: 35 additions & 39 deletions src/typegate/src/typegate/artifacts/shared.ts
@@ -6,7 +6,7 @@ import { getLogger } from "../../log.ts";
// deno-lint-ignore no-external-import
import { createHash } from "node:crypto";
import type { TypegateCryptoKeys } from "../../crypto.ts";
- import { S3 } from "aws-sdk/client-s3";
+ import { S3, S3Client } from "aws-sdk/client-s3";
import type {
ArtifactPersistence,
RefCounter,
@@ -20,6 +20,7 @@ import { exists } from "@std/fs/exists";
import { dirname } from "@std/path";
import { chunk } from "@std/collections/chunk";
import { ArtifactError } from "./mod.ts";
+ import { Upload } from "aws-sdk/lib-storage";

const logger = getLogger(import.meta);

@@ -52,13 +53,14 @@ class SharedArtifactPersistence implements ArtifactPersistence {
): Promise<SharedArtifactPersistence> {
const localShadow = await LocalArtifactPersistence.init(baseDir);
const s3 = new S3(syncConfig.s3);
- return new SharedArtifactPersistence(localShadow, s3, syncConfig.s3Bucket);
+ return new SharedArtifactPersistence(localShadow, s3, syncConfig.s3Bucket, syncConfig);
}

constructor(
private localShadow: LocalArtifactPersistence,
private s3: S3,
private s3Bucket: string,
+ private syncConfig: SyncConfig
) {}

get dirs() {
@@ -70,51 +72,45 @@ class SharedArtifactPersistence implements ArtifactPersistence {
this.s3.destroy();
}

- async save(stream: ReadableStream<any>): Promise<string> {
+ async save(stream: ReadableStream<any>, size: number): Promise<string> {
const hasher = createHash("sha256");

- // TODO compatibility with Node.js streams?
- // const stream2 = stream.pipeThrough(new HashTransformStream(hasher));
- //
- // const tempKey = resolveS3Key(
- // `tmp/${Math.random().toString(36).substring(2)}`,
- // );
- //
- // const _ = await this.s3.putObject({
- // Bucket: this.s3Bucket,
- // Body: stream2,
- // Key: tempKey,
- // });
- // const hash = hasher.digest("hex");
- //
- // await this.s3.copyObject({
- // Bucket: this.s3Bucket,
- // CopySource: tempKey,
- // Key: resolveS3Key(hash),
- // });
- //
- // await this.s3.deleteObject({
- // Bucket: this.s3Bucket,
- // Key: tempKey,
- // });
- //
- // return hash;

- const tmpFile = await Deno.makeTempFile({ dir: this.dirs.temp });
- const file = await Deno.open(tmpFile, { write: true, truncate: true });
- await stream
- .pipeThrough(new HashTransformStream(hasher))
- .pipeTo(file.writable);
+ const stream2 = stream.pipeThrough(new HashTransformStream(hasher));

+ // A temporary key is needed because we can't get the hash sum of the file,
+ // which we use as the key of the object,
+ // before going through the whole stream.
+ // So we store the file/object under a temporary key, then copy the object once the hash has been computed.
+ const tempKey = resolveS3Key(this.s3Bucket,
+ `tmp/${Math.random().toString(36).substring(2)}`,
+ );
+
+ const upload = new Upload({
+ client: new S3Client(this.syncConfig.s3),
+ params: {
+ Bucket: this.s3Bucket,
+ Key: tempKey,
+ Body: stream2,
+ ContentLength: size,
+ },
+ });
+
+ const _ = await upload.done();
+
const hash = hasher.digest("hex");
- const body = await Deno.readFile(tmpFile);
logger.info(`persisting artifact to S3: ${hash}`);
- const _ = await this.s3.putObject({

+ await this.s3.copyObject({
Bucket: this.s3Bucket,
- Body: body,
+ CopySource: `${this.s3Bucket}/${tempKey}`,
Key: resolveS3Key(this.s3Bucket, hash),
});


+ await this.s3.deleteObject({
+ Bucket: this.s3Bucket,
+ Key: tempKey,
+ });

return hash;
}

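A note on the new dependency: `Upload` from `@aws-sdk/lib-storage` switches to a multipart upload for large bodies and streams parts as they are read, which is what lets the typegate drop the temp-file buffering above. This diff relies on the library defaults, but the constructor also accepts tuning options. The snippet below is a hypothetical variant of the same call (the endpoint, part size, and queue size are placeholders, not values from this commit):

```ts
import { S3Client } from "aws-sdk/client-s3";
import { Upload } from "aws-sdk/lib-storage";

// Hypothetical: the same streaming upload with explicit multipart tuning.
async function uploadWithTuning(
  bucket: string,
  key: string,
  body: ReadableStream<Uint8Array>,
  size: number,
): Promise<void> {
  const upload = new Upload({
    client: new S3Client({ region: "auto", endpoint: "http://localhost:9000" }),
    params: { Bucket: bucket, Key: key, Body: body, ContentLength: size },
    partSize: 8 * 1024 * 1024, // bytes per part; S3 requires at least 5 MiB per part
    queueSize: 4, // number of parts uploaded concurrently
    leavePartsOnError: false, // clean up already-uploaded parts if the upload fails
  });

  // Progress events are emitted as parts complete.
  upload.on("httpUploadProgress", ({ loaded, total }) => {
    console.log(`uploaded ${loaded ?? 0} of ${total ?? size} bytes`);
  });

  await upload.done();
}
```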
