Skip to content

Instantly share code, notes, and snippets.

@SPopenko
Last active September 16, 2025 07:04
Show Gist options
  • Select an option

  • Save SPopenko/ccecd6517c5512a57c9ab4aae8899463 to your computer and use it in GitHub Desktop.

Select an option

Save SPopenko/ccecd6517c5512a57c9ab4aae8899463 to your computer and use it in GitHub Desktop.
This is a POC implementation of a Camel K route that supports the Request/Reply pattern based on Kafka request and response topics.
// camel-k: language=js
// camel-k: dependency=camel-kafka
// camel-k: dependency=camel-platform-http
// camel-k: trait=service.enabled=true
// camel-k: trait=service.type=NodePort
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// REST entry point for the request/reply flow: POST /api/request is declared
// as consuming and producing JSON and forwards each call to the
// 'direct:process-request' route.
// Pattern background:
// https://camel.apache.org/components/4.14.x/eips/requestReply-eip.html
rest('/api/request')
    .consumes('application/json')
    .produces('application/json')
    .post()
    .to('direct:process-request');
// Main request processing route.
//
// Receives the exchange forwarded by the REST endpoint, tags it with a
// correlation ID and a request timestamp, sends it to the Kafka request topic,
// and finally wraps the message body in a JSON envelope for the HTTP caller.
from('direct:process-request')
    // Set exchange pattern to InOut for request-reply
    .setExchangePattern(org.apache.camel.ExchangePattern.InOut)
    // Generate correlation ID from the exchange ID
    .setHeader('correlationId', simple('${exchangeId}'))
    // Use the correlation ID as the Kafka record key. A static to() URI does
    // not evaluate Simple expressions, so `key=${header.correlationId}` inside
    // the URI would be used as that literal text for every record; the
    // kafka.KEY header is resolved per exchange instead.
    .setHeader('kafka.KEY', simple('${header.correlationId}'))
    // Log the incoming request
    .log('Received HTTP request with correlation ID: ${header.correlationId}')
    // Add timestamp for tracking
    .setHeader('requestTimestamp', simple('${date:now:yyyy-MM-dd HH:mm:ss}'))
    // Send request to Kafka topic (record key comes from the kafka.KEY header)
    .to('kafka:request-topic?brokers={{kafka.brokers:localhost:9092}}')
    // Log that request was sent
    .log('Request sent to Kafka with correlation ID: ${header.correlationId}')
    // NOTE(review): this step is meant to wait for the reply, but to() on a
    // Kafka endpoint acts as a producer: it publishes the current message to
    // response-topic rather than consuming from it. A real request/reply
    // needs a consumer on response-topic that matches replies by correlation
    // ID. Left as a producer call here so the route shape is unchanged.
    .to('kafka:response-topic?brokers={{kafka.brokers:localhost:9092}}&groupId=request-reply-consumer')
    // The Kafka key header is only needed by the producers above; drop it so
    // it is not carried into the HTTP response.
    .removeHeader('kafka.KEY')
    // Log the response
    .log('Response received from Kafka with correlation ID: ${header.correlationId}')
    // Set response headers
    .setHeader('Content-Type', constant('application/json'))
    .setHeader('X-Correlation-ID', simple('${header.correlationId}'))
    // Wrap the body in a JSON envelope. NOTE(review): the body is interpolated
    // as-is between quotes, so a body containing quotes yields invalid JSON.
    .transform().simple('{"message": "${body}", "correlationId": "${header.correlationId}", "timestamp": "${header.requestTimestamp}"}');
// Test-only REST entry point: POST /api/test is declared as consuming and
// producing JSON and forwards to 'direct:test-response', which does not
// touch Kafka.
rest('/api/test')
    .consumes('application/json')
    .produces('application/json')
    .post()
    .to('direct:test-response');
// Direct test responder: stamps the exchange with a correlation ID taken from
// the exchange ID, marks the reply as JSON, and replaces the body with a fixed
// JSON envelope that echoes the original body.
from('direct:test-response')
    .setExchangePattern(org.apache.camel.ExchangePattern.InOut)
    .setHeader('correlationId', simple('${exchangeId}'))
    .setHeader('Content-Type', constant('application/json'))
    .transform(simple('{"message": "Test response", "correlationId": "${header.correlationId}", "originalBody": "${body}"}'));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment