Skip to content

Commit

Permalink
H-3707, H-3708: Implement libp2p frontend (#5761)
Browse files Browse the repository at this point in the history
  • Loading branch information
indietyp authored Dec 3, 2024
1 parent 863d181 commit d7f2ebd
Show file tree
Hide file tree
Showing 20 changed files with 1,847 additions and 45 deletions.
14 changes: 13 additions & 1 deletion libs/@local/harpc/client/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,19 @@
"test:unit": "vitest --run"
},
"dependencies": {
"effect": "3.11.2"
"@chainsafe/libp2p-noise": "16.0.0",
"@chainsafe/libp2p-yamux": "7.0.1",
"@libp2p/crypto": "5.0.7",
"@libp2p/identify": "3.0.12",
"@libp2p/interface": "2.2.1",
"@libp2p/ping": "2.0.12",
"@libp2p/tcp": "10.0.13",
"@multiformats/multiaddr": "12.3.3",
"effect": "3.11.2",
"it-stream-types": "2.0.2",
"libp2p": "2.3.1",
"multiformats": "13.3.1",
"uint8arraylist": "2.4.8"
},
"devDependencies": {
"@effect/platform": "0.70.2",
Expand Down
51 changes: 51 additions & 0 deletions libs/@local/harpc/client/typescript/src/net/Client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Effect, Function } from "effect";

import { createProto } from "../utils.js";
import * as Connection from "./Connection.js";
import * as internalTransport from "./internal/transport.js";
import type * as Transport from "./Transport.js";

const TypeId: unique symbol = Symbol("@local/harpc-client/net/Client");
export type TypeId = typeof TypeId;

export interface ClientConfig {
transport?: Transport.TransportConfig;
connection?: Connection.ConnectionConfig;
}

export interface Client {
readonly [TypeId]: TypeId;
}

interface ClientImpl extends Client {
readonly client: internalTransport.Transport;
readonly config?: ClientConfig;
}

const ClientProto: Omit<ClientImpl, "client" | "config"> = {
[TypeId]: TypeId,
};

// TODO: add a metrics compatability layer
// see: https://linear.app/hash/issue/H-3712/libp2p-metrics-compatibility-layer
export const make = (config?: ClientConfig) =>
Effect.gen(function* () {
const client = yield* internalTransport.make(config?.transport);

return createProto(ClientProto, {
client,
config,
}) satisfies ClientImpl as Client;
});

export const connect: {
(
address: Transport.Address,
): (self: Client) => Effect.Effect<Connection.Connection>;
(
self: Client,
address: Transport.Address,
): Effect.Effect<Connection.Connection>;
} = Function.dual(2, (self: ClientImpl, address: Transport.Address) =>
Connection.makeUnchecked(self.client, self.config?.connection ?? {}, address),
);
51 changes: 51 additions & 0 deletions libs/@local/harpc/client/typescript/src/net/Config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import type { Uint8ArrayList } from "uint8arraylist";

export { Config as YamuxConfig } from "@chainsafe/libp2p-yamux/config";
export { TCPOptions as TCPConfig, TCPSocketOptions } from "@libp2p/tcp";

// Vendored from @chainsafe/libp2p-noise
export interface NoiseExtensions {
webtransportCerthashes: Uint8Array[];
}

export interface KeyPair {
publicKey: Uint8Array;
privateKey: Uint8Array;
}

export interface ICryptoInterface {
hashSHA256(data: Uint8Array | Uint8ArrayList): Uint8Array;

getHKDF(
ck: Uint8Array,
ikm: Uint8Array,
): [Uint8Array, Uint8Array, Uint8Array];

generateX25519KeyPair(): KeyPair;
generateX25519KeyPairFromSeed(seed: Uint8Array): KeyPair;
generateX25519SharedKey(
privateKey: Uint8Array | Uint8ArrayList,
publicKey: Uint8Array | Uint8ArrayList,
): Uint8Array;

chaCha20Poly1305Encrypt(
plaintext: Uint8Array | Uint8ArrayList,
nonce: Uint8Array,
ad: Uint8Array,
k: Uint8Array,
): Uint8ArrayList | Uint8Array;
chaCha20Poly1305Decrypt(
ciphertext: Uint8Array | Uint8ArrayList,
nonce: Uint8Array,
ad: Uint8Array,
k: Uint8Array,
dst?: Uint8Array,
): Uint8ArrayList | Uint8Array;
}

