Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
balazskreith committed Sep 13, 2024
1 parent e5f549f commit 9018c38
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 70 deletions.
1 change: 1 addition & 0 deletions examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"dev:common:2": "nodemon -x ts-node src/common-readme-example.ts | pino-pretty",
"dev:common:3": "nodemon -x ts-node src/common-join-leave-rejoin-example.ts | pino-pretty",
"dev:common:4": "nodemon -x ts-node src/common-waiting-example.ts | pino-pretty",
"dev:common:5": "nodemon -x ts-node src/common-join-leave-rejoin-example-2.ts | pino-pretty",
"dev:redis:1": "nodemon -x ts-node src/redis-remote-map-example.ts | pino-pretty",
"dev:redis:2": "nodemon -x ts-node src/redis-dynamic-record-example.ts | pino-pretty",
"dev:redis:3": "nodemon -x ts-node src/redis-job-executing-example.ts | pino-pretty",
Expand Down
103 changes: 103 additions & 0 deletions examples/src/common-join-leave-rejoin-example-2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
This example demonstrates how to manage a small Hamok cluster with multiple servers, focusing on
dynamic joining, leaving, and handling peer connections in a distributed environment.
Workflow:
1. Initialization:
- Two servers (`server_1` and `server_2`) are initialized, and their local peer IDs are logged.
- The servers are set up to communicate by listening to each other's messages.
2. Joining the Cluster:
- Both servers join the cluster, and a third server (`server_3`) is added to the cluster later.
3. Handling Server Leaving:
- `server_1` leaves the cluster. The example monitors how `server_2` and `server_3` handle this event, including leader re-election.
4. Rejoining the Cluster:
- After a pause, `server_1` re-joins the cluster. The example ensures that the rejoin event is correctly triggered and handled.
5. Final Cleanup:
- The servers are gracefully closed, terminating the cluster.
Practical Use:
- Cluster Management: The example is useful for understanding how to manage a dynamic cluster using Hamok,
including handling server failures, leader elections, and reconnections.
- Event Handling: Demonstrates how to use Hamok’s event system to monitor and react to changes in the cluster.
- Logging: Provides a practical example of integrating logging into a Hamok-based system for debugging and monitoring purposes.
This example is suitable for developers looking to implement or understand distributed systems using the Hamok library.
*/


import { Hamok, setHamokLogLevel } from 'hamok';
import * as pino from 'pino';

const logger = pino.pino({
name: 'common-join-example-2',
level: 'debug',
});

export async function run() {

const server_1 = new Hamok({
onlyFollower: true,
});
logger.info('server 1 is %s', server_1.localPeerId);
const server1Acceptor = server_1.accept.bind(server_1);

let server_1_joined = false;

for (let i = 0; i < 10; ++i) {
const server_2 = new Hamok();
// by having the communication channel we assume we can inquery remote endpoints
logger.info('server 2 is %s', server_2.localPeerId);

const server2Acceptor = server_2.accept.bind(server_2);

server_1.on('message', server2Acceptor);
server_2.on('message', server1Acceptor);

const timer = setInterval(() => {
logger.debug('\
\niteration: %d, \
\nserver_1 (%s, state: %s) remotePeers are %s, \
\nserver_2 (%s, state: %s) remotePeers are %s',
i,
server_1.localPeerId,
server_1.state,
[...server_1.remotePeerIds].join(', '),
server_2.localPeerId,
server_2.state,
[...server_2.remotePeerIds].join(', '),
);
}, 1000)


await Promise.all([
server_1_joined ? Promise.resolve() : server_1.join(),
server_2.join(),
]);

logger.info('Server 1 and Server 2 joined');

server_2.close();

server_1.off('message', server2Acceptor);
server_2.off('message', server1Acceptor);

server_1_joined = true;
server_1.raft.config.onlyFollower = false;
clearInterval(timer);
}


logger.info('Close');

server_1.close();
}

if (require.main === module) {
logger.info('Running from module file');
setHamokLogLevel('debug');
run();
}
61 changes: 19 additions & 42 deletions src/Hamok.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
private readonly _codec = new HamokGridCodec();
public readonly grid: HamokGrid;

private _checkingRemotePeers?: Promise<void>;
private _lookingForRemotePeers?: ReturnType<typeof setInterval>;

public constructor(providedConfig?: Partial<HamokConfig<AppData>>) {
super();
Expand Down Expand Up @@ -757,6 +757,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
}

