Programming Insight 2015.03.18 18:59

Kafka는 producer와 consumer를 구현할 때 사용할 수 있는 Java 클라이언트를 공식적으로 제공한다. 그 외에도 서드파티에서 C, C++, Ruby, Python, Go를 비롯한 다양한 언어의 클라이언트를 제공한다. 이 글에서는 Java 클라이언트를 사용하는 법을 다룬다.


Producer 구현하기

Hello, World! 예제

package com.epicdevs.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class ProducerExample { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("metadata.broker.list", "kafka-test-001.epicdevs.com:9092,kafka-test-002.epicdevs.com:9092,kafka-test-003.epicdevs.com:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); ProducerConfig producerConfig = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(producerConfig); KeyedMessage<String, String> message = new KeyedMessage<String, String>("test", "Hello, World!"); producer.send(message); producer.close(); } }

위의 예제는 test라는 topic에 "Hello, World"라는 메시지를 보내는 코드이다. ProducerConfig를 생성할 때 설정할 수 있는 프로퍼티들은 다음 절에서 다룬다.

Kafka producer API에서는 한꺼번에 여러 메시지를 전송할 수 있는 API 또한 제공한다. 해당 API를 적절히 사용하면 producer의 처리량을 늘릴 수 있다. 아래는 "Hello, World!" 메시지 10개를 한 번에 전송하는 예제이다.

List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>();
for (int i = 0; i < 10; i++) {
    messages.add(new KeyedMessage<String, String>("test", "Hello, World!"));
}
producer.send(messages);


필수 프로퍼티

 프로퍼티 

설명 

 metadata.broker.list

메타데이터를 받아 올 Kafka broker 리스트. 호스트1:포트1,호스트2:포트2,호스트3:포트3의 형태로 명시한다.
예) kafka-test-001.epicdevs.com:9092,kafka-test-002.epicdevs.com:9092,kafka-test-003.epicdevs.com:9092


여기서 명시하는 broker는 메타데이터를 받아오는 데만 사용하고, 실제 메시지를 전송할 때에는 메타데이터를 기반으로 새로운 connection을 맺은 다음 메시지를 전송한다. 따라서 이 리스트에는 전체 broker 중 일부만 명시해도 무관하다.


중요 프로퍼티

 프로퍼티

 기본 값 

설명 

 serializer.class

 kafka.serializer.DefaultEncoder

메시지를 serialize할 때 사용하는 인코더. DefaultEncoder는 byte[]를 받아서 그대로 전달한다.

 key.serializer.class

 serializer.class의 값과 동일

메시지 키를 serialize할 때 사용하는 인코더.

 partitioner.class

 kafka.producer.DefaultPartitioner

메시지를 어떤 partition에 전송할지 결정하는 클래스. DefaultPartitioner는 메시지 키의 해시 코드를 기반으로 메시지를 전송할 partition을 결정한다. 메시지 키를 명시하지 않았거나 null 값을 키로 전달할 경우 사용자가 명시한 partitioner.class를 무시하고 임의의 partition에 메시지를 보내게 된다. 이 때문에 특정 상황에서 전체 partition에 메시지가 제대로 분산되지 않는 현상이 발생할 수 있다. 이에 대한 자세한 사항은 Kafka FAQ의 Why is data not evenly distributed among partitions when a partitioning key is not specified?를 참조하길 바란다.

 request.required.acks

 0

Producer가 전송한 메시지가 몇 개의 replica에 commit되어야 ack처리(성공적으로 전송된 것으로 간주)를 하는지 결정하는 기준.

  • 0: producer는 broker로부터 ack를 기다리지 않고 메시지 전송이 끝나자마자 성공된 것으로 간주한다. 응답 시간은 가장 빠르지만 broker에서 오류가 발생할 경우 메시지가 유실된다.
  • 1: leader를 맡고있는 replica에 메시지가 commit되면 ack처리를 한다.
  • N: N개의 replica에 메시지가 commit되면 ack처리를 한다.
  • -1: 모든 replica에 메시지가 commit되면 ack처리를 한다.

 compression.codec

 none

메시지를 압축할 때 사용할 코덱. none, gzip, snappy 중 하나를 선택할 수 있다. none을 선택하면 메시지를 압축하지 않는다.

 producer.type

 sync

Producer가 메시지를 동기적으로 전송할지 비동기적으로 전송할지에 대한 설정. 동기적으로 전송하려면 sync로 명시하고 비동기적으로 전송하려면 async로 명시한다. 비동기 producer를 사용할 경우 메시지를 일정 시간 큐에 쌓아 두었다가 한 번에 전송하므로 producer의 메시지 처리량을 향상시킬 수 있다. 단, 장애가 발생할 경우 전송하지 않고 쌓아 둔 메시지가 유실될 우려가 있다.

 queue.buffering.max.ms

 5000

비동기 producer를 사용할 경우 몇 ms동안 메시지를 모아둘지 결정하는 값. 비동기 producer queue.buffering.max.ms 값에 도달하거나 batch.num.messages 값에 도달할 경우 쌓아두었던 메시지를 전송한다.

 batch.num.messages

 200

비동기 producer를 사용할 경우 최대 몇 개의 메시지를 모아둘지 결정하는 값. 비동기 producer는 queue.buffering.max.ms 값에 도달하거나 batch.num.messages 값에 도달할 경우 쌓아두었던 메시지를 전송한다.

위에서 언급한 필수 프로퍼티와 중요 프로퍼티 외의 항목들은 Kafka 공식 페이지의 3.3 Producer Configs를 참고하길 바란다.


라운드 로빈 Partitioner 예제

아래는 라운드 로빈 방식으로 동작하는 partitioner를 직접 구현한 예시이다. 코드를 보면 직접 구현한 RoundRobinPartitionerpartitioner.class 프로퍼티로 지정한 것을 확인할 수 있다.

KeyedMessage를 생성할 때 메시지 키를 명시하지 않으면 partitioner.class에 입력된 값을 무시하기 때문에 공백 값을 메시지 키로 전달하여 RoundRobinPartitioner가 사용되도록 하였다.

package com.epicdevs.kafka;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.utils.VerifiableProperties;

public class RoundRobinProducerExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "kafka-test-001.epicdevs.com:9092,kafka-test-002.epicdevs.com:9092,kafka-test-003.epicdevs.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", RoundRobinPartitioner.class.getName());

        ProducerConfig producerConfig = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(producerConfig);

        KeyedMessage<String, String> message = new KeyedMessage<String, String>("test", "", "Hello, World!");  
        producer.send(message);
        producer.close();
    }

    public static class RoundRobinPartitioner implements Partitioner {

        private AtomicInteger n = new AtomicInteger(0);

        public RoundRobinPartitioner(VerifiableProperties props) {
        }

        @Override
        public int partition(Object key, int numPartitions) {
            int i = n.getAndIncrement();
            if (i == Integer.MAX_VALUE) {
                n.set(0);
                return 0;
            }
            return i % numPartitions;
        }
    }
}