export interface NoiseConfig {
staticNoiseKey?: Uint8Array;
extensions?: NoiseExtensions;
crypto?: ICryptoInterface;
prologueBytes?: Uint8Array;
}
274 changes: 274 additions & 0 deletions libs/@local/harpc/client/typescript/src/net/Connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
import type { PeerId } from "@libp2p/interface";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { Chunk } from "effect";
import {
Data,
Deferred,
Duration,
Effect,
Function,
MutableHashMap,
Option,
pipe,
Queue,
Sink,
Stream,
} from "effect";

import { createProto } from "../utils.js";
import { Buffer } from "../wire-protocol/index.js";
import type { RequestId } from "../wire-protocol/models/request/index.js";
import { Request as WireRequest } from "../wire-protocol/models/request/index.js";
import type { Response as WireResponse } from "../wire-protocol/models/response/index.js";
import { ResponseFlags } from "../wire-protocol/models/response/index.js";
import { ResponseFromBytesStream } from "../wire-protocol/stream/index.js";
import type { IncompleteResponseError } from "../wire-protocol/stream/ResponseFromBytesStream.js";
import type * as internalTransport from "./internal/transport.js";
import type * as Request from "./Request.js";
import * as Transaction from "./Transaction.js";

const TypeId: unique symbol = Symbol("@local/harpc-client/net/Connection");
export type TypeId = typeof TypeId;

export class TransportError extends Data.TaggedError("TransportError")<{
cause: unknown;
}> {
get message() {
return "Underlying transport stream experienced an error";
}
}

interface ConnectionDuplex {
readonly read: Stream.Stream<
WireResponse.Response,
TransportError | WireResponse.DecodeError | IncompleteResponseError
>;

readonly write: Sink.Sink<
void,
WireRequest.Request,
never,
TransportError | WireRequest.EncodeError
>;
}

export interface ConnectionConfig {
/**
* The maximum duration to wait for a response before considering the transaction lagged.
* If a response is not received within this timeout, the transaction will be dropped.
*
* @default 200ms
*/
lagTimeout?: Duration.DurationInput;

/**
* The size of the number of buffered responses to keep in memory.
* A larger buffer can improve performance and allows for more lenient timeouts,
* but consumes more memory. (a single response is a maximum of 64KiB)
*
* @default 16
*/
responseBufferSize?: number;

maxOutboundStreams?: number;
runOnLimitedConnection?: boolean;
}

export interface Connection {
[TypeId]: TypeId;
}

interface TransactionContext {
queue: Queue.Enqueue<WireResponse.Response>;
drop: Effect.Effect<void>;
}

interface ConnectionImpl extends Connection {
readonly transactions: MutableHashMap.MutableHashMap<
RequestId.RequestId,
TransactionContext
>;

readonly duplex: ConnectionDuplex;

readonly config: ConnectionConfig;
}

const ConnectionProto: Omit<
ConnectionImpl,
"transactions" | "duplex" | "config"
> = {
[TypeId]: TypeId,
};

const makeSink = (connection: ConnectionImpl) =>
// eslint-disable-next-line unicorn/no-array-for-each
Sink.forEach((response: WireResponse.Response) =>
Effect.gen(function* () {
const id = response.header.requestId;

const transaction = MutableHashMap.get(connection.transactions, id);
if (Option.isNone(transaction)) {
yield* Effect.logWarning("response without a transaction found");

return;
}

const lagTimeout = connection.config.lagTimeout ?? Duration.seconds(0.2);

const isOnline = yield* pipe(
Queue.offer(transaction.value.queue, response),
Effect.timeout(lagTimeout),
Effect.catchTag("TimeoutException", (timeout) =>
Effect.gen(function* () {
yield* Effect.logWarning(
"transaction has lagged behind too far, dropping it",
).pipe(Effect.annotateLogs({ timeout }));

yield* transaction.value.drop;
}),
),
);

if (isOnline === false) {
yield* Effect.logWarning("transaction has been closed, dropping it");
yield* transaction.value.drop;

return;
}

if (ResponseFlags.isEndOfResponse(response.header.flags)) {
yield* Effect.logDebug("end of response");

yield* transaction.value.drop;
}
}).pipe(Effect.annotateLogs({ id: response.header.requestId })),
);

