Kafka REST API (Proxy?)

Before we begin…

Before writing this post, I ran into an annoyance: when you spin up multiple Kafka brokers, each one ends up with its own arbitrary ip:port that clients have to connect to, and I had to change the port by hand every time.

So I had the idea of putting Nginx in front of the brokers for load balancing and proxying, and after digging through Google and the documentation I ran the experiment, but the connection failed.

When I asked on Stack Overflow, the answer was that the way Nginx and Kafka communicate is fundamentally different: Kafka clients speak Kafka's own binary protocol and, after the initial bootstrap, connect directly to whichever broker the cluster metadata advertises, so an HTTP-oriented reverse proxy in the middle doesn't work.

So I decided to fall back to the approach I had used before: expose an API of my own and let application logic do the proxying.

Since an open-source Kafka REST API already exists (Confluent's REST Proxy), I used that, running everything with Docker.

Kafka compose yml

version: '2'
name: icas-kafka-server
services: 

  zk1:
    container_name: zookeeper1
    image: wurstmeister/zookeeper:latest
    restart: always
    hostname: zk1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - "~/zk-cluster/zk1/data:/data"

  zk2:
    container_name: zookeeper2
    image: wurstmeister/zookeeper:latest
    restart: always
    hostname: zk2
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - "~/zk-cluster/zk2/data:/data"

  zk3:
    container_name: zookeeper3
    image: wurstmeister/zookeeper:latest
    restart: always
    hostname: zk3
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - "~/zk-cluster/zk3/data:/data"

  kafka1:
    container_name: kafka1
    image: wurstmeister/kafka:latest
    restart: on-failure
    depends_on:
      - zk1
      - zk2
      - zk3
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
      KAFKA_ADVERTISED_PORT: 9092
      BOOTSTRAP_SERVERS: ${DOCKER_HOST_IP}:9092,${DOCKER_HOST_IP}:9093,${DOCKER_HOST_IP}:9094
      KAFKA_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2181,zk3:2181"  # container ports; 2182/2183 are host mappings only
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_HEAP_OPTS: -Xmx256M -Xms128M

  kafka2:
    container_name: kafka2
    image: wurstmeister/kafka:latest
    restart: on-failure
    depends_on:
      - zk1
      - zk2
      - zk3
    ports:
      - "9093:9092"
    volumes:
       - /var/run/docker.sock:/var/run/docker.sock
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
      KAFKA_ADVERTISED_PORT: 9093
      BOOTSTRAP_SERVERS: ${DOCKER_HOST_IP}:9092,${DOCKER_HOST_IP}:9093,${DOCKER_HOST_IP}:9094
      KAFKA_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2181,zk3:2181"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_HEAP_OPTS: -Xmx256M -Xms128M

  kafka3:
    container_name: kafka3
    image: wurstmeister/kafka:latest
    restart: on-failure
    depends_on:
      - zk1
      - zk2
      - zk3
    ports:
      - "9094:9092"
    volumes:
       - /var/run/docker.sock:/var/run/docker.sock
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
      KAFKA_ADVERTISED_PORT: 9094
      BOOTSTRAP_SERVERS: ${DOCKER_HOST_IP}:9092,${DOCKER_HOST_IP}:9093,${DOCKER_HOST_IP}:9094
      KAFKA_ZOOKEEPER_CONNECT: "zk1:2181,zk2:2181,zk3:2181"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_HEAP_OPTS: -Xmx256M -Xms128M

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "10000:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=${DOCKER_HOST_IP}:9092,${DOCKER_HOST_IP}:9093,${DOCKER_HOST_IP}:9094
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zk1:2181,zk2:2181,zk3:2181
networks:
  kafka-network:
    external: true
  db-network:
    external: true
# Note: no service above actually joins these networks; attach one explicitly
# with a per-service networks: key if they are needed.

Run the compose file above (e.g. docker-compose up -d) to start Kafka, ZooKeeper, and the Kafka UI in the background.
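
To confirm the brokers are actually reachable on the advertised ports, a minimal reachability sketch in Python (standard library only); it assumes DOCKER_HOST_IP is 192.168.0.8, which the Schema Registry and REST Proxy configs later in this post suggest:

import socket

# Quick TCP check against the three host-mapped broker ports.
# 192.168.0.8 stands in for DOCKER_HOST_IP (an assumption based on
# the addresses used in the later compose files).
for port in (9092, 9093, 9094):
    with socket.create_connection(("192.168.0.8", port), timeout=3):
        print(f"broker port {port} is accepting connections")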

Kafka Schema Registry compose

version: '3'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1-1
    container_name: schema-registry
    restart: unless-stopped
    network_mode: bridge
    ports:
      - 18081:18081/tcp  # the listener below binds 18081 inside the container
    environment:
      # kafkastore.connection.url
      #   ZooKeeper URL for the Apache Kafka® cluster
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: '192.168.0.8:2181,192.168.0.8:2182,192.168.0.8:2183'

      # kafkastore.bootstrap.servers
      #   A list of Kafka brokers to connect to. For example, PLAINTEXT://hostname:9092,SSL://hostname2:9092
      #   The effect of this setting depends on whether you specify kafkastore.connection.url.
      #
      #   If kafkastore.connection.url is not specified, the Kafka cluster containing these bootstrap servers is used
      #   both to coordinate Schema Registry instances (primary election) and to store schema data.
      #
      #   If kafkastore.connection.url is specified, this setting is used to control how Schema Registry connects to
      #   Kafka to store schema data and is particularly important when Kafka security is enabled. When this
      #   configuration is not specified, Schema Registry's internal Kafka clients will get their Kafka bootstrap
      #   server list from ZooKeeper (configured with kafkastore.connection.url). In that case, all available
      #   listeners matching the kafkastore.security.protocol setting is used.
      #
      #   By specifying this configuration, you can control which endpoints are used to connect to Kafka. Kafka may
      #   expose multiple endpoints that all will be stored in ZooKeeper, but Schema Registry may need to be configured
      #   with just one of those endpoints, for example to control which security protocol it uses.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://192.168.0.8:9092,PLAINTEXT://192.168.0.8:9093,PLAINTEXT://192.168.0.8:9094'

      # listeners
      #   Comma-separated list of listeners that listen for API requests over either HTTP or HTTPS. If a listener uses
      #   HTTPS, the appropriate SSL configuration parameters need to be set as well.
      #
      #   Schema Registry identities are stored in ZooKeeper and are made up of a hostname and port. If multiple
      #   listeners are configured, the first listener's port is used for its identity.
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:18081'

      # avro.compatibility.level
      #   The Avro compatibility type. Valid values are: none (new schema can be any valid Avro schema), backward (new
      #   schema can read data produced by latest registered schema), backward_transitive (new schema can read data
      #   produced by all previously registered schemas), forward (latest registered schema can read data produced by
      #   the new schema), forward_transitive (all previously registered schemas can read data produced by the new
      #   schema), full (new schema is backward and forward compatible with latest registered schema), full_transitive
      #   (new schema is backward and forward compatible with all previously registered schemas)
      SCHEMA_REGISTRY_AVRO_COMPATIBILITY_LEVEL: 'backward'

      # host.name
      #   The host name advertised in ZooKeeper. Make sure to set this if running Schema Registry with multiple nodes.
      SCHEMA_REGISTRY_HOST_NAME: 'localhost'

      # kafkastore.ssl.key.password
      #   The password of the key contained in the keystore.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: ''

      # kafkastore.ssl.keystore.location
      #   The location of the SSL keystore file.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: ''

      # kafkastore.ssl.keystore.password
      #   The password to access the keystore.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: ''

      # kafkastore.ssl.truststore.location
      #   The location of the SSL trust store file.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: ''

      # kafkastore.ssl.truststore.password
      #   The password to access the trust store.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: ''

      # kafkastore.topic
      #   The durable single partition topic that acts as the durable log for the data. This topic must be compacted to
      #   avoid losing data due to retention policy.
      #SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

      # kafkastore.topic.replication.factor
      #   The desired replication factor of the schema topic. The actual replication factor will be the smaller of this
      #   value and the number of live Kafka brokers.
      #SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3

      # response.mediatype.default
      #   The default response media type that should be used if no specify types are requested in an Accept header.
      #SCHEMA_REGISTRY_RESPONSE_MEDIATYPE_DEFAULT: 'application/vnd.schemaregistry.v1+json'

      # ssl.keystore.location
      #   Used for HTTPS. Location of the keystore file to use for SSL.
      #   IMPORTANT: Jetty requires that the key's CN, stored in the keystore, must match the FQDN.
      #SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: ''

      # ssl.keystore.password
      #   Used for HTTPS. The store password for the keystore file.
      #SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ''

      # ssl.key.password
      #   Used for HTTPS. The password of the private key in the keystore file.
      #SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ''

      # ssl.truststore.location
      #   Used for HTTPS. Location of the trust store. Required only to authenticate HTTPS clients.
      #SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: ''

      # ssl.truststore.password
      #   Used for HTTPS. The store password for the trust store file.
      #SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ''

      # response.mediatype.preferred
      #   An ordered list of the server's preferred media types used for responses, from most preferred to least.
      #SCHEMA_REGISTRY_RESPONSE_MEDIATYPE_PREFERRED: 'application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json'

      # zookeeper.set.acl
      #   Whether or not to set an ACL in ZooKeeper when znodes are created and ZooKeeper SASL authentication is
      #   configured.
      #   IMPORTANT: If set to true, the ZooKeeper SASL principal must be the same as the Kafka brokers.
      #SCHEMA_REGISTRY_ZOOKEEPER_SET_ACL: 'false'

      # kafkastore.init.timeout.ms
      #   The timeout for initialization of the Kafka store, including creation of the Kafka topic that stores schema
      #   data.
      #SCHEMA_REGISTRY_KAFKASTORE_INIT_TIMEOUT_MS: 60000

      # kafkastore.security.protocol
      #   The security protocol to use when connecting with Kafka, the underlying persistent storage. Values can be
      #   PLAINTEXT, SASL_PLAINTEXT, SSL or SASL_SSL.
      #SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT

      # kafkastore.ssl.enabled.protocols
      #   Protocols enabled for SSL connections.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_ENABLED_PROTOCOLS: 'TLSv1.2,TLSv1.1,TLSv1'

      # kafkastore.ssl.keystore.type
      #   The file format of the keystore.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_TYPE: JKS

      # kafkastore.ssl.protocol
      #   The SSL protocol used.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_PROTOCOL: TLS

      # kafkastore.ssl.provider
      #   The name of the security provider used for SSL.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_PROVIDER: ''

      # kafkastore.ssl.truststore.type
      #   The file format of the trust store.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_TYPE: JKS

      # kafkastore.timeout.ms
      #   The timeout for an operation on the Kafka store
      #SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS: 500

      # master.eligibility
      #   If true, this node can participate in primary election. In a multi-colo setup, turn this off for clusters in
      #   the secondary data center.
      #SCHEMA_REGISTRY_MASTER_ELIGIBILITY: 'true'

      # kafkastore.sasl.kerberos.service.name
      #   The Kerberos principal name that the Kafka client runs as. This can be defined either in the JAAS config file
      #   or here.
      #SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_SERVICE_NAME: ''

      # kafkastore.sasl.mechanism
      #   The SASL mechanism used for Kafka connections. GSSAPI is the default.
      #SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: GSSAPI

      # access.control.allow.methods
      #   Set value to Jetty Access-Control-Allow-Origin header for specified methods
      #SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: ''

      # ssl.keystore.type
      #   Used for HTTPS. The type of keystore file.
      #SCHEMA_REGISTRY_SSL_KEYSTORE_TYPE: JKS

      # ssl.truststore.type
      #   Used for HTTPS. The type of trust store file.
      #SCHEMA_REGISTRY_SSL_TRUSTSTORE_TYPE: JKS

      # ssl.protocol
      #   Used for HTTPS. The SSL protocol used to generate the SslContextFactory.
      #SCHEMA_REGISTRY_SSL_PROTOCOL: TLS

      # ssl.provider
      #   Used for HTTPS. The SSL security provider name. Leave blank to use Jetty's default.
      #SCHEMA_REGISTRY_SSL_PROVIDER: ''

      # ssl.client.auth
      #   Used for HTTPS. Whether or not to require the HTTPS client to authenticate via the server's trust store.
      #SCHEMA_REGISTRY_SSL_CLIENT_AUTH: 'false'

      # ssl.enabled.protocols
      #   Used for HTTPS. The list of protocols enabled for SSL connections. Comma-separated list. Leave blank to use
      #   Jetty's defaults.
      #SCHEMA_REGISTRY_SSL_ENABLED_PROTOCOLS: ''

      # access.control.allow.origin
      #   Set value for Jetty Access-Control-Allow-Origin header
      #SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: ''

      # debug
      #   Boolean indicating whether extra debugging information is generated in some error response entities.
      #SCHEMA_REGISTRY_DEBUG: 'false'

      # kafkastore.ssl.cipher.suites
      #   A list of cipher suites used for SSL.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_CIPHER_SUITES: ''

      # kafkastore.ssl.endpoint.identification.algorithm
      #   The endpoint identification algorithm to validate the server hostname using the server certificate.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: https

      # kafkastore.ssl.keymanager.algorithm
      #   The algorithm used by key manager factory for SSL connections.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYMANAGER_ALGORITHM: SunX509

      # kafkastore.ssl.trustmanager.algorithm
      #   The algorithm used by the trust manager factory for SSL connections.
      #SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTMANAGER_ALGORITHM: PKIX

      # kafkastore.zk.session.timeout.ms
      #   ZooKeeper session timeout
      #SCHEMA_REGISTRY_KAFKASTORE_ZK_SESSION_TIMEOUT_MS: 30000

      # metric.reporters
      #   A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in
      #   classes that will be notified of new metric creation. The JmxReporter is always included to register JMX
      #   statistics.
      #SCHEMA_REGISTRY_METRIC_REPORTERS: ''

      # metrics.jmx.prefix
      #   Prefix to apply to metric names for the default JMX reporter.
      #SCHEMA_REGISTRY_METRICS_JMX_PREFIX: 'kafka.schema.registry'

      # metrics.num.samples
      #   The number of samples maintained to compute metrics.
      #SCHEMA_REGISTRY_METRICS_NUM_SAMPLES: 2

      # metrics.sample.window.ms
      #   The metrics system maintains a configurable number of samples over a fixed window size. This configuration
      #   controls the size of the window. For example we might maintain two samples each measured over a 30 second
      #   period. When a window expires we erase and overwrite the oldest window.
      #SCHEMA_REGISTRY_METRICS_SAMPLE_WINDOW_MS: 30000

      # request.logger.name
      #   Name of the SLF4J logger to write the NCSA Common Log Format request log.
      #SCHEMA_REGISTRY_REQUEST_LOGGER_NAME: 'io.confluent.rest-utils.requests'

      # inter.instance.protocol
      #   The protocol used while making calls between the instances of Schema Registry. The secondary to primary node
      #   calls for writes and deletes will use the specified protocol. The default value would be http. When https is
      #   set, ssl.keystore and ssl.truststore configs are used while making the call. (Use instead of the deprecated
      #   schema.registry.inter.instance.protocol.)
      #SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: http

      # resource.extension.class
      #   Fully qualified class name of a valid implementation of the interface SchemaRegistryResourceExtension. This
      #   can be used to inject user defined resources like filters. Typically used to add custom capability like
      #   logging, security, etc. (Use resource.extension.class instead of deprecated
      #   schema.registry.resource.extension.class.)
      #SCHEMA_REGISTRY_RESOURCE_EXTENSION_CLASS: ''

      # schema.registry.zk.namespace
      #   The string that is used as the ZooKeeper namespace for storing Schema Registry metadata. Schema Registry
      #   instances which are part of the same Schema Registry service should have the same ZooKeeper namespace.
      #SCHEMA_REGISTRY_SCHEMA_REGISTRY_ZK_NAMESPACE: schema_registry

      # shutdown.graceful.ms
      #   Amount of time to wait after a shutdown request for outstanding requests to complete.
      #SCHEMA_REGISTRY_SHUTDOWN_GRACEFUL_MS: 1000

      # ssl.keymanager.algorithm
      #   Used for HTTPS. The algorithm used by the key manager factory for SSL connections. Leave blank to use Jetty's
      #   default.
      #SCHEMA_REGISTRY_SSL_KEYMANAGER_ALGORITHM: ''

      # ssl.trustmanager.algorithm
      #   Used for HTTPS. The algorithm used by the trust manager factory for SSL connections. Leave blank to use
      #   Jetty's default.
      #SCHEMA_REGISTRY_SSL_TRUSTMANAGER_ALGORITHM: ''

      # ssl.cipher.suites
      #   Used for HTTPS. A list of SSL cipher suites. Comma-separated list. Leave blank to use Jetty's defaults.
      #SCHEMA_REGISTRY_SSL_CIPHER_SUITES: ''

      # ssl.endpoint.identification.algorithm
      #   Used for HTTPS. The endpoint identification algorithm to validate the server hostname using the server
      #   certificate. Leave blank to use Jetty's default.
      #SCHEMA_REGISTRY_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ''

      # kafkastore.sasl.kerberos.kinit.cmd
      #   The Kerberos kinit command path.
      #SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_KINIT_CMD: '/usr/bin/kinit'

      # kafkastore.sasl.kerberos.min.time.before.relogin
      #   The login time between refresh attempts.
      #SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN: 60000

      # kafkastore.sasl.kerberos.ticket.renew.jitter
      #   The percentage of random jitter added to the renewal time.
      #SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_TICKET_RENEW_JITTER: 0.05

      # kafkastore.sasl.kerberos.ticket.renew.window.factor
      #   Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has
      #   been reached, at which time it will try to renew the ticket.
      #SCHEMA_REGISTRY_KAFKASTORE_SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR: 0.8

      # kafkastore.group.id
      #   Use this setting to override the group.id for the KafkaStore consumer. This setting can become important when
      #   security is enabled, to ensure stability over Schema Registry consumer's group.id
      #   Without this configuration, group.id will be schema-registry-<host>-<port>.
      #SCHEMA_REGISTRY_KAFKASTORE_GROUP_ID: ''
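
Before moving on, it's worth a quick smoke test against the registry's REST interface. A minimal sketch with Python's requests library, assuming the registry host is 192.168.0.16 (the address the REST Proxy config below points at):

import requests

BASE = "http://192.168.0.16:18081"  # host/port assumed from the compose files in this post

# /subjects lists the registered schema subjects; a fresh registry returns [].
resp = requests.get(f"{BASE}/subjects")
resp.raise_for_status()
print(resp.json())

# /config reports the global compatibility level ("BACKWARD" per the setting above).
print(requests.get(f"{BASE}/config").json())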

Kafka Rest Compose

version: '2'
services:
  kafka-rest:
    image: confluentinc/cp-kafka-rest:5.3.1-1
    container_name: kafka-rest
    restart: unless-stopped
    network_mode: bridge
    ports:
      - 4000:4000/tcp
    environment:
      ########## General ##########

      # host.name
      #   The host name used to generate absolute URLs in responses. If empty, the default canonical hostname is used.
      #   NOTE: consumer instance base URIs embed this value, so it must be a name
      #   clients can actually resolve; 'KAFKA' here is a placeholder.
      KAFKA_REST_HOST_NAME: 'KAFKA'

      # id
      #   Unique ID for the Confluent REST Proxy server instance. This is used in generating unique IDs for consumers
      #   that do not specify their ID. The ID is empty by default, which makes a single server setup easier to get up
      #   and running, but is not safe for multi-server deployments where automatic consumer IDs are used.
      #KAFKA_REST_ID: ''

      # bootstrap.servers
      #   A list of Kafka brokers to connect to. For example, PLAINTEXT://hostname:9092,SSL://hostname2:9092. This
      #   configuration is particularly important when Kafka security is enabled, because Kafka may expose multiple
      #   endpoints that all will be stored in ZooKeeper, but REST Proxy may need to be configured with just one of
      #   those endpoints. The client will make use of all servers irrespective of which servers are specified here for
      #   bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. Since these
      #   servers are just used for the initial connection to discover the full cluster membership (which may change
      #   dynamically), this list need not contain the full set of servers (you may want more than one, though, in case
      #   a server is down).
      KAFKA_REST_BOOTSTRAP_SERVERS: 'PLAINTEXT://192.168.0.8:9092,PLAINTEXT://192.168.0.8:9093,PLAINTEXT://192.168.0.8:9094'

      # listeners
      #   Comma-separated list of listeners that listen for API requests over either HTTP or HTTPS. If a listener uses
      #   HTTPS, the appropriate SSL configuration parameters need to be set as well.
      KAFKA_REST_LISTENERS: 'http://0.0.0.0:4000'

      # schema.registry.url
      #   The base URL for Schema Registry that should be used by the Avro serializer.
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://192.168.0.16:18081'

      # zookeeper.connect
      #   Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port
      #   of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down
      #   you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.
      #
      #   The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts its
      #   data under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in
      #   its connection string. For example to give a chroot path of /chroot/path you would give the connection string
      #   as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
      KAFKA_REST_ZOOKEEPER_CONNECT: '192.168.0.8:2181,192.168.0.8:2182,192.168.0.8:2183'

* Don't forget to open the external ports on the host. Don't make the mistake of inviting people in without opening the door.
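
With the ports open, a one-call sanity check against the proxy (localhost here assumes you're running it on the proxy host; otherwise substitute the proxy's address):

import requests

# GET /topics lists the topic names visible to the proxy; a successful
# response proves both the HTTP listener and the broker connection work.
resp = requests.get("http://localhost:4000/topics")
resp.raise_for_status()
print(resp.json())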

Using the REST API over HTTP

{value}
Write the actual names in place of the braces and remove the {} when using these examples.

Producer

URL : http://localhost:4000/topics/{topic-name}

METHOD : POST

HEADER : {Content-Type : application/vnd.kafka.json.v2+json}

BODY(DATA) : {"records":[{"key":"keyiskey","value":{"id":"probiotics"}}]}

The result of the write is returned in the response body.

Checking in Kafka UI, you can see the message was delivered.
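
For reference, the same request as a Python sketch; the topic name test-topic is a stand-in for whatever topic you created:

import json
import requests

url = "http://localhost:4000/topics/test-topic"  # hypothetical topic name
headers = {"Content-Type": "application/vnd.kafka.json.v2+json"}
payload = {"records": [{"key": "keyiskey", "value": {"id": "probiotics"}}]}

resp = requests.post(url, headers=headers, data=json.dumps(payload))
resp.raise_for_status()
# The response reports the partition and offset each record was written to.
print(resp.json())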

Consumer Create

Before you can use a consumer, you first have to create a consumer instance.

URL : http://localhost:4000/consumers/sales_json_consumer

METHOD : POST

HEADER : {Content-Type : application/vnd.kafka.v2+json}

BODY(DATA) : {"name": "{consumer-name}", "format": "json", "auto.offset.reset": "earliest"}

The result is returned in the response body, including the instance id and the base_uri to use for every subsequent call to this consumer.
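
A sketch of the full consumer round trip (create, subscribe, poll, delete) in Python; my_consumer and test-topic are placeholders, and note the v2 API takes Kafka-style offset values (earliest/latest), which is why earliest appears here instead of the older smallest:

import json
import requests

BASE = "http://localhost:4000"
V2 = "application/vnd.kafka.v2+json"

# 1. Create a consumer instance in the group "sales_json_consumer".
resp = requests.post(
    f"{BASE}/consumers/sales_json_consumer",
    headers={"Content-Type": V2},
    data=json.dumps({"name": "my_consumer", "format": "json",
                     "auto.offset.reset": "earliest"}),
)
resp.raise_for_status()
base_uri = resp.json()["base_uri"]  # embeds host.name from the proxy config

# 2. Subscribe the instance to a topic.
requests.post(f"{base_uri}/subscription", headers={"Content-Type": V2},
              data=json.dumps({"topics": ["test-topic"]})).raise_for_status()

# 3. Poll for records. The first poll often returns [] while the consumer
#    joins the group; poll again after a moment.
records = requests.get(f"{base_uri}/records",
                       headers={"Accept": "application/vnd.kafka.json.v2+json"})
records.raise_for_status()
print(records.json())

# 4. Delete the instance when finished so the group rebalances cleanly.
requests.delete(base_uri, headers={"Content-Type": V2}).raise_for_status()

If the returned base_uri contains a host the client can't resolve (e.g. http://KAFKA:4000/...), revisit the KAFKA_REST_HOST_NAME setting in the compose file above.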
