
SCRATCH PAD

No polish, just pulse

Kafka

Part 1: Introduction to Kafka Fundamentals

Before touching code, a beginner must understand the “Who, What, and Why” of Kafka.

1. What is Kafka?

Imagine a very busy restaurant. The waiters (producers) don't hand orders straight to the cooks; they pin order tickets to a rail, and the cooks (consumers) pull tickets off the rail whenever they are ready. Nobody waits on anybody else, and no ticket gets lost. Kafka is that order rail for software systems.

Key Definition: Kafka is a distributed event streaming platform. It acts as a middleman that stores “messages” (data) so that different parts of a software system can talk to each other without being directly connected.
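To make "middleman" concrete, here is a toy sketch in plain Java (no Kafka involved): a shared queue stands in for the broker, so the producer and consumer threads never hold a reference to each other. The class and message names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy "broker": a queue sits between a producer thread and a consumer
// thread, so the two sides are never directly connected.
public class ToyBroker {
    // Runs one producer and one consumer against a shared queue and
    // returns what the consumer received, in order.
    static List<String> run() {
        BlockingQueue<String> topic = new ArrayBlockingQueue<>(10);
        List<String> received = new ArrayList<>();

        // Producer: drops messages onto the queue and moves on
        Thread producer = new Thread(() -> {
            try {
                topic.put("order-1");
                topic.put("order-2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Consumer: picks messages up whenever it is ready
        Thread consumer = new Thread(() -> {
            try {
                received.add(topic.take());
                received.add(topic.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [order-1, order-2]
    }
}
```

Real Kafka adds persistence, replication, and networking on top, but the decoupling idea is exactly this.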

2. Core Components (The Vocabulary)

These terms come up constantly in the rest of this guide:

  • Broker: a Kafka server. It receives messages, stores them, and serves them to consumers.
  • Topic: a named channel (like logs) that messages are published to.
  • Partition: a slice of a topic. Messages within one partition keep a strict order.
  • Producer: an application that sends messages to a topic.
  • Consumer: an application that reads messages from a topic.
  • Offset: the position number of a message inside a partition.

Part 2: Setting Up the Environment

The original guide starts here, but it never explained what the commands actually do.

Step 1: Install and Start Kafka

  1. Download: Get the latest binary from kafka.apache.org.
  2. Start Zookeeper: Kafka needs a coordinator to run. (Newer Kafka versions can also run without ZooKeeper in “KRaft” mode, but this guide follows the classic ZooKeeper setup.)
# Open a terminal and run:
bin/zookeeper-server-start.sh config/zookeeper.properties

  3. Start the Kafka Broker: Now start the actual engine.
# Open a NEW terminal and run:
bin/kafka-server-start.sh config/server.properties

  4. Create a Topic: We need a place to send our messages. Let’s call it logs.
bin/kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1


Part 3: The Spring Boot “Glue”

To make Spring Boot talk to Kafka, we need dependencies.

What was missing in the guide: A beginner needs to know that Kafka communicates using Bytes. However, our Java code uses Objects. We need a Serializer (to turn Java objects into bytes) and a Deserializer (to turn bytes back into Java objects).
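To see what a serializer/deserializer pair actually does, here is a minimal plain-Java sketch. This is not Spring's real JsonSerializer; it uses a made-up pipe-delimited format instead of JSON, purely to show the object-to-bytes round trip.

```java
import java.nio.charset.StandardCharsets;

// Sketch of the serializer/deserializer idea: the producer side turns an
// object into bytes, the consumer side turns the bytes back into an object.
public class SerdeSketch {
    record LogMessage(String level, String message) {}

    // "Serializer": object -> bytes (here a simple "level|message" string;
    // Spring's JsonSerializer would produce JSON instead)
    static byte[] serialize(LogMessage log) {
        return (log.level() + "|" + log.message()).getBytes(StandardCharsets.UTF_8);
    }

    // "Deserializer": bytes -> object
    static LogMessage deserialize(byte[] bytes) {
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split("\\|", 2);
        return new LogMessage(parts[0], parts[1]);
    }

    public static void main(String[] args) {
        byte[] wire = serialize(new LogMessage("ERROR", "disk full")); // what Kafka stores
        LogMessage back = deserialize(wire);                           // what the consumer sees
        System.out.println(back.level() + ": " + back.message());      // prints ERROR: disk full
    }
}
```

Kafka itself only ever sees the byte[]; the serializer format is a contract between your producer and consumer.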

Project Dependencies (pom.xml): If using Maven, you must ensure these are present:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

Note: Jackson is needed to convert our Java objects into JSON strings, which Kafka can then send.

Part 4: Building the “Brain” (The Producer)

Now we need to create the service that actually sends the messages.

1. What is a KafkaTemplate?

A beginner might ask: “How does Java actually talk to Kafka?” In Spring Boot, we use KafkaTemplate. Think of it like a “post office.” You give it your message and the address (Topic), and it handles all the complicated network stuff to get it to the Kafka broker.

2. The Producer Code (with explanations)

The original guide had a simple producer, but it didn’t show how to handle the “Success” or “Failure” of a sent message. If the Kafka server is down, your application shouldn’t just crash or pretend it worked.

package com.example.kafkalogs.kafka;

import com.example.kafkalogs.model.LogMessage;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;

@Service
public class LogProducer {
    // String is the Key (usually a unique ID), LogMessage is the Value
    private final KafkaTemplate<String, LogMessage> kafkaTemplate;

    public LogProducer(KafkaTemplate<String, LogMessage> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendLog(String level, String message) {
        LogMessage log = new LogMessage(level, message, LocalDateTime.now());
        
        // Sending to the topic named "logs"
        // .whenComplete() lets us react to success or failure once the send finishes
        kafkaTemplate.send("logs", log).whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Sent message=[" + log.getMessage() + 
                    "] with offset=[" + result.getRecordMetadata().offset() + "]");
            } else {
                System.err.println("Unable to send message due to : " + ex.getMessage());
            }
        });
    }
}

What is an Offset? (Beginner Concept): Every message in Kafka is assigned a number (0, 1, 2…). This is called an Offset. It’s like a page number in a book so Kafka knows exactly where each message is.
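The offset idea can be modeled in a few lines of plain Java: treat a partition as an append-only list, and the offset is simply the index a message lands at. (A toy sketch, not real Kafka client code.)

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of offsets: a partition is an append-only list, and a
// message's offset is its position in that list.
public class OffsetSketch {
    static List<String> partition = new ArrayList<>();

    // Appending returns the offset assigned to the message, like
    // result.getRecordMetadata().offset() in the producer above.
    static long append(String message) {
        partition.add(message);
        return partition.size() - 1;
    }

    public static void main(String[] args) {
        System.out.println(append("first log"));  // offset 0
        System.out.println(append("second log")); // offset 1
        System.out.println(append("third log"));  // offset 2
        // A consumer that has processed up to offset 1 resumes at index 2:
        System.out.println(partition.get(2));     // third log
    }
}
```

This is also why a consumer can stop and restart without losing its place: it only has to remember the last offset it processed.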


Part 5: The “Listener” (The Consumer)

The Consumer “listens” for new messages.

1. The @KafkaListener Annotation

This is the magic part of Spring Boot. You just put this annotation on a method, and Spring will automatically run that method whenever a new message arrives in the topic.

2. The Consumer Code

package com.example.kafkalogs.kafka;

import com.example.kafkalogs.model.LogMessage;
import com.example.kafkalogs.repository.LogRepository;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class LogConsumer {
    private final LogRepository logRepository;

    public LogConsumer(LogRepository logRepository) {
        this.logRepository = logRepository;
    }

    // groupId: Messages are divided among consumers with the same group ID.
    // If you have 2 instances of this app, Kafka will send half the logs to each.
    @KafkaListener(topics = "logs", groupId = "log-group")
    public void consumeLog(LogMessage log) {
        logRepository.save(log);
        System.out.println("Received and saved: " + log.getMessage());
    }
}
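The groupId comment in the consumer above can be made concrete with a toy sketch of how partitions are dealt out to the members of a group. This is a simplified round-robin, not Kafka's actual assignor:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of consumer-group balancing: deal partitions out
// round-robin to the consumers in the group.
public class GroupSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
        for (int p = 0; p < partitions; p++) {
            // Partition p goes to consumer number (p mod group size)
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two app instances in group "log-group", topic with 4 partitions:
        Map<String, List<Integer>> a = assign(List.of("app-1", "app-2"), 4);
        System.out.println("app-1 -> " + a.get("app-1")); // app-1 -> [0, 2]
        System.out.println("app-2 -> " + a.get("app-2")); // app-2 -> [1, 3]
    }
}
```

Note that with the single-partition logs topic we created in Part 2, a second consumer in the same group would simply sit idle, because partitions, not individual messages, are the unit of parallelism.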


Part 6: Why application.properties Matters

The original guide provided the properties but didn’t explain the most important part for beginners: Serialization.

Kafka only understands Zeros and Ones (Bytes). Java understands Objects.

Update your application.properties with these critical lines:

# THE SERVER ADDRESS
spring.kafka.bootstrap-servers=localhost:9092

# PRODUCER SETTINGS
# We use String for the key and JSON for our object
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# CONSUMER SETTINGS
spring.kafka.consumer.group-id=log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

# IMPORTANT: Tell Spring it's safe to trust our model package for JSON
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.kafkalogs.model

Warning: If you miss the trusted.packages line, your consumer will throw a security error because it doesn’t trust the incoming data to be converted into your Java class!


Part 7: The Data Warehouse (Database & Repository)

Now that our “Post Office” (Kafka) is working, we need a place to store the mail once it’s delivered. This is where Spring Data JPA and PostgreSQL come in.

1. Why a Database?

Kafka is a “streaming” platform—it’s great for moving data. But if you want to look up a log from three days ago efficiently or create a search feature, you should store the results in a permanent database like PostgreSQL or MySQL.

2. The Entity (The Blueprint)

We use the @Entity annotation to tell Spring that this class represents a table in our database.

package com.example.kafkalogs.model;

import jakarta.persistence.*;
import java.time.LocalDateTime;

@Entity 
@Table(name = "logs") // Maps this class to a table named 'logs' (created automatically when spring.jpa.hibernate.ddl-auto=update is set)
public class LogMessage {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY) // Database handles the ID auto-increment
    private Long id;
    
    private String level;    // e.g., INFO, ERROR, WARN
    private String message;  // The actual log text
    private LocalDateTime timestamp;

    // Standard empty constructor (Required by JPA)
    public LogMessage() {}

    public LogMessage(String level, String message, LocalDateTime timestamp) {
        this.level = level;
        this.message = message;
        this.timestamp = timestamp;
    }

    // Getters and Setters (Omitted for brevity, but you MUST include them)
}

3. The Repository (The Data Access)

Beginners often wonder: “Do I need to write SQL queries like INSERT INTO...?” No. Spring Data JPA provides the JpaRepository. Just by extending this interface, you get methods like .save(), .findAll(), and .delete() for free.

package com.example.kafkalogs.repository;

import com.example.kafkalogs.model.LogMessage;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface LogRepository extends JpaRepository<LogMessage, Long> {
    // No code needed here! Spring generates the implementation at runtime.
    // You can even add "derived queries" just by naming a method, e.g.:
    // List<LogMessage> findByLevel(String level);
}


Part 8: Connecting the Dots (REST API)

We need a way to trigger a message. We will create a REST Controller so we can send an HTTP request that tells our Producer to send a message to Kafka.

package com.example.kafkalogs.controller;

import com.example.kafkalogs.kafka.LogProducer;
import com.example.kafkalogs.model.LogMessage;
import com.example.kafkalogs.repository.LogRepository;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/api/logs") // Better naming practice to use /api/
public class LogController {
    private final LogProducer logProducer;
    private final LogRepository logRepository;

    public LogController(LogProducer logProducer, LogRepository logRepository) {
        this.logProducer = logProducer;
        this.logRepository = logRepository;
    }

    // Endpoint to TRIGGER the producer
    @PostMapping("/send")
    public String sendToKafka(@RequestParam String level, @RequestParam String message) {
        logProducer.sendLog(level, message);
        return "Message successfully queued for Kafka!";
    }

    // Endpoint to READ from the database
    @GetMapping("/all")
    public List<LogMessage> fetchFromDb() {
        return logRepository.findAll();
    }
}


Part 9: How to Test (The “Moment of Truth”)

The original guide didn’t explain how to see if it’s working. Follow these steps:

  1. Run the App: Click ‘Run’ in your IDE (IntelliJ/Eclipse).
  2. Send a Message: Use a tool like Postman or your browser to hit this URL: POST http://localhost:8080/api/logs/send?level=ERROR&message=DatabaseConnectionFailed
  3. Check the Console: You should see:
    • Sent message=[DatabaseConnectionFailed]... (from Producer)
    • Received and saved: DatabaseConnectionFailed (from Consumer)
  4. Verify Storage: Go to: GET http://localhost:8080/api/logs/all You should see a JSON list containing your message.

Part 10: Common Beginner Errors (Troubleshooting)

When working with Kafka for the first time, things rarely work perfectly on the first try. Here is how to fix the “Big Three” mistakes beginners make.

1. The “Connection Refused” Error

Symptoms: Your Spring Boot app crashes on startup or logs Connection to node -1 could not be established. Broker may not be available.

Fix: The broker isn't running, or Spring is looking at the wrong address. Make sure the Zookeeper and Kafka processes from Part 2 are still running, and that spring.kafka.bootstrap-servers=localhost:9092 matches your broker.

2. The “Class Not Found” / Serialization Error

Symptoms: The Producer sends the message, but the Consumer logs a massive error saying java.lang.IllegalArgumentException: The class 'com.example.LogMessage' is not in the trusted packages.

Fix: Point the trusted-packages property at your model package, as shown in Part 6, or use a wildcard:

spring.kafka.consumer.properties.spring.json.trusted.packages=*

(Using * trusts all packages, which is fine for local learning.)

3. The “Silent Consumer” (No Data Received)

Symptoms: You send a message, the Producer says “Success,” but the Consumer does absolutely nothing.

Fix: Check the three usual suspects: the topic name in @KafkaListener must match exactly (logs), the groupId must match spring.kafka.consumer.group-id, and auto-offset-reset=earliest must be set if you want the consumer to pick up messages that were sent before it started.


Final Summary of the “Kafka Journey”

  1. Zookeeper manages the Brokers.
  2. Producers convert Java Objects into JSON Bytes (Serialization).
  3. Topics act as the mailbox where messages sit.
  4. Consumers pull the bytes and convert them back into Java Objects (Deserialization).
  5. JPA/Hibernate saves that object into your permanent Database.

One Last Beginner Tip: Visualizing Kafka

If you find the terminal/command line confusing, you can download a tool called Offset Explorer (formerly Kafka Tool). It provides a visual UI where you can click on your topics and actually see the messages sitting inside Kafka. It’s much easier than reading raw terminal logs!