const wrapDrop = (
connection: ConnectionImpl,
id: RequestId.RequestId,
drop: Deferred.Deferred<void>,
) =>
Effect.gen(function* () {
const transaction = MutableHashMap.get(connection.transactions, id);
if (Option.isNone(transaction)) {
yield* Effect.logWarning("transaction has been dropped multiple times");

return;
}

MutableHashMap.remove(connection.transactions, id);
yield* transaction.value.queue.shutdown;

// call user defined drop function
const dropImpl = yield* Deferred.poll(drop);
if (Option.isSome(dropImpl)) {
yield* dropImpl.value;
}
});

const task = (connection: ConnectionImpl) =>
Effect.gen(function* () {
const sink = makeSink(connection);

// We don't need to monitor if the connection itself closes as that simply means that our stream would end.
yield* Stream.run(connection.duplex.read, sink);
});

/** @internal */
export const makeUnchecked = (
transport: internalTransport.Transport,
config: ConnectionConfig,
peer: PeerId | Multiaddr | Multiaddr[],
) =>
Effect.gen(function* () {
const connection = yield* Effect.tryPromise((abort) =>
transport.dial(peer, { signal: abort }),
);

const stream = yield* Effect.acquireRelease(
Effect.tryPromise((abort) =>
connection.newStream("/harpc/1.0.0", {
signal: abort,
maxOutboundStreams: config.maxOutboundStreams,
runOnLimitedConnection: config.runOnLimitedConnection,
}),
),
(_) => Effect.promise(() => _.close()),
);

const readStream = pipe(
Stream.fromAsyncIterable(
stream.source,
(cause) => new TransportError({ cause }),
),
Stream.mapConcat((list) => list),
ResponseFromBytesStream.make,
);

const writeSink = pipe(
Sink.forEachChunk((chunk: Chunk.Chunk<Uint8Array>) =>
Effect.try({
try: () => stream.sink(chunk),
catch: (cause) => new TransportError({ cause }),
}),
),
Sink.mapInputEffect((request: WireRequest.Request) =>
Effect.gen(function* () {
const buffer = yield* Buffer.makeWrite();

yield* WireRequest.encode(buffer, request);

const array = yield* Buffer.take(buffer);
return new Uint8Array(array);
}),
),
);

const duplex = { read: readStream, write: writeSink } as ConnectionDuplex;

const self: ConnectionImpl = createProto(ConnectionProto, {
transactions: MutableHashMap.empty<
RequestId.RequestId,
TransactionContext
>(),
duplex,
config,
});

// TODO: we might want to observe the task, for that we would need to have a partial connection that we then patch
yield* Effect.fork(task(self));

return self as Connection;
});

export const send: {
<E, R>(
request: Request.Request<E, R>,
): (self: Connection) => Effect.Effect<Transaction.Transaction>;
<E, R>(
self: Connection,
request: Request.Request<E, R>,
): Effect.Effect<Transaction.Transaction>;
} = Function.dual(
2,
<E, R>(self: ConnectionImpl, request: Request.Request<E, R>) =>
Effect.gen(function* () {
const deferredDrop = yield* Deferred.make<void>();
const drop = wrapDrop(self, request.id, deferredDrop);

const queue = yield* Queue.bounded<WireResponse.Response>(
self.config.responseBufferSize ?? 16,
);

const transactionContext: TransactionContext = {
queue,
drop,
};

MutableHashMap.set(self.transactions, request.id, transactionContext);

return Transaction.makeUnchecked(request.id, queue, deferredDrop);
}),
);
Loading

0 comments on commit d7f2ebd

Please sign in to comment.