Skip to content

Commit

Permalink
BATMAN
Browse files Browse the repository at this point in the history
  • Loading branch information
turbobot-temp authored and rin-yato committed Sep 6, 2024
0 parents commit 12c778c
Show file tree
Hide file tree
Showing 59 changed files with 9,409 additions and 0 deletions.
38 changes: 38 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.

# Dependencies
node_modules
.pnp
.pnp.js

# Local env files
.env
.env.local
.env.development.local
.env.test.local
.env.production.local

# Testing
coverage

# Turbo
.turbo

# Vercel
.vercel

# Build Outputs
.next/
out/
build
dist


# Debug
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# Misc
.DS_Store
*.pem
Empty file added .npmrc
Empty file.
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"eslint.workingDirectories": [
{
"mode": "auto"
}
]
}
2 changes: 2 additions & 0 deletions apps/api/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# deps
node_modules/
20 changes: 20 additions & 0 deletions apps/api/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"name": "api",
"scripts": {
"dev": "bun run --watch src/index.ts"
},
"dependencies": {
"@bull-board/api": "^5.21.4",
"@bull-board/hono": "^5.21.4",
"@hono/zod-openapi": "^0.16.0",
"@scalar/hono-api-reference": "^0.5.144",
"bullmq": "^5.12.13",
"hono": "^4.5.11",
"ky": "^1.7.2",
"ts-khqr": "^2.1.3",
"zod": "^3.23.8"
},
"devDependencies": {
"@types/bun": "latest"
}
}
40 changes: 40 additions & 0 deletions apps/api/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { OpenAPIHono } from "@hono/zod-openapi";
import { apiReference } from "@scalar/hono-api-reference";
import { registerTasker } from "./lib/tasker";
import { Route } from "./route";

import { cors } from "hono/cors";
// Tasker
import { TransactionTasker } from "./route/transaction/tasker";

const app = new OpenAPIHono();

// The OpenAPI documentation will be available at /doc
app.doc31("/openapi", {
openapi: "3.1.0",
info: {
version: "1.0.0",
title: "My API",
},
});

app.get(
"/docs",
apiReference({
theme: "purple",
spec: {
url: "/openapi",
},
}),
);

registerTasker(app, [new TransactionTasker()]);

app.use(
"*",
cors({ origin: "*", allowHeaders: ["Content-Type"], allowMethods: ["GET", "POST"] }),
);

app.route("/", Route);

export default app;
21 changes: 21 additions & 0 deletions apps/api/src/lib/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { OpenAPIHono } from "@hono/zod-openapi";
import type { ZodSchema } from "zod";

export const createRoute: OpenAPIHono["openapi"] = (routeConfig, handler) => {
return new OpenAPIHono().openapi(routeConfig, handler);
};

export interface ResponseConfig {
schema: ZodSchema;
description: string;
example?: any;
}

export function response(config: ResponseConfig) {
return {
content: {
"application/json": { schema: config.schema, example: config.example },
},
description: config.description,
};
}
34 changes: 34 additions & 0 deletions apps/api/src/lib/tasker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { HonoAdapter } from "@bull-board/hono";
import type { OpenAPIHono } from "@hono/zod-openapi";
import { Queue, type Worker } from "bullmq";
import { serveStatic } from "hono/bun";

export function registerTasker(app: OpenAPIHono, taskers: Tasker[]) {
for (const tasker of taskers) {
tasker.start();
}

const serverAdapter = new HonoAdapter(serveStatic);

createBullBoard({
queues: taskers.map((tasker) => new BullMQAdapter(new Queue(tasker.worker.name))),
serverAdapter,
});

serverAdapter.setBasePath("/queue");

app.route("/queue", serverAdapter.registerPlugin());
}

export const DEFAULT_CONNECTION = {
host: "127.0.0.1",
port: 6379,
};

export interface Tasker {
worker: Worker;
start(): Promise<void>;
shutdown(): Promise<void>;
}
9 changes: 9 additions & 0 deletions apps/api/src/route/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { OpenAPIHono } from "@hono/zod-openapi";
import { logger } from "hono/logger";
import { TransactionRoute } from "./transaction";
import { UserRoute } from "./user";

export const Route = new OpenAPIHono()
.use(logger())
.route("/", UserRoute)
.route("/", TransactionRoute);
1 change: 1 addition & 0 deletions apps/api/src/route/transaction/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const BAKONG_API_URL = "https://api-bakong.nbc.gov.kh";
9 changes: 9 additions & 0 deletions apps/api/src/route/transaction/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { OpenAPIHono } from "@hono/zod-openapi";
import { createTransaction } from "./route/transaction.create";
import { getTransactionByMd5 } from "./route/transaction.get-by-md5";
import { trackTransaction } from "./route/transaction.track";

export const TransactionRoute = new OpenAPIHono()
.route("/", createTransaction)
.route("/", trackTransaction)
.route("/", getTransactionByMd5);
29 changes: 29 additions & 0 deletions apps/api/src/route/transaction/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { DEFAULT_CONNECTION } from "@/lib/tasker";
import { Queue } from "bullmq";

export const TRANSACTION_QUEUE_NAME = "{transaction}";

