An enterprise-grade, event-driven pipeline that streams product reviews through a local LLM for real-time sentiment analysis and bot detection, then indexes enriched results into Elasticsearch.
                                        POST /api/v1/reviews/batch
                                                     │
                                                     ▼
┌───────────────┐    raw-reviews topic    ┌─────────────────────┐
│   Redpanda    │ ◄────────────────────── │  ReviewSeeder API   │
│   (Kafka)     │                         │   (Spring REST)     │
└──────┬────────┘                         └─────────────────────┘
       │
       │  KafkaListener
       ▼
┌───────────────────────────────────────────────────────────┐
│  ReviewPipelineConsumer (virtual thread per message)      │
│                                                           │
│  ┌────────────────────────────────────────────────┐       │
│  │  ReviewAnalysisService                         │       │
│  │  @CircuitBreaker + @TimeLimiter                │       │
│  │       │                                        │       │
│  │       ▼                                        │       │
│  │  Ollama / phi3 (local LLM)                     │       │
│  │  → { sentiment, bot_probability, summary }     │       │
│  └──────────────────┬─────────────────────────────┘       │
│                     │                                     │
│        ┌────────────┴────────────┐                        │
│        ▼                         ▼                        │
│  EnrichedReview             DlqProducer                   │
│  → Elasticsearch            → raw-reviews-dlq             │
└───────────────────────────────────────────────────────────┘
| Layer | Technology | Version |
|---|---|---|
| Framework | Spring Boot | 3.4.1 |
| Java | Java (Virtual Threads) | 21 |
| Messaging | Redpanda (Kafka-compatible, KRaft) | 24.3.1 |
| LLM | Ollama + phi3 | 0.5.4 |
| AI Integration | Spring AI | 1.0.0-M5 |
| Search/Storage | Elasticsearch | 8.17.0 |
| Resilience | Resilience4j (CircuitBreaker + TimeLimiter) | 2.2.0 |
| Testing | Testcontainers, WireMock, Awaitility | — |
- Docker Desktop (or Docker Engine + Compose v2)
- JDK 21 (must be on PATH)
- Maven 3.9+ (or use ./mvnw)
docker compose up -d

This starts Redpanda (Kafka), Redpanda Console, Elasticsearch, and Ollama.
Pull the phi3 model once after the first docker compose up. The download takes a few minutes, depending on your connection.

docker exec -it ollama ollama pull phi3

Start the application:

./mvnw spring-boot:run

The app starts on port 8081. Kafka topics (raw-reviews, raw-reviews-dlq) are auto-created on startup.
curl -s -X POST http://localhost:8081/api/v1/reviews/batch \
-H "Content-Type: application/json" \
-d '{
"reviews": [
{
"reviewId": "r1",
"userId": "u1",
"productId": "p1",
"text": "Absolutely love this product! Build quality is outstanding."
}
]
}' | jq

Expected response:

{ "accepted": 1, "topic": "raw-reviews" }

The review is published to Kafka. The pipeline consumer picks it up, calls Ollama, and indexes an EnrichedReview into Elasticsearch within a few seconds.

Query the enriched results in Elasticsearch:

curl -s "http://localhost:9200/enriched-reviews/_search?pretty"

Check the circuit breaker state:

curl -s http://localhost:8081/actuator/health | jq '.components.circuitBreakers'

POST /api/v1/reviews/batch publishes a batch of raw reviews to the Kafka pipeline.
Request body:
{
"reviews": [
{
"reviewId": "string (required)",
"userId": "string (required)",
"productId": "string (required)",
"text": "string (required)"
}
]
}

Constraints: minimum 1 review, maximum 100 reviews per request.
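The constraints on the request body can be expressed as a small validation routine. The following is a minimal sketch in plain Java, not the project's actual DTO: the record shapes mirror the request body, while the class name `BatchValidationSketch`, the `validate` method, its error messages, and the choice to treat blank strings as missing are all assumptions made for illustration. The real `BatchReviewRequest` may well rely on Bean Validation annotations instead.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchValidationSketch {

    // Mirrors one element of the "reviews" array in the request body.
    record ReviewItem(String reviewId, String userId, String productId, String text) {}

    // Mirrors the top-level request body.
    record BatchRequest(List<ReviewItem> reviews) {}

    static final int MIN_REVIEWS = 1;
    static final int MAX_REVIEWS = 100;

    /** Returns human-readable violations; an empty list means the batch is acceptable. */
    static List<String> validate(BatchRequest request) {
        List<String> errors = new ArrayList<>();
        List<ReviewItem> reviews = (request == null) ? null : request.reviews();
        if (reviews == null || reviews.size() < MIN_REVIEWS) {
            errors.add("reviews must contain at least " + MIN_REVIEWS + " item");
            return errors;
        }
        if (reviews.size() > MAX_REVIEWS) {
            errors.add("reviews must contain at most " + MAX_REVIEWS + " items");
        }
        for (int i = 0; i < reviews.size(); i++) {
            ReviewItem item = reviews.get(i);
            if (item == null) {
                errors.add("reviews[" + i + "] must not be null");
                continue;
            }
            requireText(errors, i, "reviewId", item.reviewId());
            requireText(errors, i, "userId", item.userId());
            requireText(errors, i, "productId", item.productId());
            requireText(errors, i, "text", item.text());
        }
        return errors;
    }

    // Assumption: null and blank strings both count as a missing required field.
    private static void requireText(List<String> errors, int index, String field, String value) {
        if (value == null || value.isBlank()) {
            errors.add("reviews[" + index + "]." + field + " is required");
        }
    }

    public static void main(String[] args) {
        ReviewItem item = new ReviewItem("r1", "u1", "p1", "Absolutely love this product!");
        System.out.println(validate(new BatchRequest(List.of(item)))); // no violations
        System.out.println(validate(new BatchRequest(List.of())));     // batch too small
    }
}
```

A controller using a check like this would presumably reject the request with a 4xx status when the returned list is non-empty, and only then publish to raw-reviews.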
Response: 202 Accepted

{ "accepted": 3, "topic": "raw-reviews" }

# Unit tests only (no Docker required)
./mvnw test -Dgroups="unit"
# Full test suite: requires Docker (Testcontainers will pull images automatically)
./mvnw verify

The test suite comprises four layers:
| Test class | Type | Infrastructure |
|---|---|---|
| ReviewSentinelApplicationTest | Smoke | None (all mocked) |
| ReviewSeederControllerTest | Unit | None (MockMvc) |
| ReviewAnalysisServiceIT | Integration | WireMock (stubs Ollama) |
| EnrichedReviewRepositoryIT | Integration | Testcontainers Elasticsearch |
| ReviewPipelineIT | End-to-end | Testcontainers Kafka + ES + WireMock |
src/
├── main/
│   ├── java/com/reviewsentinel/
│   │   ├── ReviewSentinelApplication.java  # Entry point
│   │   ├── api/
│   │   │   ├── ReviewSeederController.java  # POST /api/v1/reviews/batch
│   │   │   └── BatchReviewRequest.java  # Request DTO
│   │   ├── config/
│   │   │   └── KafkaTopicConfig.java  # Auto-creates Kafka topics
│   │   ├── consumer/
│   │   │   └── ReviewPipelineConsumer.java  # Kafka listener → orchestrates pipeline
│   │   ├── domain/
│   │   │   ├── RawReview.java  # Kafka message payload (record)
│   │   │   ├── EnrichedReview.java  # Elasticsearch document
│   │   │   ├── AnalysisResult.java  # LLM JSON output DTO (record)
│   │   │   └── Sentiment.java  # POSITIVE | NEGATIVE | NEUTRAL
│   │   ├── exception/
│   │   │   └── LLMProcessingException.java  # Signals LLM failures to consumer
│   │   ├── producer/
│   │   │   └── DlqProducer.java  # Routes failures to DLQ topic
│   │   ├── repository/
│   │   │   └── EnrichedReviewRepository.java  # Spring Data ES repository
│   │   └── service/
│   │       └── ReviewAnalysisService.java  # @CircuitBreaker + @TimeLimiter LLM call
│   └── resources/
│       ├── application.yml
│       ├── application-test.yml
│       └── prompts/
│           └── review-analysis.st  # LLM prompt template
└── test/
    └── java/com/reviewsentinel/
        ├── ReviewSentinelApplicationTest.java
        ├── api/ReviewSeederControllerTest.java
        ├── pipeline/ReviewPipelineIT.java
        ├── repository/EnrichedReviewRepositoryIT.java
        └── service/ReviewAnalysisServiceIT.java
Virtual Threads (Java 21)
Setting spring.threads.virtual.enabled=true switches Tomcat's request handling and the @Async executor from platform thread pools to virtual threads. Each Kafka listener invocation also runs on a virtual thread, so blocking on .get() for the LLM CompletableFuture parks only that virtual thread and releases its carrier thread for other work.
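The blocking pattern can be tried outside Spring with the JDK alone. The sketch below is an illustration under stated assumptions, not the project's consumer: `VirtualThreadSketch`, `fakeLlmCall`, and `processAll` are hypothetical names, and the "LLM" is simulated with a delayed `CompletableFuture`. It requires Java 21.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class VirtualThreadSketch {

    // Hypothetical stand-in for the LLM call: completes after a fixed delay.
    static CompletableFuture<String> fakeLlmCall(String reviewId, Duration latency) {
        return CompletableFuture.supplyAsync(
                () -> "analysis-of-" + reviewId,
                CompletableFuture.delayedExecutor(latency.toMillis(), TimeUnit.MILLISECONDS));
    }

    /** Runs each review on its own virtual thread and blocks on the future inside it. */
    static List<String> processAll(List<String> reviewIds, Duration latency) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> pending = new ArrayList<>();
            for (String id : reviewIds) {
                pending.add(executor.submit(() -> {
                    // Blocking here parks the virtual thread, not an OS thread.
                    String result = fakeLlmCall(id, latency).get(5, TimeUnit.SECONDS);
                    return result + " virtual=" + Thread.currentThread().isVirtual();
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : pending) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("simulated LLM task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        processAll(List.of("r1", "r2", "r3"), Duration.ofMillis(200))
                .forEach(System.out::println);
    }
}
```

Because all three simulated calls wait concurrently, the total wall-clock time should be close to a single call's latency rather than the sum.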
Resilience4j: Circuit Breaker + Time Limiter
A shared llm-circuit-breaker instance wraps every Ollama call. The @TimeLimiter enforces a hard deadline (10 s in production, 2 s in tests). When the failure rate exceeds 50% over a sliding window of 10 calls, the circuit opens for 30 s and then moves to the HALF_OPEN state, where a limited number of trial requests are allowed through.
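To make the state transitions concrete, here is a deliberately simplified, count-based breaker in plain Java. It is not Resilience4j's implementation or API: the class `SlidingWindowBreakerSketch` and its methods are hypothetical, a single trial call decides the HALF_OPEN outcome, and time is passed in explicitly so the behaviour is easy to test. The threshold comparison follows the wording above ("exceeds").

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;

final class SlidingWindowBreakerSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int windowSize;
    private final double thresholdPercent;
    private final long openNanos;
    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private State state = State.CLOSED;
    private long openedAtNanos;

    SlidingWindowBreakerSketch(int windowSize, double thresholdPercent, Duration openDuration) {
        if (windowSize <= 0) throw new IllegalArgumentException("windowSize must be positive");
        this.windowSize = windowSize;
        this.thresholdPercent = thresholdPercent;
        this.openNanos = openDuration.toNanos();
    }

    /** Returns true if a call may proceed at the given time. */
    synchronized boolean allowCall(long nowNanos) {
        if (state == State.OPEN && nowNanos - openedAtNanos >= openNanos) {
            state = State.HALF_OPEN; // wait period elapsed: let a trial call through
        }
        return state != State.OPEN;
    }

    /** Records the outcome of a call that was allowed to proceed. */
    synchronized void record(boolean failed, long nowNanos) {
        if (state == State.OPEN) {
            return; // no calls should be running while open
        }
        if (state == State.HALF_OPEN) {
            // Simplification: one trial call decides whether to close or re-open.
            if (failed) {
                open(nowNanos);
            } else {
                state = State.CLOSED;
                window.clear();
            }
            return;
        }
        window.addLast(failed);
        if (window.size() > windowSize) {
            window.removeFirst();
        }
        if (window.size() == windowSize) {
            long failures = window.stream().filter(f -> f).count();
            double ratePercent = 100.0 * failures / windowSize;
            if (ratePercent > thresholdPercent) {
                open(nowNanos);
            }
        }
    }

    synchronized State state() {
        return state;
    }

    private void open(long nowNanos) {
        state = State.OPEN;
        openedAtNanos = nowNanos;
        window.clear();
    }

    public static void main(String[] args) {
        var breaker = new SlidingWindowBreakerSketch(10, 50.0, Duration.ofSeconds(30));
        for (int i = 0; i < 4; i++) breaker.record(false, 0L);
        for (int i = 0; i < 6; i++) breaker.record(true, 0L);
        System.out.println(breaker.state());
        System.out.println(breaker.allowCall(Duration.ofSeconds(30).toNanos()));
        System.out.println(breaker.state());
    }
}
```

In the real service the same parameters would be supplied through Resilience4j configuration rather than hand-written state handling.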
Dead Letter Queue
Reviews that fail LLM analysis (timeout, circuit open, malformed JSON) are published to raw-reviews-dlq with their original payload intact. A separate reprocessing job can drain this topic once the LLM recovers, with zero data loss.
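The routing decision can be sketched as a simple try/catch around the analysis step. This is an illustrative, in-memory model rather than the project's `ReviewPipelineConsumer`: `DlqRoutingSketch` is a hypothetical class, the two lists stand in for the Elasticsearch index and the raw-reviews-dlq topic, and the `EnrichedReview` fields are assumed from the LLM output shown in the architecture diagram.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class DlqRoutingSketch {

    record RawReview(String reviewId, String userId, String productId, String text) {}

    // Assumed shape: the raw review plus the three fields the LLM returns.
    record EnrichedReview(RawReview source, String sentiment, double botProbability, String summary) {}

    final List<EnrichedReview> indexed = new ArrayList<>(); // stands in for Elasticsearch
    final List<RawReview> deadLetters = new ArrayList<>();  // stands in for raw-reviews-dlq

    private final Function<RawReview, EnrichedReview> analyzer;

    DlqRoutingSketch(Function<RawReview, EnrichedReview> analyzer) {
        this.analyzer = analyzer;
    }

    /** Indexes the enriched review on success; forwards the untouched original on failure. */
    void consume(RawReview review) {
        try {
            indexed.add(analyzer.apply(review));
        } catch (RuntimeException e) {
            // Timeout, open circuit, or malformed JSON would all surface here.
            deadLetters.add(review);
        }
    }

    public static void main(String[] args) {
        DlqRoutingSketch pipeline = new DlqRoutingSketch(review -> {
            if (review.text().isBlank()) {
                throw new IllegalStateException("simulated LLM failure");
            }
            return new EnrichedReview(review, "POSITIVE", 0.05, "Short summary");
        });
        pipeline.consume(new RawReview("r1", "u1", "p1", "Love it"));
        pipeline.consume(new RawReview("r2", "u2", "p1", " "));
        System.out.println("indexed: " + pipeline.indexed.size());
        System.out.println("dead letters: " + pipeline.deadLetters);
    }
}
```

Because the dead-lettered record is the same payload that was consumed, a later reprocessing job can feed it back through the analyzer unchanged.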
Zero-dependency local setup
The full stack (broker, console UI, Elasticsearch, and LLM runtime) runs in Docker. No cloud accounts or API keys required.
| Service | URL |
|---|---|
| Redpanda Console | http://localhost:8080 |
| Elasticsearch | http://localhost:9200 |
| Ollama API | http://localhost:11434 |
| Actuator / Health | http://localhost:8081/actuator/health |
To run Ollama with an NVIDIA GPU, uncomment the deploy block in docker-compose.yml under the ollama service:
deploy:
  resources:
    reservations:
      devices:
        - driver: nvidia
          count: 1
          capabilities: [gpu]

Requires NVIDIA Container Toolkit.