Skip to content

Commit

Permalink
Implemented async batch annotation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
eugenvoronov committed Jan 29, 2025
1 parent e9aeffa commit 9370bed
Show file tree
Hide file tree
Showing 15 changed files with 1,060 additions and 199 deletions.
4 changes: 2 additions & 2 deletions packages/apps/job-launcher/server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ CRON_SECRET="cron-secret"
GOOGLE_PROJECT_ID=
GOOGLE_PRIVATE_KEY=
GOOGLE_CLIENT_EMAIL=
GOOGLE_CLOUD_STORAGE_POSITIVE_ABUSE_RESULTS_BUCKET=
GOOGLE_CLOUD_STORAGE_POSSIBLE_ABUSE_RESULTS_BUCKET=
GCS_TEMP_ASYNC_RESULTS_BUCKET=
GCS_MODERATION_RESULTS_BUCKET=

# Slack
SLACK_ABUSE_NOTIFICATION_WEBHOOK_URL=
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ export const envValidator = Joi.object({
GOOGLE_PROJECT_ID: Joi.string().required(),
GOOGLE_PRIVATE_KEY: Joi.string().required(),
GOOGLE_CLIENT_EMAIL: Joi.string().required(),
GOOGLE_CLOUD_STORAGE_POSITIVE_ABUSE_RESULTS_BUCKET: Joi.string().required(),
GOOGLE_CLOUD_STORAGE_POSSIBLE_ABUSE_RESULTS_BUCKET: Joi.string().required(),
GCS_TEMP_ASYNC_RESULTS_BUCKET: Joi.string().required(),
GCS_MODERATION_RESULTS_BUCKET: Joi.string().required(),
// Slack
SLACK_ABUSE_NOTIFICATION_WEBHOOK_URL: Joi.string().required(),
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,31 @@ export class VisionConfigService {
constructor(private configService: ConfigService) {}

/**
* The project ID for connecting to the Google Cloud Vision API.
* The Google Cloud Storage (GCS) bucket name where temporary async moderation results will be saved.
* Required
*/
get projectId(): string {
return this.configService.getOrThrow<string>('GOOGLE_PROJECT_ID');
get tempAsyncResultsBucket(): string {
return this.configService.getOrThrow<string>(
'GCS_TEMP_ASYNC_RESULTS_BUCKET',
);
}

/**
* The Google Cloud Storage (GCS) bucket name where moderation results with positive issues will be saved.
* The Google Cloud Storage (GCS) bucket name where moderation results will be saved.
* Required
*/
get positiveAbuseResultsBucket(): string {
get moderationResultsBucket(): string {
return this.configService.getOrThrow<string>(
'GOOGLE_CLOUD_STORAGE_POSITIVE_ABUSE_RESULTS_BUCKET',
'GCS_MODERATION_RESULTS_BUCKET',
);
}

/**
* The Google Cloud Storage (GCS) bucket name where moderation results with possible issues will be saved.
* The project ID for connecting to the Google Cloud Vision API.
* Required
*/
get possibleAbuseResultsBucket(): string {
return this.configService.getOrThrow<string>(
'GOOGLE_CLOUD_STORAGE_POSSIBLE_ABUSE_RESULTS_BUCKET',
);
get projectId(): string {
return this.configService.getOrThrow<string>('GOOGLE_PROJECT_ID');
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ export enum ErrorJobModeration {
ErrorProcessingDataset = 'Error processing dataset',
InappropriateContent = 'Job cannot be processed due to inappropriate content',
ContentModerationFailed = 'Job cannot be processed due to failure in content moderation',
NoDestinationURIFound = 'No destination URI found in the response',
InvalidBucketUrl = 'Invalid bucket URL',
NoResultsFound = 'No results found',
ResultsParsingFailed = 'Results parsing failed',
JobModerationFailed = 'Job moderation failed',
}

/**
Expand Down Expand Up @@ -137,6 +142,8 @@ export enum ErrorBucket {
InvalidRegion = 'Invalid region for the storage provider',
EmptyBucket = 'bucketName cannot be empty',
FailedToFetchBucketContents = 'Failed to fetch bucket contents',
InvalidGCSUrl = 'Invalid Google Cloud Storage URL',
UrlParsingError = 'URL format is valid but cannot be parsed',
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ export const TESTNET_CHAIN_IDS = [
ChainId.POLYGON_AMOY,
ChainId.SEPOLIA,
];
export const MAINNET_CHAIN_IDS = [
ChainId.BSC_MAINNET,
ChainId.POLYGON,
];
export const MAINNET_CHAIN_IDS = [ChainId.BSC_MAINNET, ChainId.POLYGON];

export const SENDGRID_API_KEY_REGEX =
/^SG\.[A-Za-z0-9-_]{22}\.[A-Za-z0-9-_]{43}$/;
Expand Down Expand Up @@ -86,6 +83,8 @@ export const CONTENT_MODERATION_FEATURE = {
SAFE_SEARCH_DETECTION: 'SAFE_SEARCH_DETECTION',
};

// URI scheme prefix used by Google Cloud Storage paths (e.g. "gs://bucket/object").
export const GS_PROTOCOL = 'gs://';
// NOTE(review): presumably the number of items sent per synchronous moderation request — confirm against the moderation service.
export const JOB_MODERATION_BATCH_SIZE = 16;
// NOTE(review): presumably the number of items per asynchronous batch annotation request — confirm against the moderation service.
export const JOB_MODERATION_ASYNC_BATCH_SIZE = 100;
// NOTE(review): looks like a rate-limit ceiling for moderation requests per minute — confirm where it is enforced.
export const JOB_MODERATION_MAX_REQUESTS_PER_MINUTE = 1800;
// Number of milliseconds in one minute.
export const ONE_MINUTE_IN_MS = 60000;
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export enum CronJobType {
JobModeration = 'job-moderation',
ParseJobModerationResults = 'parse-job-moderation-results',
CreateEscrow = 'create-escrow',
SetupEscrow = 'setup-escrow',
FundEscrow = 'fund-escrow',
Expand Down
1 change: 1 addition & 0 deletions packages/apps/job-launcher/server/src/common/enums/job.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export enum JobStatus {
PENDING = 'pending',
PAID = 'paid',
ON_MODERATION = 'on_moderation',
MODERATION_PASSED = 'moderation_passed',
POSSIBLE_ABUSE_IN_REVIEW = 'possible_abuse_in_review',
CREATED = 'created',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ import { AWSRegions, StorageProviders } from '../enums/storage';
import { JobRequestType } from '../enums/job';
import axios from 'axios';
import { StorageDataDto } from '../../modules/job/job.dto';
import { generateBucketUrl, listObjectsInBucket } from './storage';
import {
convertToGCSPath,
convertToHttpUrl,
generateBucketUrl,
isGCSBucketUrl,
listObjectsInBucket,
} from './storage';
import { ControlledError } from '../errors/controlled';
import { ErrorBucket } from '../constants/errors';
import { HttpStatus } from '@nestjs/common';

jest.mock('axios');

Expand Down Expand Up @@ -144,3 +153,79 @@ describe('Storage utils', () => {
});
});
});

// Unit tests for the Google Cloud Storage URL helpers exported from './storage'.
describe('GCS URL utils', () => {
  // Fixtures shared by the suites below.
  const HTTP_OBJECT_URL = 'https://my-bucket.storage.googleapis.com/object.jpg';
  const GS_OBJECT_PATH = 'gs://my-bucket/object.jpg';
  const FOREIGN_HTTP_URL = 'https://invalid-url.com/object.jpg';

  // Builds the error every "invalid GCS URL" case is expected to raise.
  const invalidGcsUrlError = (): ControlledError =>
    new ControlledError(ErrorBucket.InvalidGCSUrl, HttpStatus.BAD_REQUEST);

  describe('isGCSBucketUrl', () => {
    it('should return true for a valid GCS HTTP URL', () => {
      const verdict = isGCSBucketUrl(HTTP_OBJECT_URL);
      expect(verdict).toBe(true);
    });

    it('should return true for a valid GCS gs:// URL', () => {
      const verdict = isGCSBucketUrl(GS_OBJECT_PATH);
      expect(verdict).toBe(true);
    });

    it('should return false for an invalid GCS HTTP URL', () => {
      const verdict = isGCSBucketUrl(FOREIGN_HTTP_URL);
      expect(verdict).toBe(false);
    });

    it('should return false for an invalid gs:// URL', () => {
      const verdict = isGCSBucketUrl('gs:/invalid-bucket/object.jpg');
      expect(verdict).toBe(false);
    });

    it('should return false for a completely invalid URL', () => {
      const verdict = isGCSBucketUrl('randomstring');
      expect(verdict).toBe(false);
    });
  });

  describe('convertToGCSPath', () => {
    it('should convert a valid GCS HTTP URL to a gs:// path', () => {
      expect(convertToGCSPath(HTTP_OBJECT_URL)).toBe(GS_OBJECT_PATH);
    });

    it('should convert a valid GCS HTTP URL without an object path to a gs:// bucket path', () => {
      const bucketOnlyUrl = 'https://my-bucket.storage.googleapis.com';
      expect(convertToGCSPath(bucketOnlyUrl)).toBe('gs://my-bucket/');
    });

    it('should throw an error for an invalid GCS URL', () => {
      const attempt = () => convertToGCSPath(FOREIGN_HTTP_URL);
      expect(attempt).toThrow(invalidGcsUrlError());
    });
  });

  describe('convertToHttpUrl', () => {
    it('should convert a gs:// path to a valid HTTP URL', () => {
      expect(convertToHttpUrl(GS_OBJECT_PATH)).toBe(HTTP_OBJECT_URL);
    });

    it('should convert a gs:// bucket path without an object to an HTTP bucket URL', () => {
      const bucketOnlyPath = 'gs://my-bucket/';
      expect(convertToHttpUrl(bucketOnlyPath)).toBe(
        'https://my-bucket.storage.googleapis.com/',
      );
    });

    it('should throw an error for an invalid gs:// path', () => {
      const attempt = () => convertToHttpUrl('invalid-gcs-path');
      expect(attempt).toThrow(invalidGcsUrlError());
    });

    it('should throw an error if the gs:// format is incorrect', () => {
      const attempt = () => convertToHttpUrl('gs:/missing-slash/object.jpg');
      expect(attempt).toThrow(invalidGcsUrlError());
    });
  });
});
75 changes: 75 additions & 0 deletions packages/apps/job-launcher/server/src/common/utils/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,78 @@ export async function listObjectsInBucket(url: URL): Promise<string[]> {
}
});
}

/**
 * Checks whether a string has the shape of a Google Cloud Storage location.
 *
 * Two shapes are accepted:
 * - virtual-hosted HTTPS URL: https://<bucket>.storage.googleapis.com[/<object_path>]
 * - GCS URI: gs://<bucket>[/<object_path>]
 *
 * In both shapes the bucket segment may only consist of ASCII letters,
 * digits and hyphens.
 *
 * @param url - Candidate string to check.
 * @returns `true` when the string matches either shape, `false` otherwise.
 */
export function isGCSBucketUrl(url: string): boolean {
  const acceptedShapes: RegExp[] = [
    /^https:\/\/([a-zA-Z0-9\-]+)\.storage\.googleapis\.com(?:\/(.*))?$/,
    /^gs:\/\/[a-zA-Z0-9\-]+(\/.*)?$/,
  ];

  return acceptedShapes.some((shape) => shape.test(url));
}

/**
 * Rewrites a virtual-hosted GCS HTTPS URL as the equivalent `gs://` path.
 *
 * @param url - URL of the form https://<bucket>.storage.googleapis.com[/<object_path>].
 * @returns The path `gs://<bucket>/<object_path>`; the object part is empty
 *   when the URL names only a bucket.
 * @throws ControlledError - `ErrorBucket.InvalidGCSUrl` (BAD_REQUEST) when
 *   `isGCSBucketUrl` rejects the input; `ErrorBucket.UrlParsingError`
 *   (BAD_REQUEST) when the input passes that check but is not in the HTTPS
 *   form (for example a `gs://` URI).
 */
export function convertToGCSPath(url: string): string {
  if (!isGCSBucketUrl(url)) {
    throw new ControlledError(
      ErrorBucket.InvalidGCSUrl,
      HttpStatus.BAD_REQUEST,
    );
  }

  // Only the HTTPS form can be converted; capture the bucket and the optional object path.
  const parsed =
    /^https:\/\/([a-zA-Z0-9\-]+)\.storage\.googleapis\.com(?:\/(.*))?$/.exec(
      url,
    );

  if (parsed === null) {
    throw new ControlledError(
      ErrorBucket.UrlParsingError,
      HttpStatus.BAD_REQUEST,
    );
  }

  const [, bucket, object] = parsed;

  return `gs://${bucket}/${object || ''}`;
}

/**
 * Converts a `gs://` path into the equivalent virtual-hosted HTTPS URL.
 *
 * The bucket segment may only consist of ASCII letters, digits and hyphens
 * and must be followed either by `/` or by the end of the string. Inputs
 * whose bucket segment contains any other character are rejected rather than
 * being split at the first unsupported character.
 *
 * @param gcsPath - The GCS path to convert (e.g., "gs://bucket-name/object-path").
 * @returns The URL `https://<bucket>.storage.googleapis.com/<object_path>`;
 *   the object part is empty when the path names only a bucket.
 * @throws ControlledError - `ErrorBucket.InvalidGCSUrl` (BAD_REQUEST) when
 *   the input is not a well-formed `gs://` path.
 */
export function convertToHttpUrl(gcsPath: string): string {
  // The object path is only recognised after an explicit "/". With an optional
  // slash, "gs://my_bucket/x" would be parsed as bucket "my" and object
  // "_bucket/x", silently producing a URL for the wrong bucket.
  const gcsPathPattern = /^gs:\/\/([a-zA-Z0-9\-]+)(?:\/(.*))?$/;
  const match = gcsPathPattern.exec(gcsPath);

  if (!match) {
    throw new ControlledError(
      ErrorBucket.InvalidGCSUrl,
      HttpStatus.BAD_REQUEST,
    );
  }

  const bucketName = match[1];
  // Group 2 is undefined when the path has no "/" after the bucket.
  const objectPath = match[2] ?? '';

  return `https://${bucketName}.storage.googleapis.com/${objectPath}`;
}
Loading

0 comments on commit 9370bed

Please sign in to comment.