Consumer 구현하기

Kafka에서는 consumer 구현에 사용할 수 있는 두 종류의 API를 제공한다. 세부적인 것들은 모두 추상화되어 있어 몇 번의 간단한 함수 호출로 consumer를 구현할 수 있는 High Level Consumer API와 offset과 같은 세부적인 부분까지 다룰 수 있지만 이 때문에 구현하기가 상당히 까다로운 Simple Consumer API가 제공된다(이름은 simple이지만 전혀 simple하지 않다).

이 글에서는 high level consumer API를 사용한 예시만을 다룬다. Simple level consumer API의 구현 예시는 Kafka Wiki의 0.8.0 SimpleConsumer Example을 참조하길 바란다.


High Level Consumer API 예제

package com.epicdevs.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerExample {

    private static final String TOPIC = "test";
    private static final int NUM_THREADS = 20;

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("group.id", "test-group");
        props.put("zookeeper.connect", "kafka-test-001.epicdevs.com:2181,kafka-test-002.epicdevs.com:2181,kafka-test-003.epicdevs.com:2181");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, NUM_THREADS);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                        System.out.println(new String(messageAndMetadata.message()));
                    }
                }
            });
        }

        Thread.sleep(60000);

        consumer.shutdown();
        executor.shutdown();
    }
}

위의 코드를 실행하면 consumer가 byte[]로된 메시지를 받아서 String으로 변환한 뒤 standard output에 출력한다. ConsumerConfig를 생성할 때 설정할 수 있는 프로퍼티들은 다음 절에서 다룬다.

코드 상에는 consumer가 소비할 메시지의 offset과 관련된 내용은 전혀 존재하지 않는다. Offset 값은 Zookeeper에서 별도로 관리하며, high level consumer는 Zookeeper로부터 자신이 속한 consumer group이 몇 번째 메시지 offset을 소비할 차례인지 전달받은 뒤 해당 offset의 메시지부터 소비하기 시작한다.


