Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,40 @@ opensearchproject/opensearch
| @MinioTestcontainer | @MinioTestcontainerSingleton |
| @OpensearchTestcontainer | @OpensearchTestcontainerSingleton |

## Embedded режим без Docker

Для легковесных интеграционных тестов можно использовать
embedded-аннотации.
Библиотека сама запускает локальный backend.
Она инициализирует runtime-настройки в Spring context
и очищает данные между test methods.

| embedded | backend |
|-------------------------|---------------------------------|
| @EmbeddedPostgresqlTest | io.zonky.test embedded-postgres |
| @EmbeddedKafkaTest | spring-kafka-test EmbeddedKafka |

Пример:

```java
@EmbeddedPostgresqlTest
@EmbeddedKafkaTest(
topics = {
"magista-invoicing-test",
"magista-invoice-template-test"
},
properties = {
"kafka.topics.invoicing.id=magista-invoicing-test",
"kafka.topics.invoicing.consume.enabled=true",
"kafka.topics.invoice-template.id=magista-invoice-template-test",
"kafka.topics.invoice-template.consume.enabled=true"
}
)
@SpringBootTest
class KafkaListenerTest {
}
```

Для изменения `docker image tag`, который используется тестконтейнерами нужно переопределить параметры в `application.yml`:

```yml
Expand Down
12 changes: 11 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<packaging>jar</packaging>

<artifactId>testcontainers-annotations</artifactId>
<version>4.1.3</version>
<version>4.2.0</version>

<name>testcontainers-annotations</name>
<description>testcontainers-annotations</description>
Expand Down Expand Up @@ -43,6 +43,7 @@
<kafka-common-lib.version>0.0.3</kafka-common-lib.version>
<geck.version>1.0.2</geck.version>
<lombok.version>1.18.38</lombok.version>
<embedded-postgres.version>2.0.3</embedded-postgres.version>
<checkstyle.config.suppressions.path>./src/main/resources/checkstyle/checkstyle-suppressions.xml
</checkstyle.config.suppressions.path>
</properties>
Expand Down Expand Up @@ -181,6 +182,15 @@
<artifactId>spring-kafka</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
</dependency>
<dependency>
<groupId>io.zonky.test</groupId>
<artifactId>embedded-postgres</artifactId>
<version>${embedded-postgres.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;

Expand Down Expand Up @@ -76,7 +78,9 @@ private void executeMigration(Connection connection, String path) {
var split = sql.split(";");
for (var exec : split) {
if (exec != null && !exec.trim().isEmpty()) {
connection.createStatement().execute(exec);
try (var statement = connection.createStatement()) {
statement.execute(exec);
}
}
}
} catch (SQLException e) {
Expand All @@ -87,11 +91,14 @@ private void executeMigration(Connection connection, String path) {

private String getFile(String fileName) {
var classLoader = ClickhouseContainerExtension.class.getClassLoader();
try {
return IOUtils.toString(classLoader.getResourceAsStream(fileName), StandardCharsets.UTF_8);
try (var inputStream = Optional.ofNullable(classLoader.getResourceAsStream(fileName))
.orElseThrow(() -> new ClickhouseStartingException(
"Migration file not found: " + fileName,
new FileNotFoundException(fileName)))) {
return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
} catch (IOException e) {
log.error("Error when getFile e: ", e);
return "";
throw new ClickhouseStartingException("Error when reading migration file: " + fileName, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ private ClickhouseContainerExtension getOrCreateSingletonContainer(String databa
}

private ClickhouseContainerExtension create(String databaseName, String[] migrations) {
try (var container = new ClickhouseContainerExtension(databaseName, migrations)) {
return container;
}
return new ClickhouseContainerExtension(databaseName, migrations);
}

private static class SingletonHolder {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package dev.vality.testcontainers.annotations.kafka;

import dev.vality.testcontainers.annotations.KafkaTestConfig;
import dev.vality.testcontainers.annotations.kafka.config.KafkaConsumer;
import dev.vality.testcontainers.annotations.kafka.config.KafkaProducer;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Аннотация {@code @EmbeddedKafkaTest} подключает и запускает embedded Kafka
* {@link org.springframework.kafka.test.EmbeddedKafkaBroker}, также
* настройки embedded брокера будут проинициализированы в контекст тестового приложения
* <p>Аннотация требует дополнительной конфигурации {@link EmbeddedKafkaTest#topics()}
* <p><h3>Примеры</h3>
* <p>В примере ниже создается обертка над аннотацией для конкретного приложения с инициализацией
* конкретных топиков приложения. Эту обертку можно позже переиспользовать для любых тестов,
* требующих embedded Kafka без запуска Docker контейнера
* <pre> {@code
* @Target({ElementType.TYPE})
* @Retention(RetentionPolicy.RUNTIME)
* @EmbeddedKafkaTest(
* properties = {
* "kafka.topics.invoicing.consume.enabled=true",
* "kafka.topics.invoice-template.consume.enabled=true",
* "kafka.state.cache.size=0"},
* topics = {
* "magista-invoicing-test",
* "magista-invoice-template-test"})
* public @interface CustomEmbeddedKafkaTest {
* }}</pre>
* <p>В примере ниже {@link EmbeddedKafkaTest} подключается напрямую
* к {@link SpringBootTest} для проведения теста консьюмера, который читает данные из топика
* <pre> {@code
* @EmbeddedKafkaTest(
* properties = {
* "kafka.topics.invoicing.id=reporter-invoicing-test",
* "kafka.topics.invoicing.enabled=true"},
* topics = "reporter-invoicing-test")
* @SpringBootTest
* public class KafkaListenerTest {
*
* @Autowired
* private KafkaProducer<TBase<?, ?>> testThriftKafkaProducer;
*
* ...
* }}</pre>
*
* @see KafkaTestcontainer @KafkaTestcontainer
* @see KafkaTestcontainerSingleton @KafkaTestcontainerSingleton
* @see EmbeddedKafka @EmbeddedKafka
* @see KafkaProducer KafkaProducer
* @see KafkaConsumer KafkaConsumer
* @see KafkaTestConfig KafkaTestConfig
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@EmbeddedKafka
@ExtendWith(EmbeddedKafkaTestExtension.class)
@KafkaTestConfig
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public @interface EmbeddedKafkaTest {

/**
* Аналогичный параметр как у аннотации {@link SpringBootTest#properties()}
* <p>
* пример — properties = {"kafka.topics.invoicing.consume.enabled=true",...}
*/
String[] properties() default {};

/**
* Обязательный параметр — здесь перечисляются имена топиков, которые требуется создать при старте embedded Kafka
* <p>
* пример — topics = {"magista-invoicing-test",...}
*/
@AliasFor(annotation = EmbeddedKafka.class, attribute = "topics")
String[] topics() default {};

/**
* Очищать топики между тестами
*
* @return true - данные между тестами удаляются из embedded Kafka
*/
boolean cleanupTopics() default true;

/**
* Топики, которые не нужно очищать между тестами.
* Используется только если {@link #cleanupTopics()} = true
* <p>
* пример — excludeCleanupTopics = {"magista-invoicing-test"}
*/
String[] excludeCleanupTopics() default {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package dev.vality.testcontainers.annotations.kafka;

import org.junit.platform.commons.support.AnnotationSupport;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.context.ContextConfigurationAttributes;
import org.springframework.test.context.ContextCustomizer;
import org.springframework.test.context.ContextCustomizerFactory;

import java.util.List;
import java.util.Optional;

public class EmbeddedKafkaTestContextCustomizerFactory implements ContextCustomizerFactory {

@Override
public ContextCustomizer createContextCustomizer(
Class<?> testClass,
List<ContextConfigurationAttributes> configAttributes) {
return (context, mergedConfig) ->
findAnnotation(testClass).ifPresent(annotation -> init(context, annotation));
}

private Optional<EmbeddedKafkaTest> findAnnotation(Class<?> testClass) {
return AnnotationSupport.findAnnotation(testClass, EmbeddedKafkaTest.class);
}

private void init(ConfigurableApplicationContext context, EmbeddedKafkaTest annotation) {
TestPropertyValues.of(
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"kafka.ssl.enabled=false")
.and(annotation.properties())
.applyTo(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package dev.vality.testcontainers.annotations.kafka;

import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.platform.commons.support.AnnotationSupport;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class EmbeddedKafkaTestExtension implements BeforeEachCallback {

private static final int WAIT_TIMEOUT_SECONDS = 10;

@Override
public void beforeEach(ExtensionContext context) {
AnnotationSupport.findAnnotation(context.getTestClass(), EmbeddedKafkaTest.class)
.filter(EmbeddedKafkaTest::cleanupTopics)
.ifPresent(annotation -> cleanupTopics(context, annotation));
}

@SneakyThrows
private void cleanupTopics(ExtensionContext context, EmbeddedKafkaTest annotation) {
var topics = Arrays.stream(annotation.topics())
.filter(topic -> !List.of(annotation.excludeCleanupTopics()).contains(topic))
.toList();
if (topics.isEmpty()) {
return;
}

var applicationContext = SpringExtension.getApplicationContext(context);
var embeddedKafkaBroker = applicationContext.getBean(EmbeddedKafkaBroker.class);
try (var adminClient = AdminClient.create(buildAdminProperties(embeddedKafkaBroker))) {
var existingTopics = adminClient.listTopics().names().get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
var missingTopics = topics.stream()
.filter(topic -> !existingTopics.contains(topic))
.toList();
if (!missingTopics.isEmpty()) {
embeddedKafkaBroker.addTopics(missingTopics.toArray(String[]::new));
}
var recordsToDelete = buildRecordsToDelete(embeddedKafkaBroker, topics);
if (!recordsToDelete.isEmpty()) {
adminClient.deleteRecords(recordsToDelete).all().get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
}
}

private Map<TopicPartition, RecordsToDelete> buildRecordsToDelete(
EmbeddedKafkaBroker embeddedKafkaBroker,
List<String> topics) {
try (var consumer = new KafkaConsumer<byte[], byte[]>(buildConsumerProperties(embeddedKafkaBroker))) {
var topicPartitions = topics.stream()
.flatMap(topic -> buildTopicPartitions(topic, embeddedKafkaBroker.getPartitionsPerTopic()).stream())
.toList();
consumer.assign(topicPartitions);
var endOffsets = consumer.endOffsets(topicPartitions);
var recordsToDelete = new HashMap<TopicPartition, RecordsToDelete>();
endOffsets.forEach((topicPartition, offset) -> {
if (offset > 0) {
recordsToDelete.put(topicPartition, RecordsToDelete.beforeOffset(offset));
}
});
return recordsToDelete;
}
}

private List<TopicPartition> buildTopicPartitions(String topic, int partitions) {
return java.util.stream.IntStream.range(0, partitions)
.mapToObj(partition -> new TopicPartition(topic, partition))
.toList();
}

private Properties buildAdminProperties(EmbeddedKafkaBroker embeddedKafkaBroker) {
var properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
return properties;
}

private Properties buildConsumerProperties(EmbeddedKafkaBroker embeddedKafkaBroker) {
var properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "embedded-kafka-cleanup-" + UUID.randomUUID());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return properties;
}
}
Loading
Loading