Docker 환경설정
docker-compose.yml 파일 생성 후 아래 코드 입력하고 저장
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.11-1.1.1
ports:
- "9092:9092"
links:
- zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CREATE_TOPICS: "topic-test:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
docker-compose up
Nodejs 관련 라이브러리 설치
express와 kafkajs를 설치한다.
npm install express
npm install kafkajs
producer.js 생성 후 아래 코드 입력하고 저장
const express = require('express')
const app = express()
const port = 3000
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const producer = kafka.producer()
const initKafka = async () => {
await producer.connect()
}
app.post('/events/:event', async (req, res) => {
await producer.send({
topic: 'quickstart-events',
messages: [
{ value: req.params.event },
],
})
res.send('successfully stored event : '+ req.params.event + '\n')
})
app.listen(port, async () => {
console.log(`kafka app listening on port ${port}`)
})
initKafka();
consumer.js 파일을 생성하고 아래 코드를 입력하고 저장한다.
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const consumer = kafka.consumer({ groupId: 'test-group' })
const initKafka = async () => {
console.log('start subscribe')
await consumer.connect()
await consumer.subscribe({ topic: 'quickstart-events', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
}
initKafka()
Postman 또는 powershell의 curl을 이용해 테스트를 진행한다.
Client로부터 request를 받으면 사전에 생성한 producer를 통해 토픽과 함께 메세지를 보낸다.
consumer는 producer로부터 보내오는 메세지를 대기하면서 수신하면 출력한다.