Kafka Nodejs 설치와 간단 사용 [kafkajs]

Docker 환경설정

docker-compose.yml 파일 생성 후 아래 코드 입력하고 저장

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.11-1.1.1
    ports:
      - "9092:9092"
    links:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME:  127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_CREATE_TOPICS: "topic-test:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

docker-compose up


Nodejs 관련 라이브러리 설치

express와 kafkajs를 설치한다.

npm install express
npm install kafkajs

producer.js 생성 후 아래 코드 입력하고 저장

const express = require('express')
const app = express()
const port = 3000

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()

const initKafka = async () => {
  await producer.connect()
}

app.post('/events/:event', async (req, res) => {
  await producer.send({
    topic: 'quickstart-events',
    messages: [
      { value: req.params.event },
    ],
  })
  res.send('successfully stored event : '+ req.params.event + '\n')
})

app.listen(port, async  () => {
  console.log(`kafka app listening on port ${port}`)
})

initKafka();

consumer.js 파일을 생성하고 아래 코드를 입력하고 저장한다.

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const consumer = kafka.consumer({ groupId: 'test-group' })

const initKafka = async () => {
  console.log('start subscribe')
  await consumer.connect()
  await consumer.subscribe({ topic: 'quickstart-events', fromBeginning: true })
  await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      value: message.value.toString(),
    })
  },
})
}

initKafka()

Postman 또는 powershell의 curl을 이용해 테스트를 진행한다.


Client로부터 request를 받으면 사전에 생성한 producer를 통해 토픽과 함께 메세지를 보낸다.

consumer는 producer로부터 보내오는 메세지를 대기하면서 수신하면 출력한다.

Leave a Comment