Skip to main content

Create Producer/Consumer Application for Kafka

· 6 min read

ここではSetup Apache Kafka - Confluent ver. (JPver.)で作成したKafka clusterに対しPub/Subを行うProducer/Consumer applicationを作成する。簡単のためまずはLocal上でAppを開発し、その後Kafka Cluster構築時に作成したAMIをベースにEC2を起動し、EC2上で対象のAppを稼働させる。具体的には以下の内容を実施する。

  1. Initial setup
  2. Running Producer app
  3. Running Consumer app

なお使用したMaven ver.は以下 (Mavenのinstallは省略する。Maven – Installing Apache Mavenを参照)。

# Mac$ mvn -versionApache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)Maven home: /opt/apache-maven-3.6.3Java version: 1.8.0_231, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_231.jdk/Contents/Home/jreDefault locale: en_IE, platform encoding: UTF-8OS name: "mac os x", version: "10.14.6", arch: "x86_64", family: "mac"
# EC2$ mvm -version

1. Initial setup#

今回はProducer/Consumerとも同じApp directoryを使用するため、共通のpackage設定を行っておく。

$ mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes \-DarchetypeArtifactId=maven-archetype-simple -DgroupId=com.tomtongue.sample.kafkaapp \-DartifactId=kafkaSampleApp \-DinteractiveMode=false[INFO] Scanning for projects...Downloading from central: https://repo.maven.apache.org/maven2/org/apache/maven/plugins/maven-clean-plugin/2.5/maven-clean-plugin-2.5.pom...[INFO][INFO] ------------------< org.apache.maven:standalone-pom >-------------------[INFO] Building Maven Stub Project (No POM) 1[INFO] --------------------------------[ pom ]---------------------------------[INFO][INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>[INFO][INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<[INFO][INFO][INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---...[INFO] ----------------------------------------------------------------------------[INFO] Using following parameters for creating project from Archetype: maven-archetype-simple:1.4[INFO] ----------------------------------------------------------------------------[INFO] Parameter: groupId, Value: com.tomtongue.sample.kafkaapp[INFO] Parameter: artifactId, Value: kafkaSampleApp[INFO] Parameter: version, Value: 1.0-SNAPSHOT[INFO] Parameter: package, Value: com.tomtongue.sample.kafkaapp[INFO] Parameter: packageInPathFormat, Value: com/tomtongue/sample/kafkaapp[INFO] Parameter: version, Value: 1.0-SNAPSHOT[INFO] Parameter: package, Value: com.tomtongue.sample.kafkaapp[INFO] Parameter: groupId, Value: com.tomtongue.sample.kafkaapp[INFO] Parameter: artifactId, Value: kafkaSampleApp[INFO] Project created from Archetype in dir: /<YOUR_DIR>[INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------[INFO] Total time:  02:43 min[INFO] Finished at: 2020-02-02T17:08:09Z[INFO] ------------------------------------------------------------------------
$ tree kafkaSampleAppkafkaSampleApp├── pom.xml└── src    ├── main    │   └── java    │       └── com    │           └── tomtongue    │               └── sample    │                   └── kafkaapp    │                       └── App.java    ├── site    │   └── site.xml    └── test        └── java            └── com                └── tomtongue                    └── sample                        └── kafkaapp                            └── AppTest.java

Using InterlliJ IDEA#

InterlliJでもproject作成できる。ここではInterlliJ IDEA 2019.3.2 (Community Edition)を使用している。

以下のようなproject dirが作成される。

kafkaSampleAppIntellij├── kafkaSampleAppIntellij.iml├── pom.xml└── src    ├── main    │   ├── java    │   └── resources    └── test        └── java
6 directories, 2 files

今回はLocal上ではmvn commandで作成したpackageをimportしてProducer/Consumer appを作成する。