필수 프로퍼티

 프로퍼티 

설명 

 group.id

Consumer가 속한 consumer group의 ID. Zookeeper에서는 각 consumer group의 메시지 offset을 관리하는데, 이 때 이 ID가 키로써 사용된다. 따라서 consumer group ID가 같으면 모두 같은 consumer group에 속한 것으로 간주되며 메시지 offset 값 또한 공유된다.

 zookeeper.connect

Zookeeper 인스턴스 리스트. 호스트1:포트1,호스트2:포트2,호스트3:포트3의 형태로 명시한다.
예) kafka-test-001.epicdevs.com:2181,kafka-test-002.epicdevs.com:2181,kafka-test-003.epicdevs.com:2181


중요 프로퍼티

 프로퍼티

 기본 값 

설명 

 auto.commit.enable true

Consumer가 메시지를 전달받은 뒤 자동으로 offset 값을 commit할지 결정하는 플래그. 메시지가 성공적으로 처리되었을 때만 offset이 commit되도록 하려면 이 값을 false로 설정해야 한다. 이 값이 true일 경우 auto.commit.interval.ms 값마다 주기적으로 offset을 commit하며, false일 경우 ConsumerConnectorcommitOffsets 메소드를 직접 호출해야 offset이 commit된다

 auto.commit.interval.ms 60000

auto.commit.enabletrue일 때 offset을 자동으로 commit하는 주기. 이 값을 길게 잡으면 메시지 처리 중에 장애가 발생할 경우 실제로 처리된 메시지 offset과 commit된 offset 간의 격차가 커져서 fail over 후 중복으로 처리되는 메시지의 수가 많아질 가능성이 높으며, 짧게 잡을 경우 잦은 Zookeeper 업데이트로 인한 오버헤드가 발생할 수 있다.

 auto.offset.reset largest

Consumer가 속한 consumer group의 offset 값이 존재하지 않거나 범위를 벗어나는 값을 전달받았을 경우 어떻게 동작할지를 정하는 값.

  • smallest: 가장 작은 offset의 메시지부터 소비한다.
  • largest: 가장 큰 offset의 메시지 이후부터 소비한다. (즉, 새롭게 전송되는 메시지부터 소비한다.)

위에서 언급한 필수 프로퍼티와 중요 프로퍼티 외의 항목들은 Kafka 공식 페이지의 3.2 Consumer Configs를 참고하길 바란다.


pom.xml

아래는 producer와 consumer 예제를 Maven으로 빌드할 때 사용한 pom.xml이다.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.epicdevs</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.5.1</version>
                    <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>


참고자료


신고
Trackback 0 Comment 6
  1. 2015.08.26 09:07

    비밀댓글입니다

  2. 핫독 2015.12.14 10:39 신고

    0.9.0.0 클라이언트 환경에서의 Consumer는 가능한게 있을까요?

  3. tykji 2016.03.23 20:18 신고

    안녕하세요. 질문이 있어 댓글 남깁니다.

    props.put()라는 함수가 프로퍼티를 설정하는 라인인가요? 표에 적혀있는 프로퍼티를 사용하려고 한다면 props.put 안에 적으면 되는 것 인가요?

    그리고 consumer에서 thread sleep을 거는 이유가 있나요?

    감사합니다.

    • epicdev 2016.03.24 11:27 신고

      네. 표에 적혀있는 프로퍼티를 props.put 안에 적으시면 됩니다. Thread sleep을 걸지 않으면 메인스레드에서 consumer.shutdown을 호출하기 때문에 Consumer가 생성되자마자 프로그램이 종료됩니다. 이를 방지하기 위해서 임의로 Consumer가 60초 동안 메시지를 소비하게 한 다음 종료시키기 위해 넣어둔 코드입니다.

  4. tykji 2016.03.28 20:52 신고

    질문이 또 생겨 글 남깁니다.

    코드를 짰는데 import error가 뜹니다.

    class path를 어디로 설정해야 하나요? libs의 jar파일 아닌가요?

    감사합니다.

    • epicdev 2016.03.29 17:42 신고

      죄송합니다만 기초적인 개발환경까지 제가 다 답변해드릴순 없습니다. 구글링 해보시고 스스로 해결해보시길 바랍니다.