How to Set Up Kafka Integration Test – Grape Up
Do you consider unit testing an insufficient way of preserving the application’s reliability and security? Are you afraid that a bug may be hiding somewhere behind the assumption that unit tests cover all cases? And is mocking Kafka not enough for your project requirements? If even one answer is ‘yes’, then welcome to a simple guide on how to set up integration tests for Kafka using TestContainers and Embedded Kafka for Spring!
What is TestContainers?
TestContainers is an open-source Java library specialized in providing everything needed for integration testing against external resources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat it as a reliable place to test application functionality. All these features are built on Docker images, run as containers. Do we need to test the database layer with an actual MongoDB? No worries, there is a test container for that. We also cannot forget about UI tests: the Selenium container will do everything we actually need.
In our case, we will focus on the Kafka Testcontainer.
What is Embedded Kafka?
As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers as usual, making our integration tests lightweight.
Before we start
The concept for our test is simple: I would like to test a Kafka consumer and producer using two different approaches and check how we can use them in real cases.
Kafka messages are serialized using Avro schemas.
Embedded Kafka – Producer Test
The idea is simple: let’s create a simple project with a controller, which invokes a service method to push a Kafka Avro-serialized message.
Dependencies:
dependencies {
    implementation "org.apache.avro:avro:1.10.1"
    implementation("io.confluent:kafka-avro-serializer:6.1.")
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation('org.springframework.cloud:spring-cloud-stream:3.1.1')
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.1')
    implementation('org.springframework.boot:spring-boot-starter-web:2.4.3')
    implementation 'org.projectlombok:lombok:1.18.16'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.cloud:spring-cloud-stream-test-support:3.1.1')
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
Also worth mentioning is a great plugin for Avro. Here is the plugins section:
plugins {
    id 'org.springframework.boot' version '2.6.8'
    id 'io.spring.dependency-management' version '1..11.RELEASE'
    id 'java'
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3."
}
The Avro plugin supports schema auto-generating: it generates the Java classes from the Avro schema files. This is a must-have.
Link to plugin: https://github.com/davidmc24/gradle-avro-plugin
Now let’s define the Avro schema:
{
  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "type": "record",
  "name": "RegisterRequest",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "address", "type": "string", "avro.java.string": "String"}
  ]
}
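For orientation, the tests later in this article use a builder and getters on the class generated from this schema. The following is a hand-written, simplified stand-in that shows only that shape. The name RegisterRequestSketch is hypothetical, and the real generated class extends Avro’s SpecificRecordBase and contains considerably more code.

```java
import java.util.Objects;

// Hand-written, simplified stand-in for the class generated from the schema.
// Not the generated code: the real class extends Avro's SpecificRecordBase.
final class RegisterRequestSketch {
    private final long id;        // schema field "id" of type long
    private final String address; // schema field "address" mapped to java.lang.String

    private RegisterRequestSketch(long id, String address) {
        this.id = id;
        this.address = address;
    }

    long getId() { return id; }
    String getAddress() { return address; }

    static Builder newBuilder() { return new Builder(); }

    static final class Builder {
        private long id;
        private String address;

        Builder setId(long id) { this.id = id; return this; }
        Builder setAddress(String address) { this.address = address; return this; }

        RegisterRequestSketch build() {
            // Simplification: this sketch only checks that the address was set.
            Objects.requireNonNull(address, "address is required");
            return new RegisterRequestSketch(id, address);
        }
    }

    public static void main(String[] args) {
        RegisterRequestSketch request =
                newBuilder().setId(123).setAddress("dummyAddress").build();
        System.out.println(request.getId() + " " + request.getAddress());
    }
}
```

The builder calls mirror the ones used on RegisterRequest in the test code, so the sketch can serve as a mental model when reading those tests.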
Our ProducerService will be focused only on sending messages to Kafka using a template; there is nothing exciting about that part. The main functionality can be done just using this line:
ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);
We cannot forget about test properties:
spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      auto.register.schemas: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
    properties:
      specific.avro.reader: true
As we see in the test properties above, we declare a custom deserializer/serializer for Kafka messages. It is highly recommended to use Kafka with Avro: don’t let JSON maintain the object structure, let’s use a civilized mapper and object definition like Avro.
Serializer:
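import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Map;
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }
    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }
    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}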
Deserializer:
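import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    public CustomKafkaAvroDeserializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }
    public CustomKafkaAvroDeserializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }
    public CustomKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}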
And we have everything to start writing our test.
@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureMockMvc
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {
All we need to do is add the @EmbeddedKafka annotation with the listed topics and partitions. The application context will boot a Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration. Lifecycle.PER_CLASS avoids creating the same objects/context for each test method. It is worth checking if tests are too time-consuming.
Consumer<String, RegisterRequest> consumerServiceTest;

@BeforeEach
void setUp() {
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumer = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    consumerServiceTest = consumer.createConsumer();
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}
Here we can declare the test consumer, based on the Avro schema return type. All Kafka properties are already provided in the .yml file. That consumer will be used to check whether the producer actually pushed a message.
Here is the actual test method:
@Test
void whenValidInput_thenReturns200() throws Exception {
    RegisterRequestDto request = RegisterRequestDto.builder()
            .id(12)
            .address("tempAddress")
            .build();

    mockMvc.perform(
            post("/register-request")
                    .contentType("application/json")
                    .content(objectMapper.writeValueAsBytes(request)))
            .andExpect(status().isOk());

    // Read back the single record that the endpoint should have produced
    ConsumerRecord<String, RegisterRequest> consumedRegisterRequest = KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);
    RegisterRequest valueReceived = consumedRegisterRequest.value();

    assertEquals(12, valueReceived.getId());
    assertEquals("tempAddress", valueReceived.getAddress());
}
First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. KafkaConsumer is used to verify that the producer worked as expected. And that’s it: we have a fully working test with embedded Kafka.
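The shape of this test (trigger the producer side, then verify through an independent consumer with a timeout) can be sketched without Kafka at all. In the illustration below a BlockingQueue stands in for the topic, and the names ProduceThenVerifySketch, send, and takeSingle are hypothetical; it shows the pattern only, not the Embedded Kafka or KafkaTestUtils API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: a BlockingQueue stands in for the "register-request" topic.
final class ProduceThenVerifySketch {

    // Hypothetical stand-in for the producer service: pushes a message to the "topic".
    static void send(BlockingQueue<String> topic, String message) {
        topic.add(message);
    }

    // Waits up to the timeout for one message and fails loudly if none arrives,
    // so a silent producer makes the test fail instead of hang.
    static String takeSingle(BlockingQueue<String> topic, long timeoutMillis) {
        try {
            String message = topic.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                throw new IllegalStateException(
                        "No message received within " + timeoutMillis + " ms");
            }
            return message;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a message", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        send(topic, "tempAddress");                 // act: the producer side
        String received = takeSingle(topic, 1000);  // verify: the independent consumer side
        System.out.println(received);
    }
}
```

The point of the pattern is that the assertion goes through a consumer that is separate from the code under test, which is the role the test consumer plays in the real test.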
Take a look at Containers – Consumer Check
TestContainers are nothing more than independent Docker images ready to be run as containers. The following test scenario will be enhanced by a MongoDB image. Why not keep our data in the database right after everything happened in the Kafka stream?
Dependencies are not much different than in the previous example. The following additions are needed for test containers:
testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'
ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:${testcontainersVersion}"
    }
}
Let’s focus now on the Consumer part. The test case will be simple: one consumer service will be responsible for getting the Kafka message and storing the parsed payload in the MongoDB collection. All that we need to know about KafkaListeners, for now, is this annotation:
@KafkaListener(topics = "register-request")
Thanks to the annotation processing, KafkaListenerContainerFactory will be responsible for creating a listener on our method. From this moment, our method will react to any incoming Kafka message on the given topic.
Avro serializer and deserializer configs are the same as in the previous test.
Regarding TestContainers, we should start with the following annotations:
@SpringBootTest
@ActiveProfiles("test")
@Testcontainers
public class AbstractIntegrationTest {
During startup, all configured TestContainers modules will be activated. It means that we will get access to the full operating environment of the selected resource. For example:
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);
As a result of booting the test, we can expect two Docker containers to start with the provided configuration.

What is really important about the Mongo container: it gives us full access to the database using just a simple connection URI. With such a feature, we are able to take a look at the current state of our collections, even during debug mode and at prepared breakpoints.
Take a look also at the Ryuk container: it works like an overwatch for the containers started by our tests and removes them once the tests are done.
And here is the final part of the configuration:
@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);
}

static {
    kafkaContainer.start();
    mongoDBContainer.start();
    // Note: a wait strategy is applied during start(), so it normally has to be
    // set before start() to take effect.
    mongoDBContainer.waitingFor(Wait.forListeningPort()
            .withStartupTimeout(Duration.ofSeconds(180L)));
}

@BeforeTestClass
public void beforeTest() {
    // Block until every listener container has been assigned its partition
    kafkaListenerEndpointRegistry.getListenerContainers().forEach(
            messageListenerContainer ->
                    ContainerTestUtils
                            .waitForAssignment(messageListenerContainer, 1)
    );
}

@AfterAll
static void tearDown() {
    kafkaContainer.stop();
    mongoDBContainer.stop();
}
DynamicPropertySource gives us the option to set all needed environment properties during the test lifecycle. It is strongly required for any config purposes for TestContainers. Also, in the @BeforeTestClass method, kafkaListenerEndpointRegistry waits for each listener to get the expected partitions during container startup.
And the last part of the Kafka test containers journey, the main body of the test:
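One detail worth spelling out is why the registry receives method references such as kafkaContainer::getBootstrapServers rather than plain values: the value is supplied lazily, so it is read only when the property is resolved, by which time the container has a mapped port. The stdlib-only sketch below illustrates that idea; LazyRegistrySketch and FakeContainer are hypothetical stand-ins, and the address is made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a property registry that stores suppliers, not values.
final class LazyRegistrySketch {
    private final Map<String, Supplier<Object>> suppliers = new LinkedHashMap<>();

    void add(String name, Supplier<Object> valueSupplier) {
        suppliers.put(name, valueSupplier);
    }

    // The supplier runs only here, when the property is actually resolved.
    Object resolve(String name) {
        Supplier<Object> supplier = suppliers.get(name);
        return supplier == null ? null : supplier.get();
    }

    // Hypothetical container whose address is unknown until it has started.
    static final class FakeContainer {
        private String bootstrapServers;

        void start() { bootstrapServers = "localhost:12345"; } // made-up address

        String getBootstrapServers() {
            if (bootstrapServers == null) {
                throw new IllegalStateException("Container not started yet");
            }
            return bootstrapServers;
        }
    }

    public static void main(String[] args) {
        FakeContainer container = new FakeContainer();
        LazyRegistrySketch registry = new LazyRegistrySketch();

        // Registering before start() is safe because nothing is evaluated yet.
        registry.add("spring.kafka.bootstrap-servers", container::getBootstrapServers);

        container.start();
        System.out.println(registry.resolve("spring.kafka.bootstrap-servers"));
    }
}
```

Had the registry stored the value eagerly, the registration line in main would have thrown, because the container had not been started at that point.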
@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
    writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

    // Wait for KafkaListener
    TimeUnit.SECONDS.sleep(5);
    Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
    return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {
    // try-with-resources closes the producer after all records are sent
    try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
        Arrays.stream(registerRequests)
                .forEach(registerRequest -> {
                    ProducerRecord<String, RegisterRequest> record = new ProducerRecord<>(topicName, registerRequest);
                    producer.send(record);
                });
    }
}
The custom producer is responsible for writing our message to the Kafka broker. Also, it is recommended to give consumers some time to handle messages properly. As we see, the message was not just consumed by the listener, but also stored in the MongoDB collection.
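The test above gives the consumer time with a fixed five-second sleep. A common alternative, shown here as a swapped-in technique rather than the approach used in this article, is to poll for the expected state until a timeout, so the test finishes as soon as the data arrives. The sketch is stdlib-only; PollingWaitSketch and waitUntil are hypothetical names, and an AtomicInteger stands in for the repository.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class PollingWaitSketch {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition became true in time, false otherwise.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for the repository: a counter that a background "listener" increments.
        AtomicInteger savedDocuments = new AtomicInteger();
        new Thread(savedDocuments::incrementAndGet).start();

        boolean saved = waitUntil(() -> savedDocuments.get() == 1,
                Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println(saved);
    }
}
```

In the real test the condition would be the repository check, for example that findAll() returns one element; libraries such as Awaitility offer the same idea with a richer API.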
Conclusions
As we can see, current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and counting on all lines covered as a sign of code/logic quality. Now the question is, should we use an Embedded solution or TestContainers? I suggest first of all focusing on the word “Embedded”. As a perfect integration test, we want to get an almost ideal copy of the production environment with all properties/features included. In-memory solutions are good, but mostly not enough for large business projects. Definitely, the advantage of Embedded services is the easy way to implement such tests and maintain configuration, since everything happens in memory.
TestContainers at first sight might look like overkill, but they give us the most important feature, which is a separate environment. We don’t even have to rely on existing Docker images; if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins either. I strongly recommend checking the TestContainers documentation on how easily we can set up the configuration for Jenkins agents.
To sum up: if there is no blocker or any unwanted condition for using TestContainers, then don’t hesitate. It is always good to keep all services managed and secured with integration test contracts.