Kafka clusterをSetup Apache Kafka - Confluent ver. (JPver.)においてConfluent ver.で構築したので、今回もConfluet ver.のKafka Clientを使用する。Kafka Clientとしては、Kafka Clients — Confluent Platformに記載されているものがあるが、今回はJava clientを使用する。Kafka Java Client — Confluent Platformを参考に先ほど作成したprojectにおけるpom.xmlを以下のように記載する。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>...  <repositories>    <repository>      <id>confluent</id>      <url>https://packages.confluent.io/maven/</url>    </repository>  </repositories>...  <dependencies>    ...    <dependency>      <groupId>org.apache.kafka</groupId>      <artifactId>kafka_2.12</artifactId>      <version>5.4.0-ccs</version>    </dependency>  </dependencies>...<!-- (Option) For Fat Jar -->  <build>    <plugins>      <plugin>        <groupId>org.apache.maven.plugins</groupId>         <artifactId>maven-assembly-plugin</artifactId> <!-- Ref: https://maven.apache.org/plugins/maven-jar-plugin/ -->        <version>3.2.0</version>        <configuration>          <descriptorRefs>            <descriptorRef>jar-with-dependencies</descriptorRef>          </descriptorRefs>        </configuration>        <executions>          <execution>            <phase>package</phase>            <goals>              <goal>single</goal>            </goals>          </execution>        </executions>      </plugin>    </plugins>...  </build>

正常に実行できるか念の為この時点で試しておく。App.javaを削除 (または変更)し、./src/main/java配下にProducerSample.javaというソースを作成する。

package com.tomtongue.sample.kafkaapp;
import org.apache.kafka.clients.producer.*;import java.util.Properties;
public class ProducerSample {    public static void main(String[] args) {        System.out.println("Hello Kafka");    }}

以下のようにHello Kafkaが出力されるか確認する。

2. Running Producer app#

2-1. Development App on Local#

ここではProducerのcodeを作成する。以下では単純に100回ほどKafkaに対しPublishするprogramである。先ほど作成したproduverSample.javaを以下のように記載する。<HOSTNAME_NUM>にはserver.propertiesで指定したhostnameを指定する必要があるが、今回はprivateDNSを指定したため、実際にpublishする場合にはKafka cluster側のhostnameを書き換えて起動し直す必要がある (多分この変更が一番楽なはず...?)。

package com.tomtongue.sample.kafkaapp;
import org.apache.kafka.clients.producer.*;import java.util.Properties;
public class ProducerSample {    public static void main(String[] args) {        Properties conf = new Properties();        conf.setProperty("bootstrap.servers", "<HOSTNAME_1>:9092, <HOSTNAME_2>:9092, <HOSTNAME_3>:9092"); // !Specify!        conf.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); // For using Integer as a key        conf.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // For using String as a value
        String topicName = "test-topic";        Producer<Integer, String> producer = new KafkaProducer<>(conf);
        System.out.println("Start to send messages...\n");
        for(int i = 0; i < 100; i++) {            String value = String.format("Message from Producer: #%s", i);            producerSend(producer, topicName, i, value);        }
        producer.close(); // Finish the producer    }
    public static void producerSend(Producer<Integer, String> producer, String topic, int key, String value) {        // Create payload        ProducerRecord<Integer, String> payload = new ProducerRecord<>(topic, key, value);
        // Send message        producer.send(payload, new Callback() {            @Override            public void onCompletion(RecordMetadata recordMetadata, Exception e) {                if (recordMetadata != null) { // If success to send                    String infoOutput = String.format("[INFO] Success: Partition: %s, Offset: %s", recordMetadata.partition(), recordMetadata.offset());                    System.out.println(infoOutput);                } else { // If fail to send                    String errorOutput = String.format("[ERROR] Failed to send message: %s", e.getMessage());                    System.out.println(errorOutput);
                }            }        });    }}

なお上記コードにおいて、K/Vを送信するにあたり、Kafka製のSerializerが必要となることに注意する (Consumer側はDeserが必要になることも注意する)。

ここでは一応Buildに成功することと、接続できている状況を確認する。正しく繋げていないと以下のようなエラーが発生する。

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)    at com.tomtongue.sample.kafkaapp.ProducerSample.main(ProducerSample.java:14)Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)    ... 2 more
Process finished with exit code 1

2-2. Runnning App on EC2#

先ほどのCodeをBuildし、JARファイルを作成する。Package your application in a JAR - Help | IntelliJ IDEAに具体的な方法が載っているので、詳細手順はこちらを参照 (なおMaven経由でBuildする場合はProject root dirにて、mvn packag (-DskipTests)を実行すれば良い)。

