Update KCL to version 3 #228

Open
matheisco opened this issue Nov 20, 2024 · 8 comments

@matheisco

Amazon released KCL version 3. Please consider switching to the latest version.

In the meantime, is it safe to override the version number in the pom.xml of our own project?

@artembilan
Member

Well, that would justify a new major version of spring-integration-aws and, in turn, a new major version of this Kinesis Binder project.
That is the plan for next year.

In the meantime, you can indeed try:

			<dependency>
				<groupId>software.amazon.kinesis</groupId>
				<artifactId>amazon-kinesis-client</artifactId>
				<version>3.0.1</version>
			</dependency>

in your project to override whatever version comes in as a transitive dependency of spring-cloud-stream-binder-kinesis.
You might also need to exclude that client from the binder's transitive dependencies:

			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
				<exclusions>
					<exclusion>
						<groupId>software.amazon.kinesis</groupId>
						<artifactId>amazon-kinesis-client</artifactId>
					</exclusion>
				</exclusions>
			</dependency>
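
To confirm which KCL jar actually wins after such an override, one option is to print where a KCL class was loaded from at runtime. This is only a minimal sketch: the class name is taken from the stack trace in this thread, while `KclVersionCheck` and `locationOf` are hypothetical names made up for illustration.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

final class KclVersionCheck {

    // Class name as it appears in the KCL stack trace in this thread.
    static final String KCL_SCHEDULER = "software.amazon.kinesis.coordinator.Scheduler";

    /** Returns the jar (or directory) a class was loaded from, if it can be determined. */
    static Optional<String> locationOf(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> type = Class.forName(className, false, KclVersionCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // Classes from the JDK itself usually have no code source.
            return Optional.ofNullable(source).map(CodeSource::getLocation).map(URL::toString);
        }
        catch (ClassNotFoundException | LinkageError ex) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(locationOf(KCL_SCHEDULER).orElse("KCL is not on the classpath"));
    }
}
```

If the override took effect, the printed location should end with the jar name of the version you asked for. Running `mvn dependency:tree` gives the same answer at build time.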

I have tested spring-integration-aws against that latest KCL version.
It compiles fine, but some of the tests did not complete.
I think the attempt to detect EC2 is the problem:

2024-11-20 13:05:52,279 ERROR [Test worker] [software.amazon.kinesis.worker.platform.Ec2Resource] - Unable to retrieve instance metadata
java.net.SocketException: Network is unreachable: no further information
	at java.base/sun.nio.ch.Net.pollConnect(Native Method) ~[?:?]
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672) ~[?:?]
	at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:547) ~[?:?]
	at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:602) ~[?:?]
	at java.base/java.net.Socket.connect(Socket.java:633) ~[?:?]
	at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:178) ~[?:?]
	at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:534) ~[?:?]
	at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:639) ~[?:?]
	at java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:282) ~[?:?]
	at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:387) ~[?:?]
	at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:409) ~[?:?]
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1308) ~[?:?]
	at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1241) ~[?:?]
	at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1127) ~[?:?]
	at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1056) ~[?:?]
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1686) ~[?:?]
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1610) ~[?:?]
	at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529) ~[?:?]
	at software.amazon.kinesis.worker.platform.Ec2Resource.isEc2(Ec2Resource.java:74) ~[amazon-kinesis-client-3.0.1.jar:?]
	at software.amazon.kinesis.worker.platform.Ec2Resource.isOnPlatform(Ec2Resource.java:108) ~[amazon-kinesis-client-3.0.1.jar:?]
	at software.amazon.kinesis.worker.WorkerMetricsSelector.getOperatingRangeDataProvider(WorkerMetricsSelector.java:66) ~[amazon-kinesis-client-3.0.1.jar:?]
	at software.amazon.kinesis.worker.WorkerMetricsSelector.getDefaultWorkerMetrics(WorkerMetricsSelector.java:82) ~[amazon-kinesis-client-3.0.1.jar:?]
	at software.amazon.kinesis.coordinator.Scheduler.selectWorkerMetricsIfAvailable(Scheduler.java:1353) ~[amazon-kinesis-client-3.0.1.jar:?]
	at software.amazon.kinesis.coordinator.Scheduler.createDynamicMigrationComponentsInitializer(Scheduler.java:381) ~[amazon-kinesis-client-3.0.1.jar:?]
	at software.amazon.kinesis.coordinator.Scheduler.<init>(Scheduler.java:297) ~[amazon-kinesis-client-3.0.1.jar:?]
	at software.amazon.kinesis.coordinator.Scheduler.<init>(Scheduler.java:242) ~[amazon-kinesis-client-3.0.1.jar:?]

@artembilan artembilan added this to the 5.0.0-M1 milestone Nov 20, 2024
@artembilan
Member

Yeah...
Taking that back.
When I added:

adapter.setLeaseManagementConfigCustomizer(leaseManagementConfig ->
					leaseManagementConfig.workerUtilizationAwareAssignmentConfig().disableWorkerMetrics(true));

and increased the receive timeout, the tests passed.
So it looks like KCL v3 is just a drop-in dependency!

@matheisco
Author

Wonderful, thanks for checking!

@matheisco
Author

matheisco commented Nov 21, 2024

There might be one small change necessary; I'll mention it here since I didn't see an issue for KCL 3 on spring-integration-aws.

I see the log message "Scheduler is shutting down; checkpointing..." when KCL 3 is doing lease handoffs. I'm still new to KCL, but "Scheduler is shutting down" sounds incorrect in this context.

KCL 3 introduced those graceful lease handoffs, where it calls shutdownRequested(), see also here.

EDIT: although, in the AWS example they log the same message 🤔 Maybe I'm just misunderstanding.

@artembilan
Member

Our logic is like this:

		@Override
		public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
			logger.info("Scheduler is shutting down; checkpointing...");
			try {
				shutdownRequestedInput.checkpointer().checkpoint();
			}
			catch (ShutdownException | InvalidStateException ex) {
				logger.error(ex, "Exception while checkpointing at requested shutdown. Giving up");
			}
		}

That is the callback for a ShardRecordProcessor.
It is called from ShardConsumer.shutdownComplete(), where:

    /**
     * Requests the shutdown of the ShardConsumer. This should give the record processor a chance to checkpoint
     * before being shutdown.
     *
     * @param shutdownNotification used to signal that the record processor has been given the chance to shut down.
     */
    public void gracefulShutdown(ShutdownNotification shutdownNotification) {
        if (subscriber != null) {
            subscriber.cancel();
        }
        if (shutdownNotification != null) {
            this.shutdownNotification = shutdownNotification;
        }
        markForShutdown(ShutdownReason.REQUESTED);
    }

That one is initiated by Scheduler.startGracefulShutdown(),
which, in turn, is called from the KclMessageDrivenChannelAdapter:

	protected void doStop() {
		super.doStop();
		if (this.gracefulShutdownTimeout == 0) {
			this.scheduler.shutdown();
		}
		else {
			try {
				logger.info("Start graceful shutdown for KCL...");
				this.scheduler.startGracefulShutdown().get(this.gracefulShutdownTimeout, TimeUnit.MILLISECONDS);
			}
			catch (InterruptedException | ExecutionException | TimeoutException ex) {
				throw new RuntimeException("Graceful shutdown for KCL has failed.", ex);
			}
		}
	}
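
The bounded wait in doStop() can be tried in isolation. The sketch below uses a plain CompletableFuture as a stand-in for whatever Scheduler.startGracefulShutdown() returns (the `.get(timeout, unit)` call above suggests a Future). `GracefulStopSketch` and `awaitShutdown` are hypothetical names, and, unlike the adapter code above, this version reports a timeout as `false` instead of throwing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class GracefulStopSketch {

    /** Waits up to timeoutMillis for the shutdown to finish; false means it did not finish cleanly in time. */
    static boolean awaitShutdown(Future<Boolean> shutdown, long timeoutMillis) {
        try {
            return Boolean.TRUE.equals(shutdown.get(timeoutMillis, TimeUnit.MILLISECONDS));
        }
        catch (TimeoutException ex) {
            // The shutdown is still in progress; the caller decides what to do next.
            return false;
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag so callers further up can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
        catch (ExecutionException ex) {
            throw new IllegalStateException("Graceful shutdown has failed.", ex.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that is already complete stands in for a shutdown that finished.
        System.out.println(awaitShutdown(CompletableFuture.completedFuture(true), 100));
        // A future that never completes stands in for a shutdown that hangs.
        System.out.println(awaitShutdown(new CompletableFuture<>(), 50));
    }
}
```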

So I'm not sure what your concern is, since everything looks correct.
We see "Scheduler is shutting down" exactly when we stop polling from Kinesis.

What am I missing?

@matheisco
Author

To me, "Scheduler is shutting down" sounds like the whole worker is shutting down. With KCL 3, this message is now also printed when there is no shutdown but a lease is handed from one worker to another. In my understanding, the Scheduler keeps running.

@artembilan
Member

Well, that's not a question for this project.
We just follow the API and its description:

    /**
     * Called when the Scheduler has been requested to shutdown. This is called while the record processor still holds
     * the lease so checkpointing is possible. Once this method has completed the lease for the record processor is
     * released, and {@link #leaseLost(LeaseLostInput)} will be called at a later time.
     *
     * @param shutdownRequestedInput
     *            provides access to a checkpointer allowing a record processor to checkpoint before the shutdown is
     *            completed.
     */
    void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput);

So, since this callback is tied to a shutdown of the current worker, we indicate that in the logs.
You can always turn off the INFO logging level for the org.springframework category.
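
How the level is changed depends on the logging backend in use. As an illustration only, the sketch below assumes java.util.logging is the active backend; `QuietSpringLogging` is a hypothetical class name. With Logback or Log4j2 the same category would be set in that framework's configuration instead, and in a Spring Boot application the `logging.level.org.springframework=WARN` property has the same effect.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class QuietSpringLogging {

    // Keep a strong reference: java.util.logging holds loggers weakly,
    // so a level set on an unreferenced logger can be lost after GC.
    static final Logger SPRING = Logger.getLogger("org.springframework");

    /** Raises the threshold so INFO messages from the category (and its children) are dropped. */
    static void raiseToWarning() {
        SPRING.setLevel(Level.WARNING);
    }

    public static void main(String[] args) {
        raiseToWarning();
        Logger child = Logger.getLogger("org.springframework.integration.aws");
        System.out.println("INFO enabled: " + child.isLoggable(Level.INFO));
        System.out.println("WARNING enabled: " + child.isLoggable(Level.WARNING));
    }
}
```

Note that this silences all INFO output under org.springframework, not only the shutdownRequested() message.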

@matheisco
Author

That's fair enough.
IMO AWS changed the semantics of this method in v3, and I suspect they forgot to update the Javadoc. TBH, it's a really minor thing, just something to be aware of.
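
For anyone who wants the log line to cover both cases in their own processor, one conceivable approach is to track whether the adapter itself asked for the stop. This is purely a sketch of the idea, not what spring-integration-aws does: the class, the flag, and the second message text are all hypothetical, and no KCL types are involved.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownLogMessageSketch {

    // Hypothetical flag: would be set just before the adapter asks the Scheduler to shut down.
    private final AtomicBoolean adapterStopping = new AtomicBoolean();

    void markAdapterStopping() {
        this.adapterStopping.set(true);
    }

    /** Picks the log line to use inside a shutdownRequested() callback. */
    String messageForShutdownRequested() {
        return this.adapterStopping.get()
                ? "Scheduler is shutting down; checkpointing..."
                : "Lease is being released (possibly a handoff); checkpointing...";
    }

    public static void main(String[] args) {
        ShutdownLogMessageSketch sketch = new ShutdownLogMessageSketch();
        System.out.println(sketch.messageForShutdownRequested());
        sketch.markAdapterStopping();
        System.out.println(sketch.messageForShutdownRequested());
    }
}
```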
