-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathintegration.spec.ts
132 lines (117 loc) · 3.68 KB
/
integration.spec.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import {
createCastle,
createLogging,
toLogCreator,
Logger,
consumeEachMessage,
LoggingContext,
} from '@ovotech/castle';
import * as uuid from 'uuid';
import { Schema } from 'avsc';
import { ObjectReadableMock } from 'stream-mock';
import { retry } from 'ts-retry-promise';
import { createCastleStream } from '../src';
import { CastleStreamConsumerConfig } from '../src/types';
// Unique per-run Kafka topic and consumer-group names so repeated test
// runs (and parallel CI jobs) never collide with stale state.
const uniqueName = (prefix: string): string => `${prefix}-${uuid.v4()}`;
const topic1 = uniqueName('test-1');
const topic2 = uniqueName('test-2');
const groupId1 = uniqueName('test-group-1');
/**
 * Decoded payload of messages produced onto `topic1`
 * (field layout matches `Event1Schema` below).
 */
export interface Event1 {
  field1: string;
}
/**
 * Decoded payload of messages produced onto `topic2`
 * (field layout matches `Event2Schema` below).
 */
export interface Event2 {
  field2: string;
}
/**
 * Avro schema for {@link Event1}.
 *
 * The record name is 'Event1' (not the shared 'Event' both schemas
 * previously used) so the two test schemas have distinct, unambiguous
 * Avro names and cannot be confused with one another in the registry.
 */
export const Event1Schema: Schema = {
  type: 'record',
  name: 'Event1',
  fields: [{ name: 'field1', type: 'string' }],
};
/**
 * Avro schema for {@link Event2}.
 *
 * The record name is 'Event2' (not the shared 'Event' both schemas
 * previously used) so the two test schemas have distinct, unambiguous
 * Avro names and cannot be confused with one another in the registry.
 */
export const Event2Schema: Schema = {
  type: 'record',
  name: 'Event2',
  fields: [{ name: 'field2', type: 'string' }],
};
// Captured log entries as [level, message, metadata] tuples, appended to
// by `myLogger` below.
const log: Array<[string, string, unknown]> = [];

// Minimal in-memory Logger implementation: every entry is recorded in
// `log` instead of being written anywhere.
const myLogger: Logger = {
  log(level, message, metadata) {
    log.push([level, message, metadata]);
  },
};

// Logging middleware and kafkajs log adapter, both backed by the same
// capturing logger.
const logging = createLogging(myLogger);
const logCreator = toLogCreator(myLogger);
// field1 values observed on topic1; the integration test asserts on this.
const data: string[] = [];

// Handler for topic1 messages: records field1 in `data` and logs it via
// the context-scoped logger. Messages without a value are skipped.
const eachEvent1 = consumeEachMessage<Event1, LoggingContext>(async ({ message, logger }) => {
  const value = message.value;
  if (!value) {
    return;
  }
  data.push(value.field1);
  logger.log('info', value.field1);
});
/**
 * Stream consumer: pumps three mock source strings into topic2 as Event2
 * records and, batch-wise, re-publishes each onto topic1 as an Event1
 * with the value prefixed by `new-`.
 */
const event2Consumer1: CastleStreamConsumerConfig<string, Event2, null> = {
  topic: topic2,
  source: new ObjectReadableMock(['test1', 'test2', 'test3']),
  // Wrap each raw source string into an avro-encodable kafka message.
  toKafkaMessage: (message) => ({ value: { field2: message }, key: null, schema: Event2Schema }),
  eachBatch: async ({ batch: { messages }, producer }) => {
    // Await the send: the original floating promise dropped any produce
    // error and let the batch handler resolve before the messages were
    // actually delivered to topic1.
    await producer.send({
      topic: topic1,
      schema: Event1Schema,
      messages: messages.map((message) => ({
        value: { field1: `new-${message.value?.field2}` },
        key: null,
      })),
    });
  },
};
/**
 * Stream consumer: pumps two mock source strings into topic2 as Event2
 * records and, message-wise, re-publishes each onto topic1 as an Event1
 * with the value prefixed by `new-`.
 */
const event2Consumer2: CastleStreamConsumerConfig<string, Event2, null> = {
  topic: topic2,
  source: new ObjectReadableMock(['test-other-1', 'test-other-2']),
  // Wrap each raw source string into an avro-encodable kafka message.
  toKafkaMessage: (message) => ({ value: { field2: message }, key: null, schema: Event2Schema }),
  eachMessage: async ({ message, producer }) => {
    // Await the send: the original floating promise dropped any produce
    // error and let the handler resolve before delivery to topic1.
    await producer.send({
      topic: topic1,
      schema: Event1Schema,
      messages: [{ value: { field1: `new-${message.value?.field2}` }, key: null }],
    });
  },
};
describe('Integration', () => {
  it(
    'Should process response',
    async () => {
      // Classic consumer: reads topic1 from the beginning and records
      // decoded field1 values into `data` via eachEvent1.
      const castle = createCastle({
        schemaRegistry: { uri: 'http://localhost:8081' },
        kafka: { brokers: ['localhost:29092'], logCreator },
        consumers: [
          { topic: topic1, fromBeginning: true, groupId: groupId1, eachMessage: logging(eachEvent1) },
        ],
      });
      // Stream consumers: pump the mocked sources through topic2 handlers,
      // which re-publish onto topic1 for the consumer above to pick up.
      const castleStream = createCastleStream({
        schemaRegistry: { uri: 'http://localhost:8081' },
        kafka: { brokers: ['localhost:29092'], logCreator },
        consumers: [event2Consumer1, event2Consumer2],
      });
      const admin = castle.kafka.admin();
      try {
        await admin.connect();
        await admin.createTopics({ topics: [{ topic: topic1 }, { topic: topic2 }] });
        await castle.start();
        await castleStream.start();
        // Poll until all five re-published messages have arrived on topic1
        // (order is not guaranteed, hence arrayContaining).
        await retry(
          async () => {
            expect(castle.isRunning()).toBe(true);
            expect(castleStream.isRunning()).toBe(true);
            expect(data).toEqual(
              expect.arrayContaining([
                'new-test1',
                'new-test2',
                'new-test3',
                'new-test-other-1',
                'new-test-other-2',
              ]),
            );
          },
          { delay: 1000, retries: 3 },
        );
      } finally {
        await admin.disconnect();
        await castle.stop();
        await castleStream.stop();
      }
    },
    // Per-test timeout. Calling jest.setTimeout() inside the test body (as
    // the original did) does not apply to the already-running test in
    // Jest >= 27 — it only changes the default for subsequent tests.
    15000,
  );
});