public async submit(entry: HamokMessage): Promise<void> {
if (this._closed) throw new Error('Cannot submit on a closed Hamok instance');
if (!this.raft.leaderId) {
throw new Error(`No leader is elected, cannot submit message type ${entry.type}`);
}
Expand Down Expand Up @@ -814,6 +815,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
}

public accept(message: HamokMessage, commitIndex?: number): void {
if (this._closed) return;
if (message.destinationId && message.destinationId !== this.localPeerId) {
return logger.trace('%s Received message address is not matching with the local peer %o', this.localPeerId, message);
}
Expand Down Expand Up @@ -1042,7 +1044,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
minNumberOfLogs = notification.numberOfLogs;
}

notification.activeEndpointIds?.forEach((remotePeerId) => (remotePeerId !== this.localPeerId ? remotePeerIds.add(remotePeerId) : void 0));
// notification.activeEndpointIds?.forEach((remotePeerId) => (remotePeerId !== this.localPeerId ? remotePeerIds.add(remotePeerId) : void 0));
if (notification.sourceEndpointId !== this.localPeerId) {
remotePeerIds.add(notification.sourceEndpointId);
}
Expand Down Expand Up @@ -1088,6 +1090,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
}
const raftEngine = this.raft;

logger.debug('%s Stopping the raft engine', this.localPeerId);
clearInterval(this._raftTimer);
this._raftTimer = undefined;

Expand Down Expand Up @@ -1138,6 +1141,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
const notification = this._codec.decodeJoinNotification(message);

if (notification.sourcePeerId !== this.localPeerId) {
logger.debug('%s Received join notification from %s', this.localPeerId, notification.sourcePeerId);
this.addRemotePeerId(notification.sourcePeerId);
} else {
logger.debug('%s Received join notification from itself %o', this.localPeerId, notification);
Expand Down Expand Up @@ -1253,6 +1257,8 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
for (const collection of this.storages.values()) {
collection.connection.emit('leader-changed', leaderId);
}
clearInterval(this._lookingForRemotePeers);
this._lookingForRemotePeers = undefined;

if (this.localPeerId === leaderId) {
this.on('remote-peer-joined', this._sendEndpointNotificationsToAll);
Expand All @@ -1262,8 +1268,10 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
this.off('remote-peer-left', this._sendEndpointNotificationsToAll);

if (leaderId === undefined) {
logger.warn('%s detected that Leader is gone, clearing the remote peers', this.localPeerId);
this.raft.remotePeers.clear();
// when there is no leader we need to check the remote peers
this._checkRemotePeers().catch(() => void 0);
this._checkRemotePeers();
}
}
}
Expand Down Expand Up @@ -1363,53 +1371,22 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
return connection;
}

private async _checkRemotePeers(): Promise<void> {
private _checkRemotePeers(): void {
if (!this._closed) return;
if (this._checkingRemotePeers) {
return this._checkingRemotePeers;
if (this._lookingForRemotePeers) {
return;
}

// if the leader is elected, we don't need to check the remote peers
if (this.raft.leaderId !== undefined || !this._run) return;

logger.debug('_checkRemotePeers(): %s checking remote peers', this.localPeerId);
this._lookingForRemotePeers = setInterval(() => {
const joinMsg = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId));

this._checkingRemotePeers = (async () => {
try {
const response = await this.fetchRemotePeers(1000);

// we don't need to check the remote peers if the leader is elected meanwhile
if (this.raft.leaderId !== undefined || !this._run) return logger.trace('_checkRemotePeers(): %s leader is elected meanwhile', this.localPeerId);

logger.debug('_checkRemotePeers(): %s received remote peers %o, this peer current remote peers: %s',
this.localPeerId,
response.remotePeers,
[ ...this.remotePeerIds ].join(', ')
);

for (const remotePeerId of response.remotePeers) {
if (this.remotePeerIds.has(remotePeerId)) continue;
if (remotePeerId === this.localPeerId) continue;

logger.debug('_checkRemotePeers(): %s add remote peer %s', this.localPeerId, remotePeerId);
this.addRemotePeerId(remotePeerId);
}

for (const remotePeerId of this.remotePeerIds) {
if (response.remotePeers.includes(remotePeerId)) continue;
if (remotePeerId === this.localPeerId) continue;

logger.debug('_checkRemotePeers(): %s remove remote peer %s', this.localPeerId, remotePeerId);
this.removeRemotePeerId(remotePeerId);
}
} catch (err) {
logger.warn('_checkRemotePeers(): Failed to fetch remote peers', err);
} finally {
this._checkingRemotePeers = undefined;
}
})();

return this._checkingRemotePeers;
// this will trigger the remote endpoint to add this endpoint
this._emitMessage(joinMsg);
}, Math.max(this.raft.config.followerMaxIdleInMs / 5, 100));
}

