KafkaStreamsChannel : be-engine throws MissingSourceTopicException on startup. #117

Open
tsaxena18 opened this issue Jul 18, 2023 · 1 comment
tsaxena18 commented Jul 18, 2023

Version Information

BusinessEvents version: 6.3.0
OS type: Windows
OS version: Windows 10

What is the expected behavior?

The kafka-streams engine from the KafkaStreamsChannel project should start successfully.

What is the actual behavior?

The kafka-streams engine throws the following exception on startup:
stream-thread [be_kafka_count_streams-d1299413-3206-4f76-92a0-71b7bd3fd846-StreamThread-1-consumer] Caught an error in the task assignment. Returning an error assignment.
org.apache.kafka.streams.errors.MissingSourceTopicException: Missing source topics.
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareRepartitionTopics(StreamsPartitionAssignor.java:522) ~[kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:388) [kafka-streams-1.0.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:698) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected(AbstractCoordinator.java:736) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:112) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:640) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:603) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1270) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1245) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1323) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1254) [kafka-clients.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) [kafka-clients.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1007) [kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:955) [kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:762) [kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613) [kafka-streams-1.0.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) [kafka-streams-1.0.0.jar:?]

Please provide a unit test that demonstrates the bug.

  1. Build the Kafka Streams channel jar from the repo. The jar is automatically added to BE_HOME/lib/ext/tpcl.
  2. Restart BE Studio and create a channel of type Kafka Streams in the BE project. Set all the fields in the channel.
  3. Configure the project accordingly and build the EAR file.
  4. Start the kafka-streams engine using the CDD and EAR. The engine throws the exception shown above.

Please provide log files.

BE630v161.zip
KafkaStreamsChannel.zip
BE630-Kafka3.5.0.log

rakulkar-tibco added a commit to rakulkar-tibco/be-contribution that referenced this issue Jul 20, 2023
tsaxena18 commented

As discussed today, the issue appears to be an old one, since it is also reproducible on the previous release of BE. It occurs only when kafka-streams is started before kafka-producer. However, it is not seen with the KafkaChannel example when the consumer is started before the producer.
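
One possible reading of the start-order dependency is that the source topics do not exist yet when kafka-streams starts first. Kafka Streams does not create user source topics itself. This is an assumption, not something confirmed in this issue. Under that assumption, a preflight check before starting the streams engine could look like the sketch below. The class `SourceTopicPreflight`, its methods, and the topic names `orders` and `payments` are hypothetical and are not part of BusinessEvents or Kafka. The `topicLister` supplier is a stand-in for a real broker query, for example the set returned by `AdminClient.listTopics().names().get()`.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical preflight helper; not part of BusinessEvents or Kafka. */
final class SourceTopicPreflight {

    /** Returns the required topics that are absent from the existing set, in declaration order. */
    static Set<String> missingTopics(Collection<String> required, Set<String> existing) {
        Set<String> missing = new LinkedHashSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    /**
     * Queries the topic listing at least once, then retries until every required
     * topic exists or maxAttempts lookups have been made. Returns the topics that
     * are still missing; an empty set means the streams engine can be started.
     */
    static Set<String> awaitTopics(Collection<String> required,
                                   Supplier<Set<String>> topicLister,
                                   int maxAttempts,
                                   long sleepMillis) {
        Set<String> missing = missingTopics(required, topicLister.get());
        for (int attempt = 1; attempt < maxAttempts && !missing.isEmpty(); attempt++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
            missing = missingTopics(required, topicLister.get());
        }
        return missing;
    }

    public static void main(String[] args) {
        // Stand-in for a real broker query; only "orders" exists in this example.
        Set<String> brokerTopics = Set.of("orders");
        Set<String> missing =
                awaitTopics(List.of("orders", "payments"), () -> brokerTopics, 3, 10L);
        System.out.println("Missing source topics: " + missing);
    }
}
```

If the returned set is non-empty, the topics could be created up front or the engine start could be delayed until the producer has run. Whether either option suits the KafkaStreamsChannel example is a design decision for the maintainers.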
