Docker Compose yml
# Local stack: one ZooKeeper node, three Kafka brokers, Kafdrop, PostgreSQL,
# Redis and InfluxDB.
#
# The file uses top-level `services:` and `volumes:` keys, which the version 1
# file format does not have, so a version 3 format is declared instead.
version: '3'

services:
  zookeeper:
    image: zookeeper:3.7
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      # Environment values are quoted so they are passed as strings.
      ZOO_MY_ID: "1"
      ZOO_PORT: "2181"
    volumes:
      - ~/data/zookeeper/data:/data
      - ~/data/zookeeper/datalog:/datalog

  # Each broker advertises two listeners: an internal one (kafkaN:1909N) used
  # for inter-broker traffic and by other containers, and an external one
  # published on the host (DOCKER_HOST_IP, defaulting to 127.0.0.1).
  kafka1:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
    volumes:
      # NOTE(review): presumably meant to persist broker data; confirm that
      # this image really writes its log dirs to /tmp/kafka-logs (the same
      # applies to kafka2 and kafka3).
      - ~/data/kafka1/data:/tmp/kafka-logs
    depends_on:
      - zookeeper

  kafka2:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: "2"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
    volumes:
      - ~/data/kafka2/data:/tmp/kafka-logs
    depends_on:
      - zookeeper

  kafka3:
    image: confluentinc/cp-kafka:7.0.0
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: "3"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
    volumes:
      - ~/data/kafka3/data:/tmp/kafka-logs
    depends_on:
      - zookeeper

  # Kafka web UI, pointed at kafka1's internal listener.
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKER_CONNECT: "kafka1:19091"
    depends_on:
      - kafka1
      - kafka2
      - kafka3

  postgres:
    container_name: icas_postgres
    image: "postgres:12"
    environment:
      # Both values are substituted from the environment / .env file.
      POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
      POSTGRES_DB: "${POSTGRES_DB}"
    volumes:
      - postgres-data:/var/lib/postgresql/data
      # SQL files mounted into the image's init directory.
      - ./docker-pg-config/database.sql:/docker-entrypoint-initdb.d/create_database.sql
      - ./docker-pg-config/pgsql.sql:/docker-entrypoint-initdb.d/create_table.sql
      - ./docker-pg-config/pgFucntion.sql:/docker-entrypoint-initdb.d/create_function.sql
    ports:
      - "${PRODUCER_OUT_PORT}:${PRODUCER_IN_PORT}"
    restart: always

  redisdb:
    image: redis:alpine
    container_name: icas-redisdb
    ports:
      - "6379:6379"
    volumes:
      - ./redis/data:/data
      - ./redis/conf/redis.conf:/usr/local/conf/redis.conf
      - redis-data:/var/lib/redis/data/
    labels:
      - "name=redis"
      - "mode=standalone"
    restart: unless-stopped
    # Start Redis with the configuration file mounted above.
    command: redis-server /usr/local/conf/redis.conf

  influxdb:
    image: influxdb:2.0
    container_name: influxdb2
    volumes:
      - /mnt/influxdb/data:/var/lib/influxdb2:rw
      - influxdb-data:/var/lib/influxdb/data/
    env_file:
      - ./influxv2.env
    # entrypoint: ["./entrypoint.sh"]
    ports:
      - "8086:8086"
    restart: unless-stopped

# Named volumes referenced by the services above, created with default settings.
volumes:
  postgres-data: {}
  redis-data: {}
  influxdb-data: {}
docker-compose up -d
nodejs
conf.js
// Shared Kafka wiring: builds one KafkaJS client plus a producer, a consumer
// and the topic name, and exports all four for the other scripts to require.
const { Kafka, Partitioners } = require('kafkajs');

// Topic that the producer writes to and the consumer subscribes to.
const topic = 'create-api-data';

// Client addressed at the three broker ports published on localhost.
const kafkaClient = new Kafka({
  clientId: 'icas-client',
  brokers: ['localhost:9091', 'localhost:9092', 'localhost:9093'],
});

// Producer configured with KafkaJS's legacy partitioner.
const producer = kafkaClient.producer({
  createPartitioner: Partitioners.LegacyPartitioner,
});

// Consumer that joins the 'icas' consumer group.
const consumer = kafkaClient.consumer({ groupId: 'icas' });

// Attach the exports to the existing module.exports object.
Object.assign(module.exports, { kafkaClient, producer, consumer, topic });
producer.js
const express = require('express')
const {producer,topic} = require('./conf')
// HTTP front end: every POST to /events/:event is published to Kafka as one
// message whose value is the `:event` path segment.
const app = express()
const port = 3000

/**
 * Connects the shared Kafka producer.
 *
 * @returns {Promise<void>} resolves once `producer.connect()` has completed;
 *   rejects if the connection attempt fails.
 */
const initKafka = async () => {
  await producer.connect()
}

app.post('/events/:event', async (req, res) => {
  const event = req.params.event
  try {
    await producer.send({
      topic: topic,
      messages: [
        { value: event },
      ],
    })
  } catch (err) {
    // A rejected send would otherwise escape this async handler as an
    // unhandled rejection, and this handler would never write a response.
    console.error('failed to store event : ' + event, err)
    res.status(500).type('text/plain').send('failed to store event : ' + event + '\n')
    return
  }
  // The path parameter is echoed back, so answer as plain text rather than
  // the default HTML content type.
  res.type('text/plain').send('successfully stored event : ' + event + '\n')
})

// Connect to Kafka first and only then accept HTTP traffic, so that no request
// can reach producer.send() before the producer is connected.
initKafka()
  .then(() => {
    app.listen(port, () => {
      console.log(`kafka app listening on port ${port}`)
    })
  })
  .catch(err => {
    // Report the connection failure explicitly and exit non-zero.
    console.error('failed to connect kafka producer', err)
    process.exit(1)
  })
consumer.js
const {consumer,topic} = require('./conf')
/**
 * Prints one consumed record as
 * "- <topic>[<partition> | <offset>] / <timestamp> <key>#<value>".
 */
const logMessage = async ({ topic, partition, message }) => {
  const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
  console.log(`- ${prefix} ${message.key}#${message.value}`)
}

/**
 * Connects the consumer, subscribes to the configured topic from the
 * beginning, and starts logging every message.
 */
const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: topic, fromBeginning: true })
  // Disabled alternative kept from the original: a batch-level handler,
  //   eachBatch: async ({ batch }) => { console.log(batch) }
  await consumer.run({ eachMessage: logMessage })
}

// Start consuming; a failure in run() is logged rather than rethrown.
run().catch(err => console.error(`[example/consumer] ${err.message}`, err))
// Process-level shutdown hooks: disconnect the consumer before the process
// goes away, whether because of a fatal error or a termination signal.
const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

// Fatal errors: log, try to disconnect, then exit with a failure status.
errorTypes.forEach(type => {
  process.on(type, async err => {
    try {
      console.log(`process.on ${type}`)
      console.error(err)
      await consumer.disconnect()
    } finally {
      // Always exit non-zero: the process is stopping because of an error,
      // so it must not report success even when the disconnect worked.
      process.exit(1)
    }
  })
})

// Termination signals: disconnect, then re-send the same signal to this
// process. The handler was registered with `once`, so it is no longer
// installed when the signal arrives the second time.
signalTraps.forEach(type => {
  process.once(type, async () => {
    try {
      await consumer.disconnect()
    } finally {
      process.kill(process.pid, type)
    }
  })
})
Test
Postman 또는 curl을 이용해 아래 주소로 POST 요청을 보내 테스트
http://localhost:3000/events/테스트