Skip to content

Commit

Permalink
DRAFT
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Sep 30, 2024
1 parent eee3515 commit fdc59f0
Show file tree
Hide file tree
Showing 4 changed files with 1,132 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -897,10 +898,12 @@ else if (listener instanceof MessageListener) {
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();

final BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback
= (cRecord, runtimeException) ->
this.invokeErrorHandlerBySingleRecord(cRecord, runtimeException);
this.listener.setAsyncRetryCallback(asyncRetryCallback);
if (!AopUtils.isAopProxy(listener)) {
final BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback
= (cRecord, runtimeException) ->
this.invokeErrorHandlerBySingleRecord(cRecord, runtimeException);
this.listener.setAsyncRetryCallback(asyncRetryCallback);
}
}
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
Expand Down
Loading

0 comments on commit fdc59f0

Please sign in to comment.