Buildしたのちに、作成されたJARファイルをAMIから作成したEC2インスタンスに移動させ、対象のJARファイルをこれから実行しProducerのテストを行う。事前に以下のようにConsumerを起動しておく。

[ec2-user@ip-172-31-11-62 ~]$ kafka-console-consumer --bootstrap-server ip-172-31-14-76.ec2.internal:9092, ip-172-31-11-62.ec2.internal:9092, ip-172-31-6-79.ec2.internal:9092 --topic test-topic

Consumer側の準備が整ったらProducerからPublishしてみる。

[ec2-user@ip-172-31-13-42 ~]$ java -cp ~/kafkaSampleApp.jar com.tomtongue.sample.kafkaapp.ProducerSampleSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.Start to send messages...
[INFO] Success: Partition: 1, Offset: 1[INFO] Success: Partition: 1, Offset: 2[INFO] Success: Partition: 1, Offset: 3[INFO] Success: Partition: 1, Offset: 4...

Consumer側で以下のようにmessageを取得できることも確認する。

[ec2-user@ip-172-31-11-62 ~]$ kafka-console-consumer --bootstrap-server ip-172-31-14-76.ec2.internal:9092, ip-172-31-11-62.ec2.internal:9092, ip-172-31-6-79.ec2.internal:9092 --topic test-topicMessage sent from Producer: #0Message sent from Producer: #3Message sent from Producer: #4Message sent from Producer: #9Message sent from Producer: #10...

3. Running Consumer app#

Producer側で問題なく実行できたあとは、同様に以下のcodeで、Consumer Appも作成、BuildしてJARファイルを作成し、EC2上で実行する。

package com.tomtongue.sample.kafkaapp;
import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class ConsumerSample {    public static void main(String[] args) {        Properties conf = new Properties();        conf.setProperty("bootstrap.servers", "<HOSTNAME_1>:9092, <HOSTNAME_2>:9092, <HOSTNAME_3>:9092"); // !Specify!        conf.setProperty("group.id", "ConsumerSampleGroup");        conf.setProperty("enable.auto.commit", "false");        conf.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); // For using Integer as a key        conf.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // For using String as a value
        String topicName = "test-topic";        Consumer<Integer, String> consumer = new KafkaConsumer<>(conf);
        // Set topic array        List<String> topics = new ArrayList<>(1);        topics.add(topicName);        consumer.subscribe(topics);

        for(int i = 0; i < 200; i++) {            ConsumerRecords<Integer, String> records = consumer.poll(1); // Deprecated            for(ConsumerRecord<Integer, String> record: records) {                printlnPayload(record);                offsetCommit(record, consumer);            }            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }
        }
    }
    // Output subscribed payload    public static void printlnPayload(ConsumerRecord<Integer, String> record) {        String payload = String.format("Received (%s, %s)", record.key(), record.value());        System.out.println(payload);    }
    // Commit offset    public static void offsetCommit(ConsumerRecord<Integer, String> record, Consumer<Integer, String> consumer) {        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);        Map<TopicPartition, OffsetAndMetadata> commit = Collections.singletonMap(topicPartition, offsetAndMetadata);        consumer.commitSync(commit);    }}

Producer同様にJARファイルを作成したら、以下のようにEC2上で実行する。また今回は簡単のため同じEC2 instanceから実行する。

Producer

[ec2-user@ip-172-31-13-42 ~]$ java -cp ~/kafkaSampleApp.jar com.tomtongue.sample.kafkaapp.ProducerSampleSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.Start to send messages...
[INFO] Success: Partition: 1, Offset: 63[INFO] Success: Partition: 1, Offset: 64[INFO] Success: Partition: 1, Offset: 65[INFO] Success: Partition: 1, Offset: 66[INFO] Success: Partition: 1, Offset: 67[INFO] Success: Partition: 1, Offset: 68...

Consumer

[ec2-user@ip-172-31-13-42 ~]$ java -cp ~/kafkaSampleApp.jar com.tomtongue.sample.kafkaapp.ConsumerSampleSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received (2, Message sent from Producer: #2)Received (5, Message sent from Producer: #5)Received (6, Message sent from Producer: #6)Received (12, Message sent from Producer: #12)Received (16, Message sent from Producer: #16)Received (18, Message sent from Producer: #18)...