export class TransactionQueue {
queue;

constructor() {
this.queue = new Queue(TRANSACTION_QUEUE_NAME, {
connection: DEFAULT_CONNECTION,
});
}

async add(md5: string) {
await this.queue.add(
md5,
{ md5 },
{
jobId: md5,
attempts: 60,
delay: 5000,
backoff: { type: "fixed", delay: 3000 },
},
);
}
}

export const transactionQueue = new TransactionQueue();
37 changes: 37 additions & 0 deletions apps/api/src/route/transaction/route/transaction.create.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { createRoute, response } from "@/lib/route";
import { transactionServcie } from "@/service/transaction.service";
import { z } from "@hono/zod-openapi";
import { transactionQueue } from "../queue";

export const createTransaction = createRoute(
{
method: "post",
path: "/transaction/create",
request: {
body: { content: { "application/json": { schema: z.object({ amount: z.number() }) } } },
},
responses: {
200: response({
description: "Transaction created",
schema: z.object({ data: z.any() }),
}),
400: response({
description: "Error",
schema: z.object({ error: z.string() }),
}),
},
},
async (c) => {
const { amount } = await c.req.json();

const transaction = transactionServcie.createTransaction(amount);

if (!transaction.data) {
return c.json({ error: "sth went wrong" }, 400);
}

// await transactionQueue.add(transaction.data.md5);

return c.json({ data: transaction.data });
},
);
22 changes: 22 additions & 0 deletions apps/api/src/route/transaction/route/transaction.get-by-md5.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { response } from "@/lib/route";
import { transactionServcie } from "@/service/transaction.service";
import { OpenAPIHono, createRoute, z } from "@hono/zod-openapi";

export const getTransactionByMd5 = new OpenAPIHono().openapi(
createRoute({
method: "get",
path: "/transaction/get-by-md5/{md5}",
request: { params: z.object({ md5: z.string() }) },
responses: {
200: response({
schema: z.object({ data: z.any() }),
description: "Transaction details",
}),
},
}),
async (c) => {
const md5 = c.req.param("md5");
const transaction = await transactionServcie.getTransactionByMd5(md5);
return c.json({ data: transaction });
},
);
52 changes: 52 additions & 0 deletions apps/api/src/route/transaction/route/transaction.track.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { OpenAPIHono, createRoute } from "@hono/zod-openapi";
import { streamSSE } from "hono/streaming";
import { z } from "zod";
import { transactionQueue } from "../queue";

export const trackTransaction = new OpenAPIHono().openapi(
createRoute({
path: "/transaction/track/{md5}",
method: "get",
request: { params: z.object({ md5: z.string() }) },
responses: {
200: { description: "Track a transaction" },
},
}),
async (c) => {
const md5 = c.req.param("md5");

if (md5 === undefined) {
return c.json({ error: "MD5 is required" }, 400);
}

return streamSSE(
c,
async (stream) => {
stream.onAbort(async () => {
stream.abort();
await stream.close();
});

while (true) {
const job = await transactionQueue.queue.getJob(md5);
const isCompleted = (await job?.isCompleted()) ?? false;
const isFailed = (await job?.isFailed()) ?? false;

if (isCompleted) {
await stream.writeSSE({ data: "COMPLETED" });
await stream.close();
} else if (isFailed) {
await stream.writeSSE({ data: "FAILED" });
await stream.close();
}

await stream.writeSSE({ data: "PENDING" });
await stream.sleep(3000);
}
},
async (error, stream) => {
await stream.writeln(`ERROR: ${error.message}`);
},
);
},
);
57 changes: 57 additions & 0 deletions apps/api/src/route/transaction/tasker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { transactionServcie } from "@/service/transaction.service";
import { type Job, UnrecoverableError, Worker } from "bullmq";
import ky from "ky";
import { DEFAULT_CONNECTION, type Tasker } from "../../lib/tasker";
import { BAKONG_API_URL } from "./config";
import { TRANSACTION_QUEUE_NAME } from "./queue";
import type { TransactionSuccess } from "./type";

export class TransactionTasker implements Tasker {
worker;

constructor() {
this.worker = new Worker(TRANSACTION_QUEUE_NAME, this.process, {
autorun: false,
connection: DEFAULT_CONNECTION,
concurrency: 20,
});

this.worker.on("ready", () => {
console.log("Transaction worker is ready");
});

this.worker.on("closing", () => {
console.log("Transaction worker is closing");
});
}

async start() {
await this.worker.run();
}

async shutdown() {
await this.worker.close();
}

async process(job: Job<{ md5: string }>) {
// if the job age exceeds 5mn, it will be removed
if (job.timestamp + 300000 < Date.now()) {
throw new UnrecoverableError("Job is too old");
}

const transactionStatus = await transactionServcie.getTransactionByMd5(job.data.md5);

if (transactionStatus.responseCode === 1) {
if (transactionStatus.errorCode === 3) {
// transaction failed
throw new UnrecoverableError(transactionStatus.responseMessage);
}

// not found error
throw new Error(transactionStatus.responseMessage);
}

await job.updateProgress(100);
job.log("Transaction is successful");
}
}
Loading

0 comments on commit 12c778c

Please sign in to comment.