Kafka Multi-Broker, Docker Compose, Node.js

docker-compose.yml

The stack runs a single ZooKeeper node, three Kafka brokers, Kafdrop for browsing topics, and supporting databases (PostgreSQL, Redis, InfluxDB). Each broker advertises two listeners: LISTENER_DOCKER_INTERNAL (e.g. kafka1:19091) for traffic between containers, and LISTENER_DOCKER_EXTERNAL (localhost:9091-9093) for clients on the host.

version: '3'
services:
  zookeeper:
    image: zookeeper:3.7
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
    volumes:
      - ~/data/zookeeper/data:/data
      - ~/data/zookeeper/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ~/data/kafka1/data:/tmp/kafka-logs
    depends_on:
      - zookeeper
  
  kafka2:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ~/data/kafka2/data:/tmp/kafka-logs
    depends_on:
      - zookeeper

  kafka3:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ~/data/kafka3/data:/tmp/kafka-logs
    depends_on:
      - zookeeper

  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKER_CONNECT: "kafka1:19091,kafka2:19092,kafka3:19093"
    depends_on:
      - kafka1
      - kafka2
      - kafka3

  postgres:
    container_name: icas_postgres
    image: "postgres:12"
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
    volumes:
       - postgres-data:/var/lib/postgresql/data
       - ./docker-pg-config/database.sql:/docker-entrypoint-initdb.d/create_database.sql
       - ./docker-pg-config/pgsql.sql:/docker-entrypoint-initdb.d/create_table.sql
       - ./docker-pg-config/pgFunction.sql:/docker-entrypoint-initdb.d/create_function.sql
    ports:
      - "${PRODUCER_OUT_PORT}:${PRODUCER_IN_PORT}"
    restart: always

  redisdb:
    image: redis:alpine
    container_name: icas-redisdb
    ports:
      - 6379:6379
    volumes:
      - ./redis/data:/data
      - ./redis/conf/redis.conf:/usr/local/conf/redis.conf
      - redis-data:/var/lib/redis/data/
    labels:
      - "name=redis"
      - "mode=standalone"
    restart: unless-stopped
    command: redis-server /usr/local/conf/redis.conf

  influxdb:
    image: influxdb:2.0
    container_name: influxdb2
    volumes:
      - /mnt/influxdb/data:/var/lib/influxdb2:rw
      - influxdb-data:/var/lib/influxdb/data/
    env_file:
      - ./influxv2.env
#    entrypoint: ["./entrypoint.sh"]
    ports:
      - 8086:8086
    restart: unless-stopped
    

volumes:
  postgres-data:
  redis-data:
  influxdb-data:
  

Start the stack in detached mode:

docker-compose up -d
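Once the containers are up, you can confirm that all three brokers joined the cluster by querying metadata from the host. A minimal sketch using the kafkajs admin client — the file name check-cluster.js and the clientId are my own choices:

// check-cluster.js — hypothetical verification script
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'cluster-check',
  brokers: ['localhost:9091', 'localhost:9092', 'localhost:9093'],
})

const checkCluster = async () => {
  const admin = kafka.admin()
  await admin.connect()
  // Expect three brokers once the cluster is healthy
  const { brokers, controller } = await admin.describeCluster()
  console.log(`controller: ${controller}`)
  brokers.forEach(b => console.log(`broker ${b.nodeId}: ${b.host}:${b.port}`))
  await admin.disconnect()
}

checkCluster().catch(console.error)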

Node.js

conf.js

const { Kafka, Partitioners } = require('kafkajs');

const kafkaClient = new Kafka({
  clientId: 'icas-client',
  brokers: ['localhost:9091','localhost:9092','localhost:9093']
});

const topic = 'create-api-data';
// LegacyPartitioner keeps the kafkajs v1 partitioning behavior and silences the v2 default-partitioner warning
const producer = kafkaClient.producer({ createPartitioner: Partitioners.LegacyPartitioner });

const consumer = kafkaClient.consumer({ groupId: 'icas' });
module.exports.kafkaClient = kafkaClient;
module.exports.producer = producer;
module.exports.consumer = consumer;
module.exports.topic = topic;
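One caveat: a topic auto-created on first produce gets the broker default of a single replica. With three brokers, it is worth creating the topic explicitly with a replication factor of 3. A sketch reusing the exports above — the file name admin.js is my own:

// admin.js — hypothetical helper reusing conf.js
const { kafkaClient, topic } = require('./conf');

const createTopic = async () => {
  const admin = kafkaClient.admin();
  await admin.connect();
  // One replica per broker, so the topic survives a single broker failure
  await admin.createTopics({
    topics: [{ topic, numPartitions: 3, replicationFactor: 3 }],
  });
  await admin.disconnect();
};

createTopic().catch(console.error);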

producer.js

const express = require('express')
const {producer,topic} = require('./conf')
const app = express()
const port = 3000

const initKafka = async () => {
  await producer.connect()
}

app.post('/events/:event', async (req, res) => {
  try {
    await producer.send({
      topic,
      messages: [
        { value: req.params.event },
      ],
    })
    res.send('successfully stored event : ' + req.params.event + '\n')
  } catch (e) {
    console.error('failed to produce event', e)
    res.status(500).send('failed to store event\n')
  }
})

// Connect the producer before accepting traffic so early requests don't race the connection
initKafka().then(() => {
  app.listen(port, () => {
    console.log(`kafka app listening on port ${port}`)
  })
})
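Unkeyed messages are spread across partitions, so ordering across requests is not guaranteed. If related events must stay in order, send them with a key. A hedged sketch — the device-42 key and payload are illustrative:

const {producer,topic} = require('./conf')

const sendKeyed = async () => {
  await producer.connect()
  // Messages sharing a key always land on the same partition, preserving their order
  await producer.send({
    topic,
    messages: [{ key: 'device-42', value: JSON.stringify({ temp: 21.5 }) }],
  })
  await producer.disconnect()
}

sendKeyed().catch(console.error)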

consumer.js

const {consumer,topic} = require('./conf')


const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })
  await consumer.run({
    // eachBatch: async ({ batch }) => {
    //   console.log(batch)
    // },
    eachMessage: async ({ topic, partition, message }) => {
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
      console.log(`- ${prefix} ${message.key}#${message.value}`)
    },
  })
}


run().catch(e => console.error(`[example/consumer] ${e.message}`, e))

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

errorTypes.forEach(type => {
  process.on(type, async e => {
    try {
      console.log(`process.on ${type}`)
      console.error(e)
      await consumer.disconnect()
      process.exit(0)
    } catch (_) {
      process.exit(1)
    }
  })
})

signalTraps.forEach(type => {
  process.once(type, async () => {
    try {
      await consumer.disconnect()
    } finally {
      process.kill(process.pid, type)
    }
  })
})
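For higher throughput, eachMessage can be swapped for eachBatch, as the commented-out block above hints. A minimal sketch, again reusing conf.js:

const {consumer,topic} = require('./conf')

const runBatch = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })
  await consumer.run({
    eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
      for (const message of batch.messages) {
        console.log(`${batch.topic}[${batch.partition} | ${message.offset}] ${message.value}`)
        resolveOffset(message.offset) // mark this message as processed
      }
      await heartbeat() // keep the group session alive during long batches
    },
  })
}

runBatch().catch(console.error)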

Test

Test with Postman or curl. The endpoint only accepts POST requests, so a plain browser visit will not work:

curl -X POST http://localhost:3000/events/테스트

You can also verify that the messages arrived by browsing the topic in Kafdrop at http://localhost:9000.
