Kafka Event in Integration Tests

2024.10.17 · 9 minute read

What is the requirement?

When introducing a new feature to the system, it is imperative to develop and implement a comprehensive suite of tests, including Unit Tests, Integration Tests, and Acceptance Tests.

This post will primarily focus on the Integration Test (IT) phase. Integration Tests are designed to verify the functionality of the service on a local machine. It is crucial to ensure that all external systems and dependencies are properly configured and operational before the tests start. Integration Testing serves as the penultimate stage of validation before proceeding to Acceptance Testing.

The current requirement is as follows: Upon successful completion of the primary process, the service is tasked with triggering a Kafka event. This event serves as a notification mechanism, informing other services of the successful execution. This signalling mechanism must be thoroughly validated through rigorous integration testing procedures.

Start Integration Test

A few steps are needed before running the tests:

  1. Clean up the Docker containers and images.
  2. mvn clean install: cleans the previous build output, then compiles and packages the current source code.
  3. mvn clean verify [with special options for validation]: verify is one of the phases that install runs, and invoking it directly lets the validation run with different options.
  4. -P parameters: activate the specified Maven profiles, for example a profile that starts a daemon service so that the main services do not exit after being invoked.
  5. -D parameters: set system properties.
  6. -rf service: resume the build from the specified module.

Files whose names end with “ITCase” or “IT” are designated as integration tests. This is a standard naming convention. Maven identifies and executes these files as integration tests based on where “IT” appears in their names (by default, the Maven Failsafe Plugin includes classes named IT*, *IT, and *ITCase).
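To make the naming rule concrete, here is a minimal sketch that mimics the default include patterns of the Maven Failsafe Plugin (**/IT*.java, **/*IT.java, **/*ITCase.java) on simple class names. The class IntegrationTestNames and its method are hypothetical helpers written for this post, not part of Maven, and the sketch assumes the project has not overridden the default patterns.

```java
import java.util.List;

// Hypothetical helper, not part of Maven: mirrors the Failsafe default
// include patterns IT*, *IT and *ITCase on simple class names.
class IntegrationTestNames {

    static boolean isIntegrationTest(final String simpleClassName) {
        return simpleClassName.startsWith("IT")
                || simpleClassName.endsWith("IT")
                || simpleClassName.endsWith("ITCase");
    }

    public static void main(final String[] args) {
        final List<String> names =
                List.of("ServiceITCase", "ServiceIT", "ITService", "ServiceTest", "BITMaskTest");
        for (final String name : names) {
            System.out.println(name + " -> " + isIntegrationTest(name));
        }
    }
}
```

Note that a name such as BITMaskTest contains “IT” but matches none of the patterns, so the position of “IT” matters, not just its presence.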

Note: The code snippet is provided for illustrative purposes only. It is not a functional example and cannot be used in a real-world context.

Basic Code Structure

The basic structure for ITCase is comprised of four elements:

  • Define constants, class/instance variables
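  • Define @BeforeAll: marks a method to be run once before ALL test methods in the class.
  • Define @BeforeEach: marks a method to be run before EACH test method in the class.
  • Define test: a method annotated with @Test. Tagging it with a custom annotation that carries the use case ID improves documentation traceability and test case management.

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ServiceITCase {
    // Constants and class/instance variables (placeholder type and value)
    private static final String STATIC_VALUE = "VALUE";
    // Static, because it is initialised in the static @BeforeAll method
    private static InstanceDefinedInAtBeforeAll instance;

    @BeforeAll
    static void setUp() throws Exception {
        // ...
    }

    // @BeforeEach methods must not be static
    @BeforeEach
    void setUpBeforeEach() throws Exception {
        // ...
    }

    @Test
    @Verifies("TraceId_useCase") // custom annotation carrying the use case ID
    void TestMethod() {
        // Given
        // ...
        // When
        // ...
        // Then
        // ...
    }
}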

Configure Kafka Event

Properties is a class in the java.util package that represents a persistent set of properties. Its most commonly used methods are getProperty() and setProperty().
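As a quick illustration of those two methods, here is a small sketch using only the JDK. The class name PropertiesBasics and the value "demo-group" are made up for this example; "group.id" and "client.id" are the plain string keys that the Kafka ConsumerConfig constants GROUP_ID_CONFIG and CLIENT_ID_CONFIG stand for.

```java
import java.util.Properties;

// Illustrative only: shows setProperty(), getProperty() and the
// getProperty() overload that takes a default value.
class PropertiesBasics {

    static Properties sample() {
        final Properties properties = new Properties();
        properties.setProperty("group.id", "demo-group");
        return properties;
    }

    public static void main(final String[] args) {
        final Properties properties = sample();
        // A key that was set returns its value
        System.out.println(properties.getProperty("group.id"));
        // A missing key returns null ...
        System.out.println(properties.getProperty("client.id"));
        // ... unless a default value is supplied
        System.out.println(properties.getProperty("client.id", "fallback"));
    }
}
```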

The following method configures the properties for a Kafka consumer. getKafkaBootServer() and CustomEventDeserializer are project-specific.

private static Properties setKafkaConsumerWith(final String groupId) {
    final Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootServer());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
            ServiceITCase.class.getSimpleName() + System.currentTimeMillis());
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomEventDeserializer.class.getName());
    return properties;
}

The rough model of Kafka is as follows: the producer sends messages to a topic on the Kafka brokers, and the consumer then fetches them from the brokers. Here is a brief description of each line:

  1. BOOTSTRAP_SERVERS_CONFIG: the Kafka broker addresses. They provide the initial points of contact for a client (producer, consumer, or admin) to connect to the Kafka cluster.
  2. GROUP_ID_CONFIG: defines the consumer group ID. Consumers with the same group ID work together as a single logical consumer.
  3. AUTO_OFFSET_RESET_CONFIG: optional, but recommended. Defines where to start reading messages if no committed offset is found. With "latest", the consumer only receives messages produced after its start position has been determined.
  4. CLIENT_ID_CONFIG: defines an identifier for the client. In this case, it is the class name followed by the current timestamp, which keeps it unique across runs.
  5. KEY_DESERIALIZER_CLASS_CONFIG: specifies the deserialiser for message keys.
  6. VALUE_DESERIALIZER_CLASS_CONFIG: specifies the deserialiser for message values.

What is Topic, Partition and Offset?

Before we proceed further with our code exploration, it is beneficial to familiarize ourselves with some key technical terminology. If you’re already well-versed in these terms, feel free to advance to the subsequent chapter.

Topic: Kafka topics organize related events. For example, we may have a topic called ‘logs’, which contains logs from an application.

Partitions: Topics are broken down into a number of partitions. A single topic may have more than one partition.

Offsets: An offset represents the position of a message within a Kafka partition. It is an integer value, and each message in a given partition has a unique offset.

Brief Summary: Kafka structures data into topics, which are segmented into partitions. Each partition contains messages with unique, sequential identifiers known as offsets. These offsets enable consumers to monitor their progress and resume reading from specific points within the message stream, facilitating efficient data processing.
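The summary above can be sketched with a toy model. This is not how Kafka is implemented; ToyPartition is a hypothetical class that treats one partition as an append-only list, so that the offset of a message is simply its index. The messages are made-up log lines for the ‘logs’ topic mentioned earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a single partition as an append-only list, where the
// offset of a message is its index in the list.
class ToyPartition {
    private final List<String> log = new ArrayList<>();

    // Appends a message and returns the offset it was stored at.
    long append(final String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Returns all messages from the given offset up to the end of the log.
    List<String> readFrom(final long offset) {
        return new ArrayList<>(log.subList((int) offset, log.size()));
    }

    public static void main(final String[] args) {
        final ToyPartition logs = new ToyPartition();
        logs.append("service started");
        final long lastRead = logs.append("request received");

        // The consumer remembers the next offset it wants to read ...
        final long resumeAt = lastRead + 1;
        logs.append("request finished");

        // ... and can resume from exactly that point later on.
        System.out.println(logs.readFrom(resumeAt));
    }
}
```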

Main Test

Let’s move on to the central part of the test.

    @Test
    @Verifies("TraceId_useCase")
    void TestMethod() {
        // Given
        final var prepareSomething = prepare();
        final Properties properties = setKafkaConsumerWith("groupId");

        try (final Consumer<String, CustomEvent> kafkaConsumer = new KafkaConsumer<>(properties)) {
            kafkaConsumer.subscribe(Collections.singleton("custom_topic"));
            // Warm-up poll: connect to the cluster before the event is sent
            kafkaConsumer.poll(java.time.Duration.ofSeconds(1));

            // When
            // This method will send the event
            var result = doSomethingAndGetResult();

            // Then
            assertNotNull(result);
            assertThat(result.contains("value")).isTrue();
            await().atMost(3, TimeUnit.SECONDS)
                    .untilAsserted(() -> assertTestMethod(kafkaConsumer));
        }
    }

The Consumer interface takes two type parameters, which specify the types of the key and the value of the Kafka messages.

final Consumer<String, CustomEvent> kafkaConsumer = new KafkaConsumer<>(properties)

The next two lines of code are crucial for setting up and operating a consumer:

kafkaConsumer.subscribe(Collections.singleton("custom_topic"));
kafkaConsumer.poll(java.time.Duration.ofSeconds(1));

subscribe() registers the consumer for a specific Kafka topic, specifying which data streams to monitor for incoming messages.

poll() fetches messages from the subscribed topic. The duration parameter (1 second in this case) defines the maximum wait time for new messages before the method returns.

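The first poll() call serves to establish a connection with the Kafka cluster, join the consumer group, and retrieve the initial offsets. Because the offset reset policy is "latest", this should happen before the event is sent; otherwise the event could be produced before the consumer has a start position and would be skipped. A single short poll may not always be enough for the partition assignment to complete, so treat the one-second duration as a value to tune. This initial call also allows for early detection of potential connection errors.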
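To show why the order of the first poll() and the sending of the event matters when the offset reset policy is "latest", here is a toy simulation. It does not use the Kafka client at all: ToyTopic, ToyLatestConsumer, and WarmUpPollSketch are hypothetical classes, and the consumer simply fixes its start position at the end of the log on its first poll, which is a simplification of what happens for a consumer without committed offsets.

```java
import java.util.ArrayList;
import java.util.List;

class WarmUpPollSketch {
    // Returns what the toy consumer sees, with or without a poll before the event is sent.
    static List<String> run(final boolean pollBeforeProduce) {
        final ToyTopic topic = new ToyTopic();
        final ToyLatestConsumer consumer = new ToyLatestConsumer(topic);
        if (pollBeforeProduce) {
            consumer.poll(); // warm-up poll fixes the start position
        }
        topic.produce("service-finished");
        return consumer.poll();
    }

    public static void main(final String[] args) {
        System.out.println("with warm-up poll:    " + run(true));
        System.out.println("without warm-up poll: " + run(false));
    }
}

// Toy stand-in for one topic partition: an append-only list of events.
class ToyTopic {
    final List<String> events = new ArrayList<>();

    void produce(final String event) {
        events.add(event);
    }
}

// Toy stand-in for a consumer without committed offsets and with
// auto.offset.reset=latest. This is not the real KafkaConsumer.
class ToyLatestConsumer {
    private final ToyTopic topic;
    private int position = -1; // -1 means "no position resolved yet"

    ToyLatestConsumer(final ToyTopic topic) {
        this.topic = topic;
    }

    // The first call resolves the position to the current end of the log
    // ("latest"); every call returns the events appended since the position.
    List<String> poll() {
        if (position < 0) {
            position = topic.events.size();
        }
        final List<String> batch =
                new ArrayList<>(topic.events.subList(position, topic.events.size()));
        position = topic.events.size();
        return batch;
    }
}
```

With the warm-up poll, the event is appended after the start position and is returned by the second poll. Without it, the start position is resolved after the event already exists, so the event is skipped.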

Complementing the standard assertThat(), we can leverage await() from org.awaitility.Awaitility. This code snippet demonstrates an asynchronous testing methodology:

await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> assertTestMethod(kafkaConsumer));

This line of code:

  1. waits for a condition to be met
  2. has a timeout of 3 seconds
  3. repeatedly checks the assertion that will be introduced in the subsequent chapter
  4. passes if the assertion succeeds within the timeout
  5. fails if the timeout is reached before the assertion succeeds
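The behaviour described in the list can be approximated with a plain retry loop. This is only a simplified stand-in to show the idea, not Awaitility's implementation: the class RetryUntilAsserted is hypothetical, the 100 ms pause between attempts is an arbitrary choice, and on timeout the sketch rethrows the last AssertionError, whereas Awaitility reports the failure through its own timeout exception.

```java
import java.util.concurrent.TimeUnit;

// Simplified stand-in for await().atMost(...).untilAsserted(...).
class RetryUntilAsserted {

    // Runs the assertion repeatedly until it passes or the timeout elapses.
    static void untilAsserted(final long timeout, final TimeUnit unit, final Runnable assertion) {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed within the timeout
            } catch (final AssertionError error) {
                if (System.nanoTime() - deadline >= 0) {
                    throw error; // timeout reached: report the last failure
                }
                pause();
            }
        }
    }

    // Waits briefly before the next attempt (interval chosen arbitrarily).
    private static void pause() {
        try {
            Thread.sleep(100);
        } catch (final InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", interrupted);
        }
    }

    public static void main(final String[] args) {
        final long start = System.currentTimeMillis();
        // Fails for the first 300 ms, then passes: simulates an event that arrives late.
        untilAsserted(3, TimeUnit.SECONDS, () -> {
            if (System.currentTimeMillis() - start < 300) {
                throw new AssertionError("event not received yet");
            }
        });
        System.out.println("assertion passed within the timeout");
    }
}
```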

Assertion

Asserting the record does not require any special procedure. Step 2 is an optional step that checks whether the expected record is among those obtained from the broker.

    private void assertTestMethod(final Consumer<String, CustomEvent> kafkaConsumer) {
        // 1. Poll records from broker
        final ConsumerRecords<String, CustomEvent> consumerRecords =
                kafkaConsumer.poll(java.time.Duration.ofSeconds(1));

        // 2. (Optional) Verify the records
        final Optional<ConsumerRecord<String, CustomEvent>> serviceRecord = StreamSupport
                .stream(consumerRecords.spliterator(), false)
                .filter(consumerRecord -> {
                    final CustomEvent event = consumerRecord.value();
                    return event.getSomething().equals(testCase.getSomething().toString());
                }).findAny();
        assertThat(serviceRecord).isPresent();

        // 3. Assert Record
        final ConsumerRecord<String, CustomEvent> record = serviceRecord.get();
        assertThat(record.key()).isEqualTo(testCase.getKey().toString());

        // 4. (Optional) Debug: header values are raw bytes, so decode them with an explicit charset
        for (final Header header : record.headers()) {
            System.out.println("Header Key: " + header.key() +
                    ", Value: " + new String(header.value(), java.nio.charset.StandardCharsets.UTF_8));
        }
    }
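The filtering in step 2 relies on the fact that the polled records are an Iterable, which StreamSupport can turn into a stream. Here is a minimal JDK-only sketch of that pattern. The class FindMatchingRecord is hypothetical, and Map.Entry is used as a stand-in for a record with a key and a value, so no Kafka types are involved.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.StreamSupport;

// Illustrative only: Map.Entry stands in for a record with key and value.
class FindMatchingRecord {

    // Streams over any Iterable and returns a record whose value matches, if there is one.
    static Optional<Map.Entry<String, String>> findByValue(
            final Iterable<Map.Entry<String, String>> records, final String expectedValue) {
        return StreamSupport.stream(records.spliterator(), false)
                .filter(record -> expectedValue.equals(record.getValue()))
                .findAny();
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, String>> records = List.of(
                Map.entry("key-1", "created"),
                Map.entry("key-2", "finished"));
        System.out.println(findByValue(records, "finished").map(Map.Entry::getKey).orElse("no match"));
        System.out.println(findByValue(records, "deleted").map(Map.Entry::getKey).orElse("no match"));
    }
}
```

Returning an Optional keeps the "record not found" case explicit, which is what makes the isPresent() assertion in step 2 possible.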

Regarding step 4 of the assertion method, it’s important to note the presence of headers in the record. Headers are a crucial feature in Kafka, enabling the attachment of metadata to messages without altering the message payload itself. These headers consist of key-value pairs, where the key is a String and the value is a byte array. To illustrate, here’s how one can add headers to a ProducerRecord:

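// Build the headers first, then pass them to the constructor
Headers headers = new RecordHeaders();
headers.add("header1", "value1".getBytes(java.nio.charset.StandardCharsets.UTF_8));
headers.add("header2", "value2".getBytes(java.nio.charset.StandardCharsets.UTF_8));

ProducerRecord<String, String> record = new ProducerRecord<>(
    "topicName",
    null, // partition
    "key",
    "value",
    headers
);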
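Since header values are byte arrays, the producer and the test have to agree on how text is turned into bytes and back. Here is a small JDK-only sketch of that round trip with an explicit charset. HeaderValueBytes is a hypothetical helper, and UTF-8 is an assumption; any charset works as long as both sides use the same one.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: encodes and decodes a header value with an explicit charset.
class HeaderValueBytes {

    static byte[] encode(final String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(final byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        final byte[] raw = encode("value1");
        System.out.println(raw.length + " bytes, decoded: " + decode(raw));
    }
}
```

Calling getBytes() or new String(bytes) without a charset falls back to the platform default, which can differ between a developer machine and a build server.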

Extended Topic: Perceptual Decoupling

Before concluding, I’d like to share my thoughts on method design. These are just observations and experiences, not definitive guidelines.

In the “Basic Code Structure” chapter, I mentioned that tests are divided into three parts: @BeforeAll, @BeforeEach, and @Test. They not only execute in sequence but also form a logical hierarchy. @BeforeAll can be understood as global, with all tests and helper methods able to access variables defined there. @BeforeEach runs for each test, and is where the test environment and state can be reset.

Once variables are defined, we can use them in @Test. At this point, when creating different methods based on this test, a divergence appears: whether to use these defined variables in sub-methods or pass them as arguments. This difference highlights the concept of decoupling.

As the project has been ongoing for a long time, many test cases exist. For new tests, I can combine existing methods through copy and paste to complete the testing of new features. I’ve found that the approach using arguments is clear and explicit, while with the other I need to frequently check the methods annotated with @BeforeAll and @BeforeEach.

Here is an example.

private HttpClient client;

@BeforeEach
void setUp() {
    client = createHttpClient(USER_NAME, PASSWORD);
}

@Test
void test() {
    useClient1();
    useClient2(client);
}

// Uses the field set in @BeforeEach (implicit dependency)
void useClient1() {
    client.toTarget("www.abc.com").request().post();
}

// Receives the client as an argument (explicit dependency)
void useClient2(HttpClient httpClient) {
    httpClient.toTarget("www.abc.com").request().post();
}

When I copy useClient2(), I clearly know it requires an HttpClient. I also know that HttpClient is defined in the @BeforeEach block above.

When I copy useClient1(), the client is hidden and not explicitly defined. So I need to spend more effort to find the corresponding keyword.

These two approaches represent different levels of coupling with the test. Of course, I’m not saying useClient2() is better than useClient1(), as we can see useClient2() contains more code, making the overall code more verbose.

How to choose between these two styles? My current answer is that it depends on the project style. If the project is mature with many similar code blocks, the decoupled approach might be better. But if this test is unique and likely to appear only here, the first approach is better.

Thank you for reading! Your support is appreciated.
