Skip to content

Commit

Permalink
Polish "Add Spring Pulsar transaction support"
Browse files Browse the repository at this point in the history
See gh-
  • Loading branch information
wilkinsona committed Apr 17, 2024
1 parent e3377cf commit fdfb697
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -104,14 +103,6 @@ public Template getTemplate() {
return this.template;
}

/**
* Whether transactions are enabled for either the template or the listener.
* @return whether transactions are enabled for either the template or the listener
*/
boolean isTransactionEnabled() {
return this.template.getTransaction().isEnabled() || this.listener.getTransaction().isEnabled();
}

public static class Client {

/**
Expand Down Expand Up @@ -775,7 +766,7 @@ public static class Listener {
/**
* Transaction settings.
*/
private final Transaction transaction = new ListenerTransaction();
private final Transaction transaction = new Transaction();

public SchemaType getSchemaType() {
return this.schemaType;
Expand Down Expand Up @@ -879,7 +870,7 @@ public static class Template {
/**
* Transaction settings.
*/
private final Transaction transaction = new TemplateTransaction();
private final Transaction transaction = new Transaction();

public boolean isObservationsEnabled() {
return this.observationsEnabled;
Expand All @@ -895,7 +886,7 @@ public Transaction getTransaction() {

}

public abstract static class Transaction {
public static class Transaction {

/**
* Whether transactions are enabled for the component.
Expand Down Expand Up @@ -937,41 +928,6 @@ public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

void validate() {
if (this.required && !this.enabled) {
String requiredProp = "%s.required".formatted(this.propertyPath());
String enabledProp = "%s.enabled".formatted(this.propertyPath());
throw new InvalidConfigurationPropertyValueException(requiredProp, this.required,
"Transactions must be enabled in order to be required. "
+ "Either set %s to 'true' or make transactions optional by setting %s to 'false'"
.formatted(enabledProp, requiredProp));
}
}

/**
* Gets the property path that the transaction properties are mapped to.
* @return the property path that the transaction properties are mapped to
*/
protected abstract String propertyPath();

}

static class TemplateTransaction extends Transaction {

@Override
protected String propertyPath() {
return "spring.pulsar.template.transaction";
}

}

static class ListenerTransaction extends Transaction {

@Override
protected String propertyPath() {
return "spring.pulsar.listener.transaction";
}

}

public static class Authentication {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;

import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
Expand All @@ -65,7 +66,8 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
if (this.properties.isTransactionEnabled()) {
if (this.properties.getTemplate().getTransaction().isEnabled()
|| this.properties.getListener().getTransaction().isEnabled()) {
clientBuilder.enableTransaction(true);
}
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
Expand Down Expand Up @@ -163,7 +165,7 @@ <T> void customizeProducerBuilder(ProducerBuilder<T> producerBuilder) {

<T> void customizeTemplate(PulsarTemplate<T> template) {
PulsarProperties.Transaction properties = this.properties.getTemplate().getTransaction();
properties.validate();
validate(properties, "spring.pulsar.template.transaction");
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::isEnabled).to(template.transactions()::setEnabled);
map.from(properties::isRequired).to(template.transactions()::setRequired);
Expand Down Expand Up @@ -214,13 +216,24 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie

private void customizePulsarContainerTransactionProperties(PulsarContainerProperties containerProperties) {
PulsarProperties.Transaction properties = this.properties.getListener().getTransaction();
properties.validate();
validate(properties, "spring.pulsar.listener.transaction");
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::isEnabled).to(containerProperties.transactions()::setEnabled);
map.from(properties::isRequired).to(containerProperties.transactions()::setRequired);
map.from(properties::getTimeout).to(containerProperties.transactions()::setTimeout);
}

private void validate(PulsarProperties.Transaction properties, String prefix) {
if (properties.isRequired() && !properties.isEnabled()) {
String requiredProp = "%s.required".formatted(prefix);
String enabledProp = "%s.enabled".formatted(prefix);
throw new InvalidConfigurationPropertyValueException(requiredProp, properties.isRequired(),
"When transactions are required they must also be enabled. "
+ "Either set %s to 'true' or make transactions optional by setting %s to 'false'"
.formatted(enabledProp, requiredProp));
}
}

<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
PulsarProperties.Reader properties = this.properties.getReader();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,36 +629,36 @@ class TransactionManagerTests {

@Test
@SuppressWarnings("unchecked")
void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
void whenUserHasDefinedATransactionManagerTheAutoConfigurationBacksOff() {
PulsarAwareTransactionManager txnMgr = mock(PulsarAwareTransactionManager.class);
this.contextRunner.withBean("customTransactionManager", PulsarAwareTransactionManager.class, () -> txnMgr)
.run((context) -> assertThat(context).getBean(PulsarAwareTransactionManager.class).isSameAs(txnMgr));
}

@Test
void whenNoPropertiesSetDoesNotAutoconfigureBean() {
void whenNoPropertiesAreSetTransactionManagerShouldNotBeDefined() {
this.contextRunner
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
}

@Test
void whenListenerAndTemplateDisablesTransactionsDoesNotAutoconfigureBean() {
void whenListenerAndTemplateDisableTransactionsTransactionManagerIsNotAutoConfigured() {
this.contextRunner
.withPropertyValues("spring.pulsar.listener.transaction.enabled=false",
"spring.pulsar.template.transaction.enabled=false")
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
}

@Test
void whenListenerEnablesTransactionsAutoconfiguresBean() {
void whenListenerEnablesTransactionsTransactionManagerIsAutoConfigured() {
this.contextRunner
.withPropertyValues("spring.pulsar.listener.transaction.enabled=true",
"spring.pulsar.template.transaction.enabled=false")
.run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class));
}

@Test
void whenTemplateEnablesTransactionsAutoconfiguresBean() {
void whenTemplateEnablesTransactionsTransactionMAnagerIsAutoConfigured() {
this.contextRunner
.withPropertyValues("spring.pulsar.listener.transaction.enabled=false",
"spring.pulsar.template.transaction.enabled=true")
Expand All @@ -674,7 +674,7 @@ void whenTemplateRequiresTransactionsThenTransactionsMustBeEnabled() {
.getFailure()
.hasMessageEndingWith(
"Property spring.pulsar.template.transaction.required with value 'true' is invalid: "
+ "Transactions must be enabled in order to be required. Either set "
+ "When transactions are required they must also be enabled. Either set "
+ "spring.pulsar.template.transaction.enabled to 'true' or make transactions "
+ "optional by setting spring.pulsar.template.transaction.required to 'false'"));
}
Expand All @@ -688,7 +688,7 @@ void whenListenerRequiresTransactionsThenTransactionsMustBeEnabled() {
.getFailure()
.hasMessageEndingWith(
"Property spring.pulsar.listener.transaction.required with value 'true' is invalid: "
+ "Transactions must be enabled in order to be required. Either set "
+ "When transactions are required they must also be enabled. Either set "
+ "spring.pulsar.listener.transaction.enabled to 'true' or make transactions "
+ "optional by setting spring.pulsar.listener.transaction.required to 'false'"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,44 +408,4 @@ void bind() {

}

@Nested
class TransactionProperties {

@Test
void transactionsEnabledWhenListenerAndTemplateBothEnabled() {
PulsarProperties properties = new PulsarProperties();
properties.getListener().getTransaction().setEnabled(true);
properties.getTemplate().getTransaction().setEnabled(true);
assertThat(properties.isTransactionEnabled()).isTrue();

}

@Test
void transactionsEnabledWhenListenerEnabledAndTemplateDisabled() {
PulsarProperties properties = new PulsarProperties();
properties.getListener().getTransaction().setEnabled(true);
properties.getTemplate().getTransaction().setEnabled(false);
assertThat(properties.isTransactionEnabled()).isTrue();

}

@Test
void transactionsEnabledWhenListenerDisabledAndTemplateEnabled() {
PulsarProperties properties = new PulsarProperties();
properties.getListener().getTransaction().setEnabled(false);
properties.getTemplate().getTransaction().setEnabled(true);
assertThat(properties.isTransactionEnabled()).isTrue();

}

void transactionsDisabledWhenListenerAndTemplateBothDisabled() {
PulsarProperties properties = new PulsarProperties();
properties.getListener().getTransaction().setEnabled(false);
properties.getTemplate().getTransaction().setEnabled(false);
assertThat(properties.isTransactionEnabled()).isFalse();

}

}

}

0 comments on commit fdfb697

Please sign in to comment.