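Before we begin...
Before writing this post, I ran into a nuisance: when I created several Kafka brokers, the ip:port each one listens on ended up being assigned arbitrarily, and I had to change the ports by hand every time.
So I wondered whether Nginx could handle the load balancing and proxying. I experimented based on Google and documentation searches, but the connection failed.
I asked on Stack Overflow, and the answer was that the way Nginx communicates and the way Kafka communicates are fundamentally different.
I therefore decided to go back to the approach I had used before: put an API in front of Kafka and let its logic do the proxying.
An open-source Kafka REST API already exists, so I used it and ran it with Docker.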
Kafka compose yml
version: '2'
name: icas-kafka-server
services:
  zk1:
    container_name: zookeeper1
    image: wurstmeister/zookeeper:latest
    restart: always
    hostname: zk1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - "~/zk-cluster/zk1/data:/data"
  zk2:
    container_name: zookeeper2
    image: wurstmeister/zookeeper:latest
    restart: always
    hostname: zk2
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - "~/zk-cluster/zk2/data:/data"
  zk3:
    container_name: zookeeper3
    image: wurstmeister/zookeeper:latest
    restart: always
    hostname: zk3
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - "~/zk-cluster/zk3/data:/data"
  kafka1:
    container_name: kafka1
    image: wurstmeister/kafka:latest
    restart: on-failure
    depends_on:
      - zk1
      - zk2
      - zk3
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
      BOOTSTRAP_SERVERS: ${DOCKER_HOST_IP}:9092, ${DOCKER_HOST_IP}:9093, ${DOCKER_HOST_IP}:9094
      # Inside the Compose network every ZooKeeper container listens on 2181;
      # 2182 and 2183 are host-side port mappings only.
      KAFKA_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2181,zk3:2181"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_HEAP_OPTS: -Xmx256M -Xms128M
  kafka2:
    container_name: kafka2
    image: wurstmeister/kafka:latest
    restart: on-failure
    depends_on:
      - zk1
      - zk2
      - zk3
    ports:
      - "9093:9092"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
      BOOTSTRAP_SERVERS: ${DOCKER_HOST_IP}:9092, ${DOCKER_HOST_IP}:9093, ${DOCKER_HOST_IP}:9094
      KAFKA_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2181,zk3:2181"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_HEAP_OPTS: -Xmx256M -Xms128M
  kafka3:
    container_name: kafka3
    image: wurstmeister/kafka:latest
    restart: on-failure
    depends_on:
      - zk1
      - zk2
      - zk3
    ports:
      - "9094:9092"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
      BOOTSTRAP_SERVERS: ${DOCKER_HOST_IP}:9092, ${DOCKER_HOST_IP}:9093, ${DOCKER_HOST_IP}:9094
      KAFKA_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2181,zk3:2181"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_HEAP_OPTS: -Xmx256M -Xms128M
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "10000:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=${DOCKER_HOST_IP}:9092,${DOCKER_HOST_IP}:9093,${DOCKER_HOST_IP}:9094
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zk1:2181,zk2:2181,zk3:2181
networks:
  kafka-network:
    external: true
  db-network:
    external: true
Run the compose file above to start Kafka, ZooKeeper, and Kafka UI in the background.
Kafka Schema Registry compose
version: '3'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1-1
    container_name: schema-registry
    restart: unless-stopped
    network_mode: bridge
    ports:
      # The container port must match the port in SCHEMA_REGISTRY_LISTENERS below.
      - 18081:18081/tcp
    environment:
      # kafkastore.connection.url
      # ZooKeeper URL for the Apache Kafka® cluster
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: '192.168.0.8:2181,192.168.0.8:2182,192.168.0.8:2183'
      # kafkastore.bootstrap.servers
      # A list of Kafka brokers to connect to. For example, PLAINTEXT://hostname:9092,SSL://hostname2:9092
      # The effect of this setting depends on whether you specify kafkastore.connection.url.
      #
      # If kafkastore.connection.url is not specified, the Kafka cluster containing these bootstrap servers is used
      # both to coordinate Schema Registry instances (primary election) and to store schema data.
      #
      # If kafkastore.connection.url is specified, this setting is used to control how Schema Registry connects to
      # Kafka to store schema data and is particularly important when Kafka security is enabled. When this
      # configuration is not specified, Schema Registry's internal Kafka clients will get their Kafka bootstrap
      # server list from ZooKeeper (configured with kafkastore.connection.url). In that case, all available
      # listeners matching the kafkastore.security.protocol setting is used.
      #
      # By specifying this configuration, you can control which endpoints are used to connect to Kafka. Kafka may
      # expose multiple endpoints that all will be stored in ZooKeeper, but Schema Registry may need to be configured
      # with just one of those endpoints, for example to control which security protocol it uses.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://192.168.0.8:9092,PLAINTEXT://192.168.0.8:9093,PLAINTEXT://192.168.0.8:9094'
      # listeners
      # Comma-separated list of listeners that listen for API requests over either HTTP or HTTPS. If a listener uses
      # HTTPS, the appropriate SSL configuration parameters need to be set as well.
      #
      # Schema Registry identities are stored in ZooKeeper and are made up of a hostname and port. If multiple
      # listeners are configured, the first listener's port is used for its identity.
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:18081'
      # avro.compatibility.level
      # The Avro compatibility type. Valid values are: none (new schema can be any valid Avro schema), backward (new
      # schema can read data produced by latest registered schema), backward_transitive (new schema can read data
      # produced by all previously registered schemas), forward (latest registered schema can read data produced by
      # the new schema), forward_transitive (all previously registered schemas can read data produced by the new
      # schema), full (new schema is backward and forward compatible with latest registered schema), full_transitive
      # (new schema is backward and forward compatible with all previously registered schemas)
      SCHEMA_REGISTRY_AVRO_COMPATIBILITY_LEVEL: 'backward'
      # host.name
      # The host name advertised in ZooKeeper. Make sure to set this if running Schema Registry with multiple nodes.
      SCHEMA_REGISTRY_HOST_NAME: 'localhost'
# kafkastore.ssl.key.password
# The password of the key contained in the keystore.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: ''
# kafkastore.ssl.keystore.location
# The location of the SSL keystore file.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: ''
# kafkastore.ssl.keystore.password
# The password to access the keystore.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: ''
# kafkastore.ssl.truststore.location
# The location of the SSL trust store file.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: ''
# kafkastore.ssl.truststore.password
# The password to access the trust store.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: ''
# kafkastore.topic
# The durable single partition topic that acts as the durable log for the data. This topic must be compacted to
# avoid losing data due to retention policy.
#SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
# kafkastore.topic.replication.factor
# The desired replication factor of the schema topic. The actual replication factor will be the smaller of this
# value and the number of live Kafka brokers.
#SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
# response.mediatype.default
# The default response media type that should be used if no specify types are requested in an Accept header.
#SCHEMA_REGISTRY_RESPONSE_MEDIATYPE_DEFAULT: 'application/vnd.schemaregistry.v1+json'
# ssl.keystore.location
# Used for HTTPS. Location of the keystore file to use for SSL.
# IMPORTANT: Jetty requires that the key's CN, stored in the keystore, must match the FQDN.
#SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: ''
# ssl.keystore.password
# Used for HTTPS. The store password for the keystore file.
#SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ''
# ssl.key.password
# Used for HTTPS. The password of the private key in the keystore file.
#SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ''
# ssl.truststore.location
# Used for HTTPS. Location of the trust store. Required only to authenticate HTTPS clients.
#SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: ''
# ssl.truststore.password
# Used for HTTPS. The store password for the trust store file.
#SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ''
# response.mediatype.preferred
# An ordered list of the server's preferred media types used for responses, from most preferred to least.
#SCHEMA_REGISTRY_RESPONSE_MEDIATYPE_PREFERRED: 'application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json'
# zookeeper.set.acl
# Whether or not to set an ACL in ZooKeeper when znodes are created and ZooKeeper SASL authentication is
# configured.
# IMPORTANT: If set to true, the ZooKeeper SASL principal must be the same as the Kafka brokers.
#SCHEMA_REGISTRY_ZOOKEEPER_SET_ACL: 'false'
# kafkastore.init.timeout.ms
# The timeout for initialization of the Kafka store, including creation of the Kafka topic that stores schema
# data.
#SCHEMA_REGISTRY_KAFKASTORE_INIT_TIMEOUT_MS: 60000
# kafkastore.security.protocol
# The security protocol to use when connecting with Kafka, the underlying persistent storage. Values can be
# PLAINTEXT, SASL_PLAINTEXT, SSL or SASL_SSL.
#SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
# kafkastore.ssl.enabled.protocols
# Protocols enabled for SSL connections.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_ENABLED_PROTOCOLS: 'TLSv1.2,TLSv1.1,TLSv1'
# kafkastore.ssl.keystore.type
# The file format of the keystore.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_TYPE: JKS
# kafkastore.ssl.protocol
# The SSL protocol used.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_PROTOCOL: TLS
# kafkastore.ssl.provider
# The name of the security provider used for SSL.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_PROVIDER: ''
# kafkastore.ssl.truststore.type
# The file format of the trust store.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_TYPE: JKS
# kafkastore.timeout.ms
# The timeout for an operation on the Kafka store
#SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS: 500
# master.eligibility
# If true, this node can participate in primary election. In a multi-colo setup, turn this off for clusters in
# the secondary data center.
#SCHEMA_REGISTRY_MASTER_ELIGIBILITY: 'true'
# kafkastore.sasl.kerberos.service.name
# The Kerberos principal name that the Kafka client runs as. This can be defined either in the JAAS config file
# or here.
#SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_SERVICE_NAME: ''
# kafkastore.sasl.mechanism
# The SASL mechanism used for Kafka connections. GSSAPI is the default.
#SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: GSSAPI
# access.control.allow.methods
# Set value to Jetty Access-Control-Allow-Origin header for specified methods
#SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: ''
# ssl.keystore.type
# Used for HTTPS. The type of keystore file.
#SCHEMA_REGISTRY_SSL_KEYSTORE_TYPE: JKS
# ssl.truststore.type
# Used for HTTPS. The type of trust store file.
#SCHEMA_REGISTRY_SSL_TRUSTSTORE_TYPE: JKS
# ssl.protocol
# Used for HTTPS. The SSL protocol used to generate the SslContextFactory.
#SCHEMA_REGISTRY_SSL_PROTOCOL: TLS
# ssl.provider
# Used for HTTPS. The SSL security provider name. Leave blank to use Jetty's default.
#SCHEMA_REGISTRY_SSL_PROVIDER: ''
# ssl.client.auth
# Used for HTTPS. Whether or not to require the HTTPS client to authenticate via the server's trust store.
#SCHEMA_REGISTRY_SSL_CLIENT_AUTH: 'false'
# ssl.enabled.protocols
# Used for HTTPS. The list of protocols enabled for SSL connections. Comma-separated list. Leave blank to use
# Jetty's defaults.
#SCHEMA_REGISTRY_SSL_ENABLED_PROTOCOLS: ''
# access.control.allow.origin
# Set value for Jetty Access-Control-Allow-Origin header
#SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: ''
# debug
# Boolean indicating whether extra debugging information is generated in some error response entities.
#SCHEMA_REGISTRY_DEBUG: 'false'
# kafkastore.ssl.cipher.suites
# A list of cipher suites used for SSL.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_CIPHER_SUITES: ''
# kafkastore.ssl.endpoint.identification.algorithm
# The endpoint identification algorithm to validate the server hostname using the server certificate.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: https
# kafkastore.ssl.keymanager.algorithm
# The algorithm used by key manager factory for SSL connections.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYMANAGER_ALGORITHM: SunX509
# kafkastore.ssl.trustmanager.algorithm
# The algorithm used by the trust manager factory for SSL connections.
#SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTMANAGER_ALGORITHM: PKIX
# kafkastore.zk.session.timeout.ms
# ZooKeeper session timeout
#SCHEMA_REGISTRY_KAFKASTORE_ZK_SESSION_TIMEOUT_MS: 30000
# metric.reporters
# A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in
# classes that will be notified of new metric creation. The JmxReporter is always included to register JMX
# statistics.
#SCHEMA_REGISTRY_METRIC_REPORTERS: ''
# metrics.jmx.prefix
# Prefix to apply to metric names for the default JMX reporter.
#SCHEMA_REGISTRY_METRICS_JMX_PREFIX: 'kafka.schema.registry'
# metrics.num.samples
# The number of samples maintained to compute metrics.
#SCHEMA_REGISTRY_METRICS_NUM_SAMPLES: 2
# metrics.sample.window.ms
# The metrics system maintains a configurable number of samples over a fixed window size. This configuration
# controls the size of the window. For example we might maintain two samples each measured over a 30 second
# period. When a window expires we erase and overwrite the oldest window.
#SCHEMA_REGISTRY_METRICS_SAMPLE_WINDOW_MS: 30000
# request.logger.name
# Name of the SLF4J logger to write the NCSA Common Log Format request log.
#SCHEMA_REGISTRY_REQUEST_LOGGER_NAME: 'io.confluent.rest-utils.requests'
# inter.instance.protocol
# The protocol used while making calls between the instances of Schema Registry. The secondary to primary node
# calls for writes and deletes will use the specified protocol. The default value would be http. When https is
# set, ssl.keystore and ssl.truststore configs are used while making the call. (Use instead of the deprecated
# schema.registry.inter.instance.protocol.)
#SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: http
# resource.extension.class
# Fully qualified class name of a valid implementation of the interface SchemaRegistryResourceExtension. This
# can be used to inject user defined resources like filters. Typically used to add custom capability like
# logging, security, etc. (Use resource.extension.class instead of deprecated
# schema.registry.resource.extension.class.)
#SCHEMA_REGISTRY_RESOURCE_EXTENSION_CLASS: ''
# schema.registry.zk.namespace
# The string that is used as the ZooKeeper namespace for storing Schema Registry metadata. Schema Registry
# instances which are part of the same Schema Registry service should have the same ZooKeeper namespace.
#SCHEMA_REGISTRY_SCHEMA_REGISTRY_ZK_NAMESPACE: schema_registry
# shutdown.graceful.ms
# Amount of time to wait after a shutdown request for outstanding requests to complete.
#SCHEMA_REGISTRY_SHUTDOWN_GRACEFUL_MS: 1000
# ssl.keymanager.algorithm
# Used for HTTPS. The algorithm used by the key manager factory for SSL connections. Leave blank to use Jetty's
# default.
#SCHEMA_REGISTRY_SSL_KEYMANAGER_ALGORITHM: ''
# ssl.trustmanager.algorithm
# Used for HTTPS. The algorithm used by the trust manager factory for SSL connections. Leave blank to use
# Jetty's default.
#SCHEMA_REGISTRY_SSL_TRUSTMANAGER_ALGORITHM: ''
# ssl.cipher.suites
# Used for HTTPS. A list of SSL cipher suites. Comma-separated list. Leave blank to use Jetty's defaults.
#SCHEMA_REGISTRY_SSL_CIPHER_SUITES: ''
# ssl.endpoint.identification.algorithm
# Used for HTTPS. The endpoint identification algorithm to validate the server hostname using the server
# certificate. Leave blank to use Jetty's default.
#SCHEMA_REGISTRY_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ''
# kafkastore.sasl.kerberos.kinit.cmd
# The Kerberos kinit command path.
#SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_KINIT_CMD: '/usr/bin/kinit'
# kafkastore.sasl.kerberos.min.time.before.relogin
# The login time between refresh attempts.
#SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN: 60000
# kafkastore.sasl.kerberos.ticket.renew.jitter
# The percentage of random jitter added to the renewal time.
#SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_TICKET_RENEW_JITTER: 0.05
# kafkastore.sasl.kerberos.ticket.renew.window.factor
# Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has
# been reached, at which time it will try to renew the ticket.
#SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR: 0.8
# kafkastore.group.id
# Use this setting to override the group.id for the KafkaStore consumer. This setting can become important when
# security is enabled, to ensure stability over Schema Registry consumer's group.id
# Without this configuration, group.id will be schema-registry-<host>-<port>.
#SCHEMA_REGISTRY_KAFKASTORE_GROUP_ID: ''
Kafka Rest Compose
version: '2'
services:
  kafka-rest:
    image: confluentinc/cp-kafka-rest:5.3.1-1
    container_name: kafka-rest
    restart: unless-stopped
    network_mode: bridge
    ports:
      - 4000:4000/tcp
    environment:
      ########## General ##########
      # host.name
      # The host name used to generate absolute URLs in responses. If empty, the default canonical hostname is used.
      KAFKA_REST_HOST_NAME: 'KAFKA'
      # id
      # Unique ID for the Confluent REST Proxy server instance. This is used in generating unique IDs for consumers
      # that do not specify their ID. The ID is empty by default, which makes a single server setup easier to get up
      # and running, but is not safe for multi-server deployments where automatic consumer IDs are used.
      #KAFKA_REST_ID: ''
      # bootstrap.servers
      # A list of Kafka brokers to connect to. For example, PLAINTEXT://hostname:9092,SSL://hostname2:9092. This
      # configuration is particularly important when Kafka security is enabled, because Kafka may expose multiple
      # endpoints that all will be stored in ZooKeeper, but REST Proxy may need to be configured with just one of
      # those endpoints. The client will make use of all servers irrespective of which servers are specified here for
      # bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. Since these
      # servers are just used for the initial connection to discover the full cluster membership (which may change
      # dynamically), this list need not contain the full set of servers (you may want more than one, though, in case
      # a server is down).
      KAFKA_REST_BOOTSTRAP_SERVERS: 'PLAINTEXT://192.168.0.8:9092,PLAINTEXT://192.168.0.8:9093,PLAINTEXT://192.168.0.8:9094'
      # listeners
      # Comma-separated list of listeners that listen for API requests over either HTTP or HTTPS. If a listener uses
      # HTTPS, the appropriate SSL configuration parameters need to be set as well.
      KAFKA_REST_LISTENERS: 'http://0.0.0.0:4000'
      # schema.registry.url
      # The base URL for Schema Registry that should be used by the Avro serializer.
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://192.168.0.16:18081'
      # zookeeper.connect
      # Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port
      # of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down
      # you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.
      #
      # The server may also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its
      # data under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in
      # its connection string. For example to give a chroot path of /chroot/path you would give the connection string
      # as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
      KAFKA_REST_ZOOKEEPER_CONNECT: '192.168.0.8:2181,192.168.0.8:2182,192.168.0.8:2183'
* Do not forget to open the external ports. Don't make the mistake of inviting someone in without opening the door.
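One way to confirm that the ports really are reachable from another machine is a plain TCP connection test. The following is a minimal sketch, not part of the original setup; the function names are hypothetical, and the host and port list in the usage note are taken from the compose files above and should be adjusted to your environment.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


def check_ports(host, ports):
    """Map each port to whether it accepted a TCP connection."""
    return {port: is_port_open(host, port) for port in ports}
```

For example, `check_ports("192.168.0.8", [9092, 9093, 9094, 18081, 4000, 10000])` would report which of the published ports accept connections. A successful connection only shows that something is listening; it does not prove that the service behind the port is healthy.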
Using the REST API over HTTP
{content}
Wherever {} appears, fill in the actual name and remove the braces.
Producer
URL : http://localhost:4000/topics/{topic name}
METHOD : POST
HEADER : {Content-Type : application/vnd.kafka.json.v2+json}
BODY(DATA) : {"records":[{"key":"keyiskey","value":{"id":"probiotics"}}]}
The result is returned in the response body.
Checking in Kafka UI confirms that the message was delivered.
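The producer request above can also be issued from code. This is a minimal sketch using only the Python standard library; the function names are hypothetical, and the base URL assumes the REST Proxy is reachable at localhost:4000 as in the example.

```python
import json
import urllib.request

# Assumption: REST Proxy address as used in the example above.
REST_PROXY_URL = "http://localhost:4000"


def build_produce_request(topic, records, base_url=REST_PROXY_URL):
    """Build (without sending) the POST /topics/{topic} request."""
    body = json.dumps({"records": records}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/topics/{topic}",
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
    )


def send(request, timeout=5.0):
    """Send the request and return the decoded JSON response body."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the proxy running, `send(build_produce_request("my-topic", [{"key": "keyiskey", "value": {"id": "probiotics"}}]))` should return the same response body described above; `my-topic` is a placeholder topic name.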
Consumer Create
Before a consumer can be used, it has to be created first.
URL : http://localhost:4000/consumers/sales_json_consumer
METHOD : POST
BODY(DATA): {"name": "consumer name", "format": "json", "auto.offset.reset": "smallest"}
The result is returned in the response body, which includes the URL to use and the instance id.
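The consumer creation call can be sketched the same way. The function names below are hypothetical. The `Content-Type` header is an assumption, since the text above does not list one (`application/vnd.kafka.v2+json` is the type the Confluent REST Proxy v2 API documents for this endpoint), and the `instance_id` and `base_uri` field names follow that same API documentation. The request body is copied from the example as-is.

```python
import json
import urllib.request

# Assumption: REST Proxy address as used in the example above.
REST_PROXY_URL = "http://localhost:4000"


def build_create_consumer_request(group, name, base_url=REST_PROXY_URL):
    """Build (without sending) the POST /consumers/{group} request."""
    payload = {"name": name, "format": "json", "auto.offset.reset": "smallest"}
    return urllib.request.Request(
        url=f"{base_url}/consumers/{group}",
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        # Assumed header; not given in the text above.
        headers={"Content-Type": "application/vnd.kafka.v2+json"},
    )


def parse_create_consumer_response(raw_body):
    """Return (instance_id, base_uri) from the creation response body."""
    data = json.loads(raw_body)
    return data["instance_id"], data["base_uri"]
```

The returned `base_uri` is the URL that later calls for this consumer instance are made against, so it is worth storing together with the instance id.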