[Components] kafka - new source and action components #15175
base: master
Conversation
**Walkthrough**

This pull request introduces a comprehensive Kafka integration for Pipedream, adding multiple components and actions for interacting with Kafka clusters. The implementation includes sources for monitoring new messages and topics, and actions for publishing messages and for creating and deleting Kafka topics. The changes leverage the KafkaJS library and provide a robust set of methods for Kafka cluster management, with support for configurable parameters like partitions, replication factors, and message properties.
Force-pushed from fafe946 to 7325611 (Compare)
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 2
🧹 Nitpick comments (12)
components/kafka/kafka.app.mjs (3)

`8-15`: **Consider supporting an optional advanced topic selector.**

The current `topic` prop uses `listTopics()` to provide a straightforward list of available topics. For scenarios with large or dynamically changing topic sets, consider implementing a search or wildcard-based selector to enhance UX and handle large-list performance more gracefully.
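A minimal sketch of what this could look like, assuming Pipedream's `useQuery` option on async props and the app's existing `listTopics()` method (the filtering logic shown is illustrative):

```js
// Sketch: a query-backed topic selector. With useQuery enabled,
// Pipedream passes the user's search text to options().
topic: {
  type: "string",
  label: "Topic",
  description: "The topic to interact with. Type to filter large topic lists.",
  useQuery: true,
  async options({ query }) {
    const topics = await this.listTopics();
    if (!query) {
      return topics;
    }
    // Case-insensitive substring match keeps large lists manageable
    return topics.filter((t) => t.toLowerCase().includes(query.toLowerCase()));
  },
},
```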
`36-44`: **Add retry or error-handling logic around `withApi`.**

While `withApi` neatly encapsulates connect/disconnect, intermittent network failures or ephemeral broker issues may cause `connect()` or `disconnect()` to fail. Consider adding retries or fallback handling to improve stability.
`63-78`: **Handle consumer errors and unexpected disconnections more gracefully.**

The `messageListener` method solidly manages consumer subscriptions, but lacks explicit error handling for `consumer.run()` failures such as rebalances and crashes. Consider adding callbacks or try/catch logic to handle consumer errors, especially in long-running processes.
components/kafka/actions/delete-topic/delete-topic.mjs (2)

`3-18`: **Ensure sufficient instructions in user-facing properties.**

The action properties mention "topic to delete" but do not clarify whether multiple topics are supported. More descriptive labels or button text can reduce confusion, especially for users expecting to delete multiple topics at once.
`19-35`: **Recommend validation or error handling for non-existent topics.**

If the specified topic does not exist, `deleteTopics` could fail silently or throw. Consider adding error handling to return a user-friendly message or gracefully handle the scenario where the topic is missing, as in the sketch below.
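A sketch of that guard, assuming the app's `listTopics()` and `deleteTopics()` methods from this PR, and that `deleteTopics` mirrors KafkaJS's `admin.deleteTopics({ topics })` signature:

```js
// Sketch: verify the topic exists before deleting, and surface a
// friendly summary. `app` and `$` follow the Pipedream action run()
// conventions used elsewhere in this PR.
async run({ $ }) {
  const { app, topic } = this;

  const topics = await app.listTopics();
  if (!topics.includes(topic)) {
    throw new Error(`Topic "${topic}" does not exist`);
  }

  await app.deleteTopics({ topics: [topic] });
  $.export("$summary", `Successfully deleted topic "${topic}".`);
}
```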
components/kafka/sources/new-topic-created/new-topic-created.mjs (2)

`35-43`: **Optimize event emission for large topic sets.**

`emitTopics` emits individually for each topic in the loop. If the cluster has many new topics appearing concurrently, you may experience performance bottlenecks. Consider batched emission if the downstream pipeline can handle it, as sketched below.
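One possible shape (a sketch only; whether a single batch event suits downstream consumers depends on the workflow):

```js
// Sketch: emit one summary event per polling run instead of one event
// per topic. this.$emit is Pipedream's standard source emitter.
emitTopics(topics) {
  if (!topics.length) {
    return;
  }
  this.$emit(
    { topics },
    {
      id: `new-topics-${Date.now()}`,
      summary: `${topics.length} new topic(s) created`,
      ts: Date.now(),
    },
  );
}
```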
`45-55`: **Handle scenarios with no new topics.**

When `listTopics()` returns no matching new topics, no events are emitted. Ensure this is the intended behavior, and consider logging or returning a summary for proactive monitoring.
components/kafka/sources/new-message/new-message.mjs (1)

`27-36`: **Encourage named logging for clarity.**

Inside the `delay` method, consider using a more explicit log message or leveraging a logging library so that debugging around consumer disconnection is clearer in production.
components/kafka/actions/publish-message/publish-message.mjs (1)

`42-70`: **Provide partition fallback or auto-balancing guidance.**

When the `partition` prop is not specified, Kafka automatically determines the partition. Document this behavior or provide best practices for explicit partition usage, so the user is aware of how data is distributed across partitions; the sketch after this comment contrasts the two modes.
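For illustration, the two modes at the KafkaJS level (a sketch; the topic and payload names are made up for the example):

```js
// Explicit partition: the message goes exactly where specified.
await producer.send({
  topic: "orders",
  messages: [{ value: "order-123", partition: 0 }],
});

// No partition: KafkaJS's default partitioner picks one. With a key,
// messages sharing that key consistently land on the same partition.
await producer.send({
  topic: "orders",
  messages: [{ key: "customer-42", value: "order-124" }],
});
```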
components/kafka/actions/create-topic/create-topic.mjs (3)

`10-44`: **Provide defaults or hints for input props.**

All props are well organized. Consider providing default values or hints for optional props like `numPartitions`, `replicationFactor`, etc., so that users understand common or recommended defaults (see the sketch below).
`45-53`: **Consider error handling.**

The `run()` method doesn't perform any explicit error handling. If `app.createTopics` rejects, this action will throw. Consider wrapping it in try/catch to handle or report errors gracefully:

```diff
 async run({ $ }) {
+  try {
     ...
     const success = await app.createTopics({ ... });
     $.export("$summary", "Successfully created topic.");
     return { success };
+  } catch (error) {
+    $.export("$summary", `Failed to create topic: ${error.message}`);
+    throw error;
+  }
 }
```
`70-85`: **Summary message.**

Emitting a success summary is a good practice. However, consider returning more details (e.g., the created topic name) to give users direct confirmation of the newly created resource:

```diff
-$.export("$summary", "Successfully created topic.");
+$.export("$summary", `Successfully created topic: ${topic}`);
```
🛑 Comments failed to post (2)
components/kafka/kafka.app.mjs (1)

`27-35`: 🛠️ **Refactor suggestion**

**Refine client creation for multiple broker endpoints.**

The `getClient()` and `getApiClient()` methods appear to handle only a single host/port combination. For production environments, users often provide multiple brokers for failover. Consider supporting an array of brokers from the user's auth or config for more robust Kafka connections (see the sketch below).
components/kafka/sources/new-message/new-message.mjs (1)

`38-70`: 🛠️ **Refactor suggestion**

**Consider unique group IDs per user or source.**

Using a hardcoded group ID ("pipedream-group") can lead to collisions if multiple sources are configured against the same cluster. Allowing a dynamic group ID per source or user can avoid group-membership conflicts; one approach is sketched below.
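A sketch, assuming the source exposes a `db` service prop (standard in Pipedream sources) and generates the ID once in the `deploy` hook so redeploys reuse the same consumer group:

```js
import { randomUUID } from "crypto";

// Sketch: generate a unique consumer group id per deployed source and
// persist it. Prop and hook names follow common Pipedream source
// conventions; adapt to this component's actual shape.
export default {
  props: {
    db: "$.service.db",
    // ...other props
  },
  hooks: {
    async deploy() {
      this.db.set("groupId", `pipedream-group-${randomUUID()}`);
    },
  },
  methods: {
    getGroupId() {
      return this.db.get("groupId");
    },
  },
};
```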
Actionable comments posted: 2
🧹 Nitpick comments (1)
components/kafka/kafka.app.mjs (1)

`33-44`: **Enhance error handling in API wrapper.**

The connection lifecycle management is good, but error handling could be more robust. Consider this enhancement:

```diff
 async withApi(fn, api = constants.API.ADMIN, config) {
   const apiClient = this.getApiClient(api, config);
-  await apiClient.connect();
+  try {
+    await apiClient.connect();
+  } catch (err) {
+    throw new Error(`Failed to connect to Kafka ${api}: ${err.message}`);
+  }
   try {
     return await fn(apiClient);
   } finally {
-    await apiClient.disconnect();
+    try {
+      await apiClient.disconnect();
+    } catch (err) {
+      console.error(`Failed to disconnect from Kafka ${api}: ${err.message}`);
+    }
   }
 },
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (8)

- `components/kafka/actions/create-topic/create-topic.mjs` (1 hunks)
- `components/kafka/actions/delete-topic/delete-topic.mjs` (1 hunks)
- `components/kafka/actions/publish-message/publish-message.mjs` (1 hunks)
- `components/kafka/common/constants.mjs` (1 hunks)
- `components/kafka/kafka.app.mjs` (1 hunks)
- `components/kafka/package.json` (2 hunks)
- `components/kafka/sources/new-message/new-message.mjs` (1 hunks)
- `components/kafka/sources/new-topic-created/new-topic-created.mjs` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
- components/kafka/package.json
- components/kafka/actions/publish-message/publish-message.mjs
- components/kafka/common/constants.mjs
- components/kafka/actions/delete-topic/delete-topic.mjs
- components/kafka/sources/new-topic-created/new-topic-created.mjs
- components/kafka/actions/create-topic/create-topic.mjs
- components/kafka/sources/new-message/new-message.mjs
🔇 Additional comments (2)
components/kafka/kafka.app.mjs (2)
`1-16`: **LGTM! Clean implementation of imports and prop definitions.**

The implementation follows Pipedream's component structure with proper type definitions and dynamic options loading.
`60-78`: ⚠️ **Potential issue**

**Critical: Improve message handling and consumer lifecycle management.**
The current implementation has several potential issues:
- No message validation in sendMessages
- messageListener lacks error handling
- No cleanup mechanism for consumer disconnection
Apply these critical fixes:
```diff
 sendMessages(args = {}) {
+  const { topic, messages } = args;
+
+  if (!topic || !messages) {
+    throw new Error("topic and messages are required");
+  }
+
+  if (!Array.isArray(messages)) {
+    throw new Error("messages must be an array");
+  }
+
+  // Validate each message
+  messages.forEach(msg => {
+    if (!msg.value) {
+      throw new Error("Each message must have a value");
+    }
+  });
+
   return this.withApi((producer) => producer.send(args), constants.API.PRODUCER);
 },
 async messageListener({
   topic, fromBeginning = true, onMessage, groupId,
 } = {}) {
+  if (!topic || !onMessage) {
+    throw new Error("topic and onMessage callback are required");
+  }
+
   const config = {
     groupId,
+    retry: {
+      initialRetryTime: 100,
+      retries: 8
+    },
   };
   const consumer = this.getApiClient(constants.API.CONSUMER, config);
+
+  const errorTypes = ['unhandledRejection', 'uncaughtException']
+  const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
+
+  errorTypes.forEach(type => {
+    process.on(type, async () => {
+      try {
+        await consumer.disconnect()
+      } finally {
+        process.exit(0)
+      }
+    })
+  })
+
+  signalTraps.forEach(type => {
+    process.once(type, async () => {
+      try {
+        await consumer.disconnect()
+      } finally {
+        process.exit(0)
+      }
+    })
+  })
+
   await consumer.connect();
   await consumer.subscribe({
     topic,
     fromBeginning,
   });
   await consumer.run({
-    eachMessage: onMessage,
+    eachMessage: async (...args) => {
+      try {
+        await onMessage(...args);
+      } catch (error) {
+        console.error('Error processing message:', error);
+        // Depending on your error handling strategy:
+        // throw error; // to retry
+        // or continue processing the next message
+      }
+    },
   });
   return consumer;
 },
```

This implementation:
- Adds proper message validation
- Implements graceful shutdown
- Adds error handling for message processing
- Includes retry configuration
Let's verify the consumer group management:
Force-pushed from 7325611 to 21510c3 (Compare)
Actionable comments posted: 2
♻️ Duplicate comments (2)
components/kafka/kafka.app.mjs (2)
`18-32`: 🛠️ **Refactor suggestion**

**Enhance client configuration for production readiness.**
The current implementation lacks important security and reliability features needed for production environments.
Building upon the previous review comment, also consider:
- Adding retry options for connection resilience
- Configurable client ID for better monitoring
- Connection timeout settings
```diff
 getBrokers() {
   const {
     host,
     port,
+    additionalBrokers = [],
   } = this.$auth;
+
+  if (!host || !port) {
+    throw new Error("Missing required authentication parameters: host and port");
+  }
+
   return [
     `${host}:${port}`,
+    ...additionalBrokers,
   ];
 },
 getClient() {
+  const {
+    clientId = "Pipedream",
+    connectionTimeout = 3000,
+    authenticationTimeout = 1000,
+    ssl = false,
+    sasl,
+  } = this.$auth;
+
   return new Kafka({
-    clientId: "Pipedream",
+    clientId,
     brokers: this.getBrokers(),
+    ssl,
+    sasl,
+    connectionTimeout,
+    authenticationTimeout,
+    retry: {
+      initialRetryTime: 100,
+      retries: 8
+    },
   });
 },
```
`45-62`: 🛠️ **Refactor suggestion**

**Add comprehensive validation and error handling for Kafka operations.**
The Kafka operation methods need input validation and specific error handling.
Building upon the previous review comment about admin operations, also add:
- Type checking for all method parameters
- Specific error handling for common Kafka errors
- Validation for message format in sendMessages
```diff
+async validateTopic(topic) {
+  if (!topic || typeof topic !== "string") {
+    throw new Error("Topic name must be a non-empty string");
+  }
+  const topics = await this.listTopics();
+  return topics.includes(topic);
+},
 async sendMessages(args = {}) {
+  const { topic, messages } = args;
+
+  if (!Array.isArray(messages)) {
+    throw new Error("Messages must be an array");
+  }
+
+  if (!await this.validateTopic(topic)) {
+    throw new Error(`Topic ${topic} does not exist`);
+  }
+
+  const validatedMessages = messages.map(msg => ({
+    ...msg,
+    value: msg.value
+      ? (typeof msg.value === 'string' ? msg.value : JSON.stringify(msg.value))
+      : '',
+  }));
+
-  return this.withApi((producer) => producer.send(args), constants.API.PRODUCER);
+  return this.withApi(
+    (producer) => producer.send({ ...args, messages: validatedMessages }),
+    constants.API.PRODUCER
+  ).catch(err => {
+    if (err.type === 'NOT_LEADER_FOR_PARTITION') {
+      throw new Error('Failed to write to partition. Please retry.');
+    }
+    throw err;
+  });
 },
```
🧹 Nitpick comments (2)
components/kafka/kafka.app.mjs (2)
`8-15`: **Add validation for topic name format.**

Consider adding validation for topic names to ensure they meet Kafka's requirements (e.g., valid characters, length limits):
```diff
 topic: {
   type: "string",
   label: "Topic",
   description: "The topic to interact with.",
+  validate: (topic) => {
+    if (!topic.match(/^[a-zA-Z0-9._-]+$/)) {
+      return "Topic name can only contain alphanumeric characters, dots, underscores, and hyphens";
+    }
+    if (topic.length > 249) {
+      return "Topic name cannot exceed 249 characters";
+    }
+    return true;
+  },
   options() {
     return this.listTopics();
   },
 },
```
`1-78`: **Consider architectural improvements for production scalability.**

While the implementation provides basic Kafka functionality, consider these architectural improvements for production scalability (a circuit-breaker sketch follows the list):
- Implement connection pooling for better resource management
- Add metrics collection for monitoring (e.g., message rates, latency)
- Consider implementing a dead letter queue for failed messages
- Add circuit breaker pattern for handling Kafka cluster outages
Would you like me to provide implementation examples for any of these architectural improvements?
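For the circuit-breaker item, a minimal hand-rolled sketch (the failure threshold and reset timing are illustrative; a library such as `opossum` could replace this):

```js
// Sketch: a tiny circuit breaker around an async operation. After
// `maxFailures` consecutive errors it rejects fast for `resetMs`,
// shielding the Kafka cluster during an outage.
class CircuitBreaker {
  constructor(maxFailures = 5, resetMs = 30000) {
    this.maxFailures = maxFailures;
    this.resetMs = resetMs;
    this.failures = 0;
    this.openedAt = 0;
  }
  async call(fn) {
    if (this.failures >= this.maxFailures) {
      if (Date.now() - this.openedAt < this.resetMs) {
        throw new Error("Circuit open: Kafka temporarily unavailable");
      }
      this.failures = 0; // half-open: allow a trial call
    }
    try {
      const result = await fn();
      this.failures = 0;
      return result;
    } catch (err) {
      if (++this.failures >= this.maxFailures) this.openedAt = Date.now();
      throw err;
    }
  }
}

// Usage (hypothetical wiring into this PR's app methods):
// const breaker = new CircuitBreaker();
// await breaker.call(() => app.withApi((admin) => admin.listTopics()));
```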
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (8)

- `components/kafka/actions/create-topic/create-topic.mjs` (1 hunks)
- `components/kafka/actions/delete-topic/delete-topic.mjs` (1 hunks)
- `components/kafka/actions/publish-message/publish-message.mjs` (1 hunks)
- `components/kafka/common/constants.mjs` (1 hunks)
- `components/kafka/kafka.app.mjs` (1 hunks)
- `components/kafka/package.json` (2 hunks)
- `components/kafka/sources/new-message/new-message.mjs` (1 hunks)
- `components/kafka/sources/new-topic-created/new-topic-created.mjs` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
- components/kafka/common/constants.mjs
- components/kafka/actions/delete-topic/delete-topic.mjs
- components/kafka/actions/publish-message/publish-message.mjs
- components/kafka/sources/new-message/new-message.mjs
- components/kafka/sources/new-topic-created/new-topic-created.mjs
- components/kafka/actions/create-topic/create-topic.mjs
- components/kafka/package.json
WHY
Resolves #15098
Summary by CodeRabbit
Release Notes: Kafka Integration
New Features
Improvements
Version Update

- Updated the `kafkajs` dependency