private _acceptKeepAliveHamokMessage(message: HamokMessage) {
Expand Down
55 changes: 28 additions & 27 deletions src/collections/HamokEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,35 +165,36 @@ export class HamokEmitter<T extends HamokEmitterEventMap> {
);
}
})
.on('remote-peer-removed', (remotePeerId) => {
if (this.connection.grid.leaderId !== this.connection.localPeerId) {
if (this.connection.grid.leaderId === undefined) {
this._removedPeerIdsBuffer.push(remotePeerId);
}
// this is problematic if we a bit more flexible about the remote peers
// .on('remote-peer-removed', (remotePeerId) => {
// if (this.connection.grid.leaderId !== this.connection.localPeerId) {
// if (this.connection.grid.leaderId === undefined) {
// this._removedPeerIdsBuffer.push(remotePeerId);
// }

return;
}
let retried = 0;
const process = async (): Promise<unknown> => {
if (this.connection.grid.leaderId === undefined) {
return Promise.resolve(this._removedPeerIdsBuffer.push(remotePeerId));
} else if (this.connection.grid.leaderId !== this.connection.localPeerId) {
// not our problem.
return Promise.resolve();
}
try {
return this.connection.requestDeleteEntries(new Set([ remotePeerId ]));
} catch (err) {
logger.warn('Error while requesting to remove endpoint %s, from subscriptions in emitter %s, error: %o', remotePeerId, this.id, err);

if (++retried < 10) {
return process();
}
}
};
// return;
// }
// let retried = 0;
// const process = async (): Promise<unknown> => {
// if (this.connection.grid.leaderId === undefined) {
// return Promise.resolve(this._removedPeerIdsBuffer.push(remotePeerId));
// } else if (this.connection.grid.leaderId !== this.connection.localPeerId) {
// // not our problem.
// return Promise.resolve();
// }
// try {
// return this.connection.requestDeleteEntries(new Set([ remotePeerId ]));
// } catch (err) {
// logger.warn('Error while requesting to remove endpoint %s, from subscriptions in emitter %s, error: %o', remotePeerId, this.id, err);

// if (++retried < 10) {
// return process();
// }
// }
// };

process().catch(() => void 0);
})
// process().catch(() => void 0);
// })
.on('leader-changed', (leaderId) => {
if (leaderId !== this.connection.grid.localPeerId) {
if (leaderId !== undefined) {
Expand Down
7 changes: 6 additions & 1 deletion src/raft/RaftFollowerState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ export function createRaftFollowerState(context: RaftFollowerStateContext) {
messageEmitter.send(response);
};
const voteRequestListener = (request: RaftVoteRequest) => {
logger.trace('%s Received a vote request %o, votedFor: %s', localPeerId, request, props.votedFor);
logger.debug('%s Received a vote request %o, votedFor: %s', localPeerId, request, props.votedFor);
// if (raftEngine.leaderId !== undefined) {
// if we know the leader, we should not vote for anyone else, until the leader is alive
// return messageEmitter.send(request.createResponse(false));
// }
if (request.term <= props.currentTerm) {
// someone requested a vote from a previous or equal term.
logger.warn('The candidate %s requested a vote from a previous or equal term. The current term is %d.', request.candidateId, props.currentTerm);

return messageEmitter.send(request.createResponse(false));
}

Expand All @@ -167,6 +169,8 @@ export function createRaftFollowerState(context: RaftFollowerStateContext) {
if (request.lastLogIndex < logs.commitIndex) {
// if the highest index of the candidate is smaller than the commit index of this,
// then that candidate should not lead this cluster, and wait for another leader who can
logger.warn('The candidate %s has a smaller last log index than the commit index of this follower %s. The candidate should not lead this cluster.', request.candidateId, localPeerId);

return messageEmitter.send(request.createResponse(false));
}
}
Expand Down Expand Up @@ -209,6 +213,7 @@ export function createRaftFollowerState(context: RaftFollowerStateContext) {
}
// we don't know a leader at this point
raftEngine.leaderId = undefined;
raftEngine.props.votedFor = undefined;
if (raftEngine.remotePeers.size < 1) {
// if we are alone, there is no point to start an election
// so we just restart the timer
Expand Down

0 comments on commit 9018c38

Please sign in to comment.