Schema Registryについて

昨日の続き。

bufferings.hatenablog.com

Avroを使ってメッセージをシリアライズ・デシリアライズするのに、スキーマを保存しておいてくれる場所があると便利だよなーってことで、Schema Registryが(僕の中に)登場。

Schema Registry?

Schema Registry — Confluent Platform 3.2.2 documentation

Avroのスキーマを保存したり取得したりするためのRESTfulなインターフェイスを持ってる。スキーマのバージョンを保存してるので、スキーマエボリューション的な用途で使えるっぽい。僕は、まだスキーマエボリューションについては勉強してないので、「ふーんバージョンがあるんだね」ってくらい。

Confluent Platform?

Kafkaを作ったチームがConfluentって会社を立ち上げて、Kafkaを真ん中に置いたConfluent Platformっていうプラットフォームを提供してるみたい。Kafkaを中心として便利なツールを色々提供してくれてて、Open Source版とEnterprise版がある。

f:id:bufferings:20170626175453p:plain

Confluent: Apache Kafka & Streaming Platform for the Enterprise より

んで、Schema RegistryはそのOpen Source側にも含まれてる。

f:id:bufferings:20170626175925p:plain

Download Confluent Open Source & Confluent Enterprise より

ソースはこちら。Apache License 2.0ね。

GitHub - confluentinc/schema-registry: Schema registry for Kafka

Schema Registryを動かしてみる

ということなので、Confluent Platformの中に含まれてそうなんだけど、僕はもうKafkaは別で動かしてたから、今回はConfluentが用意してくれてるDocker Imageを使うことにした。

この辺読みながら → Quickstart — Confluent Platform 3.2.2 documentation

こいつを使う → https://hub.docker.com/r/confluentinc/cp-schema-registry/

僕は発表のデモ用にDocker Composeを使ってローカルで色々立ち上げたんだけど(あ、うまく動いてなかったけど)その中からSchema Registryの部分だけを抜き出すとこんな感じ。

  schema-registry:
    image: "confluentinc/cp-schema-registry:3.2.1"
    network_mode: "host"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "localhost:2181"
      SCHEMA_REGISTRY_HOST_NAME: "localhost"
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: "*"
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: "GET,POST,PUT,OPTIONS"
    depends_on:
      - "kafka"

内容的には

  • ホストネットワークモードで動かして、ホスト側のlocalhost:8081で動く
  • SCHEMA_REGISTRY_ってプレフィックスの環境変数で、Schema Registryに渡す設定を指定できる
  • KAFKASTORE_CONNECTION_URLで、Zookeeperの場所を指定
  • その環境変数ACCESS_CONTROL_ALLOW_ORIGINとMETHODSのところはなくても大丈夫。Schema ResgistryをWebUIで見られるようにしてくれるツールを使うために設定してる

ちな、docker-compose.yml全体はこちら。

https://github.com/bufferings/kanjava-party-2017/blob/master/environment/infra/docker-compose.yml

動作確認

# "Kafka-key"ってSubjectで登録
~ $  curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
>     --data '{"schema": "{\"type\": \"string\"}"}' \
>     http://localhost:8081/subjects/Kafka-key/versions
{"id":6}
~ $

# 登録されてる全部のSubjectを取得(デモで使ったので他のも登録されてる)
~ $ curl -X GET http://localhost:8081/subjects
["InventoryRecoveryRequestedEvent","Kafka-key","Tweet","InventoryReservationRequestedEvent","TweetWordTop5","DifferentSchema"]

動いてる!

Schema Registry UIを動かしてみる

Schema RegistryをRESTで使うんでいいんだけど、WebUIあると便利だなーって思ってうろうろしてたらLandoopってところがSchema Registry UIを出してた。Kafka周りの色んなツールを提供してるみたいね。

Landoop | Kafka Web Tools

Schema Registry UIのソースはこち

GitHub - Landoop/schema-registry-ui: Web tool for Avro Schema Registry |

ライセンスはBSLってので、Kafkaのサーバーインスタンスが4台までは無料で使えるみたい。

http://www.landoop.com/bsl/

例によってDockerがあるのでこれで試した。これを使うのにさっきのACCESS_CONTROL_ALLOW_ORIGINとMETHODSが必要だから忘れずにね。

https://hub.docker.com/r/landoop/schema-registry-ui/

  schema-registry-ui:
    image: "landoop/schema-registry-ui:0.9.1"
    environment:
      SCHEMAREGISTRY_URL: "http://localhost:8081"
    ports:
      - "9001:8000"
    depends_on:
      - "schema-registry"

8000番は別のアプリで使いたかったので、こっちはブリッジネットワークモードでで9001をマッピングしといた。

こんな感じでSchema Registryの内容が見られる。

f:id:bufferings:20170626182750p:plain

(∩´∀`)∩ワーイ

スキーマデータはKafkaに保存されてるっぽい

もういっこLandoopのツールで、Topicの中身を見れるツールを使って見てみたら、デフォルトで_schemaって名前のTopicにスキーマ情報が入ってるっぽいね。

f:id:bufferings:20170626184234p:plain

ここに説明が書いてあった。

Design Overview — Confluent Platform 3.2.2 documentation

まとめ

Schema RegistryをDockerで起動して叩いてみて、その後UIを立ち上げて見てみた。って話でした。

今日はここまで。次は、Javaで書いたKafkaクライアントからどうやって使うかってところかな。

Avroについて

昨日、関ジャバで喋ったのしかったよー(∩´∀`)∩

bufferings.hatenablog.com

その発表のために色々と新しいことを学んだので、ちょこちょこ気が向いたときにアウトプットしておこうと思う。数カ月後に全て忘れているであろう自分のために。

てことで、今日はAvro。余談ですが、勉強会に行くと勉強になるけど、勉強会で登壇するともっと勉強になるからおすすめです!

Avroとの出会い

同じ学年にAvroさんという人がいるのは知ってたけど、違うクラスだから喋ることもないかなーって思ってたくらいの距離感(なんの話?)

名前を初めて聞いたのは、去年のSpring Oneに行った時。Spring Cloud StreamでData Microserviceだー!って言葉が押し出されてて。Avroの名前がでてきてたのはSchema Evolutionという文脈の中だったなという印象。「メッセージング系の処理やってると悩むのは、メッセージのスキーマを変更したいときよね。この問題をどうやって扱おう?そこでAvroを使って、スキーマの進化について考えてみよう」って感じだったかな。

でも、僕はSpring Cloud Streamは別に追いかけなくていいかなーと思ってて(好き嫌いとかじゃなくて、単純に優先順位の問題ね)。なので、Avroのことも「ふーん、そんなのあるんだー」くらい。

スキーマがあると良さそう?

登壇駆動ってことで、タイトルを先に決めてから「よし!Kafkaを勉強するぞー!」って触りはじめて。

といっても、実は去年の秋のJJUG CCCで発表したときも少しKafkaを触ってたんだけど、その時はドメインイベントのオブジェクトをJSONシリアライズ・デシリアライズしてた。

今回も同じようにしたらいいかなーと思ってたんけど、去年から一歩進めて複数のサービスに分割してイベントを扱おうと思った時に「はて、これは、決めごとがあるほうが良さそう?」って思った。

同じアプリの中でメッセージのやり取りをするんだったら、そのクラスをやり取りすればいいからJSONでもいいかなーって思ってたんだけど。サービスごとに複数のチームに分かれて作業するとかを考えるとインターフェイスみたいなものを決める方が良さそうだなーって思った。「ここ、数字だと思ってたら文字列も入ってくるのか!」とか「nullの場合あるの?初期値とかは?」とかならないように。しかもプログラミング言語に依存しない形で。

だから、スキーマがある方が良さそう。ということで、Avroさんに会いに行くことにした。

Avroとは?

公式サイト: Welcome to Apache Avro!

今ぐぐってみたんだけど、日本語だとOracleのサイトの説明が分かりやすい 第7章 Avroスキーマ Oracleで使用するケースの話みたいだけどね。

スキーマ定義に従って、データをコンパクトなバイナリ形式でシリアライズ/デシリアライズする。という感じなのかな。

Javaのサンプル

Apache Avro™ 1.8.2 Getting Started (Java)

スキーマ定義ファイルを元にJavaのクラスの生成もできるみたいだけど、好みの問題で、僕はそれは使わずに、GenericRecordを使うことにした。Mapみたいなものね。

だから、上記のドキュメントの「Serializing and deserializing without code generation」の方を見てた。

シリアライズ

例えば OrderItemCreatedEvent.avsc って名前のファイルを用意して

{
  "name": "OrderItemCreatedEvent",
  "type": "record",
  "fields": [
    {"name": "orderGroupId", "type": "string"},
    {"name": "orderItemId", "type": "string"},
    {"name": "orderGuestId", "type": "int"},
    {"name": "orderGuestName", "type": "string"},
    {"name": "productId", "type": "string"},
    {"name": "quantity", "type": "int"},
    {"name": "orderedOn",  "type": "string"}
  ]
}

こんなコードを書くと

    Schema schema = new Schema.Parser().parse(new File("OrderItemCreatedEvent.avsc"));

    GenericRecord record = new GenericData.Record(schema);
    record.put("orderGroupId", "b78cbc2c-ee1b-4c03-9b8c-70abf121b0d1");
    record.put("orderItemId", "7e570f08-65bd-4b1f-be75-3d977d818023");
    record.put("orderGuestId", 123456);
    record.put("orderGuestName", "bufferings");
    record.put("productId", "1d8bcd93-2b9c-4225-a525-8f750d4c444c");
    record.put("quantity", 4);
    record.put("orderedOn", "2017-06-24T06:57:14.090");

    File outputFile = new File("data.avro");
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
      dataFileWriter.create(schema, outputFile);
      dataFileWriter.append(record);
    }

こうなった

f:id:bufferings:20170625133757p:plain

おぉ。スキーマも一緒に出力されるのか。されないと思ってた。そっか、これでもJSONと比べていいのは、複数レコードを保存するときにスキーマを最初に一回だけ書くところなのかな。

シリアライズ

公式サイトに書いてある通りにこんな感じ。インスタンスを再利用してるの面白いね。大量レコード対応って感じすね。僕は1レコードのシリアライズにしか今は興味がないけど。

    Schema schema = new Schema.Parser().parse(new File("OrderItemCreatedEvent.avsc"));
    File dataFile = new File("data.avro");

    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    GenericRecord record = null;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(dataFile, datumReader)) {
      while (dataFileReader.hasNext()) {
        record = dataFileReader.next(record);
        System.out.println(record);
      }
    }

出力はこんな感じになる。

{"orderGroupId": "b78cbc2c-ee1b-4c03-9b8c-70abf121b0d1", "orderItemId": "7e570f08-65bd-4b1f-be75-3d977d818023", "orderGuestId": 123456, "orderGuestName": "bufferings", "productId": "1d8bcd93-2b9c-4225-a525-8f750d4c444c", "quantity": 4, "orderedOn": "2017-06-24T06:57:14.090"}

んでも、スキーマ情報がデータファイルに入ってるのに、デシリアライズのときにもスキーマファイルが必要なのは不思議だ。合ってるかどうかチェックでもするのかな?

ま、気にしない。

Kafkaの場合

んで、Kafkaについて考えてみると、↑の方式でみたいに毎回スキーマ情報を書き込むのってなんかいけてない感じがする。それより、どっかにスキーマ情報を登録しといて、それを使ったほうが良くね?って思った。ら、ちゃんとあった。それがSchema Registry。

Schema Registry — Confluent Platform 3.2.2 documentation

今日はこんなとこかな。次はSchema RegistryとKafkaについて書こうかな。

広告を非表示にする

#KanJavaParty で喋ったよ(ヽ´ω`)

kanjava.connpass.com

f:id:bufferings:20170624165221p:plain

直前までコード触ってた。ローカルでデモを見てもらうけど、実際のアプリも触ってもらえたらいいなと思ってAzureでも動かした。

んで、ほんとにギリギリでAzure上で動き始めて、そしてなぜかローカルが動かなくなった。Azureでも動いてなかったら何も見せられかったから危なかった。なんで動いてないのか気になるけど、今はもう気力がない。


資料

これで喋ったよ。僕の中では関ジャバは、ゆるくみんなで話をする場所だから。こういうのがいいかなーと思って。

デモアプリ

Azureで動かしてる。今日の夜には落とそうかな。


ソースコード

あとは、しゃべりながら見せたやつの、紹介ね。

Event Sourcing

docs.microsoft.com

いつもの人

martinfowler.com

Kafka Streams

Apache Kafka

Confluentのサイトがサンプルとかいっぱいあっていい感じ。

Kafka Streams — Confluent Platform 3.2.2 documentation

Kafka Streams & Event Sourcing

これくらいしか見つからないかな。

Event sourcing, CQRS, stream processing and Apache Kafka: What’s the connection? - Confluent

Azure Container Service: DC/OS

Running DC/OS on Azure

無事(?)終わった!

みんながあたたかく見守ってくれたおかげで無事終わりましたー。これをきっかけに、色々と情報交換ができたら嬉しいな。Event Sourcingとか。

解放感

Using multiple Avro schema in one Kafka Topic

I tried Kafka with Avro on Spring Boot in the previous blog post(Japanese).

bufferings.hatenablog.com

Then this time, I tried to use multiple Avro schema in one topic.

Schema

"tweet.avsc" is the schema file which I used in the previous post.

{
  "name": "Tweet",
  "type": "record",
  "fields": [
    {"name": "id",  "type": "long"},
    {"name": "text", "type": "string"}
  ]
}

Then, I added another schema file "different-schema.avsc"

{
  "name": "DifferentSchema",
  "type": "record",
  "fields": [
    {"name": "id2",  "type": "long"},
    {"name": "createdAt", "type": "string"},
    {"name": "text2", "type": "string"}
  ]
}

(Please note that I don't think about Schema Evolution yet, because it's still early for me to think about it.)

Modify Serializer

I found that KafkaAvroSerializer uses the topic name for the schema registration key. Because I want to try multiple schema in one topic, I created KafkaAvroSerializerWithSchemaName to use the schema fullname for registering schema.

https://github.com/bufferings/kafka-streams-demo/blob/v0.3/src/main/java/com/example/demo/KafkaAvroSerializerWithSchemaName.java#L36

  @Override
  public byte[] serialize(String topic, Object record) {
    return serializeImpl(getSchema(record).getFullName(), record);
  }

In addition, I changed Serde for KStream to use KafkaAvroSerializerWithSchemaName for serialization.

https://github.com/bufferings/kafka-streams-demo/blob/v0.3/src/main/java/com/example/demo/GenericAvroSerializerWithSchemaName.java

Finally, I have multiple schemas in one topic :)

f:id:bufferings:20170615195802p:plain

Addition

This time I tried "schema-registry-ui" and "kafka-topics-ui" which are pretty nice web UI provided by Landoop( http://www.landoop.com/kafka/kafka-tools/ ) under BSL license( http://www.landoop.com/bsl/ ).

Schema Registry UI

To enable CORS accessing from other docker instance, I added "access.control.allow.methods" and "access.control.allow.origin" to the schema registry.

docker run -d --net=host --name=schema-registry \
    -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=localhost:2181 \
    -e SCHEMA_REGISTRY_HOST_NAME=localhost \
    -e SCHEMA_REGISTRY_DEBUG=true \
    -e SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,OPTIONS \
    -e SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN=* \
    confluentinc/cp-schema-registry:3.2.1

Then run schema-regisry-ui.

docker run -d --rm -p 7000:8000 \
    -e "SCHEMAREGISTRY_URL=http://172.17.0.1:8081" \
    landoop/schema-registry-ui

Now I can see schemas like this:

f:id:bufferings:20170615194257p:plain

Kafka Topic UI

To use Kafka Topic UI, I need to run Kafka Rest Proxy.

docker run -d \
  --net=host \
  --name=kafka-rest \
  -e KAFKA_REST_ZOOKEEPER_CONNECT=localhost:2181 \
  -e KAFKA_REST_SCHEMA_REGISTRY_URL=http://localhost:8081 \
  -e KAFKA_REST_HOST_NAME=localhost \
  -e KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN=* \
  -e KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS=GET,POST,PUT,DELETE,OPTIONS \
  confluentinc/cp-kafka-rest:3.2.1

Then run kafka-topics-ui.

docker run --rm -it -p 7001:8000 \
  -e "KAFKA_REST_PROXY_URL=http://172.17.0.1:8082" \
  landoop/kafka-topics-ui

f:id:bufferings:20170615194726p:plain

It's fun!

KafkaとAvroとSchema Registryの自分メモ

これの続き
bufferings.hatenablog.com

Avro

Avroはデータのシリアライズをする何か。去年のSpringOneでちらほら名前を聞いた。Schema Evolution系のセッションだったかな。

Welcome to Apache Avro!

ということで、前回の記事では文字列だけを渡してたけど、今回はTweetのidとtextを渡してみた。

スキーマファイルはこんなの。

{
  "name": "Tweet",
  "type": "record",
  "fields": [
    {"name": "id",  "type": "long"},
    {"name": "text", "type": "string"}
  ]
}

Schema Registry

そのSchemaをSchema Registryを使ってやり取りするような感じ。

GitHub - confluentinc/schema-registry: Schema registry for Kafka

最初はSchema Registryを使わずにSchemaファイルを使おうと思ってたんだけど、それだと受け取る側がどんなスキーマなのかを事前に分かってないといけないので、違うなって思って。結局Registryを使ってみることにした。

できあがったソースはここ

https://github.com/bufferings/kafka-streams-demo/tree/v0.2

Kafkaだけじゃなくて、Schema Registry(port 8081の想定にしてます)も立ち上げてからアプリを起動すると動く。

Dockerイメージあったので使った。

Configuration — Confluent Platform 3.2.1 documentation

docker run -d   --net=host \
   --name=schema-registry \
   -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=localhost:2181 \
   -e SCHEMA_REGISTRY_HOST_NAME=localhost \
   -e SCHEMA_REGISTRY_DEBUG=true \
   confluentinc/cp-schema-registry:3.2.1

でもDocker for Macは非推奨みたいね。(僕はUbuntuで動かしてる)

Introduction — Confluent Platform 3.2.1 documentation

AvroについてもSchema Registryについても

まだ全然理解できてないけど、とりあえず、なんか動いた。

次はベスト10的なAggregateを作ってみたいな。

(つд⊂)オヤスミナサイ

Kafka StreamsをSpring Bootで(その2

これの続き。ちょーっとずつ進んでるのかなぁ。

bufferings.hatenablog.com

こうなった

f:id:bufferings:20170610205644p:plain

日本語のsampleストリーム

今度は日本語のsampleストリームを取ってくるようにしてみた。

  @Override
  public void run(String... args) throws Exception {
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusAdapter() {
      @Override
      public void onStatus(Status status) {
        template.send("TweetTexts", status.getText());
      }
    });
    twitterStream.sample("ja");
  }

https://github.com/bufferings/kafka-streams-demo/blob/v0.1/src/main/java/com/example/demo/TwitterToKafka.java

ので、kuromojiで分解。

  @Bean
  public KStream<String, String> kStream(KStreamBuilder kStreamBuilder) {
    KStream<String, String> stream = kStreamBuilder.stream("TweetTexts");
    // @formatter:off
    stream
      .flatMapValues(value -> tokenize(value))
      .groupBy((key, word) -> word)
      .count("TweetTextWords");
    // @formatter:on
    return stream;
  }

  private static final Tokenizer tokenizer = new Tokenizer();

  private List<String> tokenize(String value) {
    // @formatter:off
    return tokenizer.tokenize(value).stream()
      // 辞書にあって名詞で2文字以上のにしてみた
      .filter(token -> token.isKnown() && token.getPartOfSpeechLevel1().equals("名詞") && token.getBaseForm().length() >= 2)
      .map(token -> token.getBaseForm())
      .collect(Collectors.toList());
    // @formatter:on
  }

https://github.com/bufferings/kafka-streams-demo/blob/v0.1/src/main/java/com/example/demo/KStreamConfig.java

全部返しても見づらいから

強引に上位100件だけ返すようにしてみた。本当に作るときはこんなことしないけどね。

  @GetMapping
  public List<KeyValue<String, Long>> query() {
    final ReadOnlyKeyValueStore<String, Long> store = kStreamBuilder.getKafkaStreams().store("TweetTextWords",
        QueryableStoreTypes.keyValueStore());

    final List<KeyValue<String, Long>> results = new ArrayList<>();
    final KeyValueIterator<String, Long> range = store.all();
    while (range.hasNext()) {
      results.add(range.next());
    }

    // 強引に上位100件をとってみた
    Collections.sort(results, (r1, r2) -> {
      return r2.value.intValue() - r1.value.intValue();
    });
    return results.subList(0, Math.min(results.size(), 100));
  }

https://github.com/bufferings/kafka-streams-demo/blob/v0.1/src/main/java/com/example/demo/StateStoreViewer.java

ま、今回も面白かった。次はAvro試してみたいなー。

Kafka StreamsをSpring Bootで試してみた

この辺とか読みながら→ Introduction — Confluent Platform 3.1.0 documentation

Twitter4Jで適当に英語のツイートをとってきて、Kafkaに流し込む。(キーとかの設定はtwitter4j.propertiesに入れておいた)

@Component
public class TwitterToKafka implements CommandLineRunner {

  @Autowired
  private KafkaTemplate<String, String> template;

  @Override
  public void run(String... args) throws Exception {
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusAdapter() {
      @Override
      public void onStatus(Status status) {
        template.send("myTopic", status.getText());
      }
    });
    twitterStream.filter(filterQuery());
  }

  private FilterQuery filterQuery() {
    // 英語のツイートで"spring"を含んでいるものにしてみた。ちょうど良いくらいの量だったので。
    FilterQuery filter = new FilterQuery();
    filter.language("en");
    filter.track("spring");
    // filter.follow(TwitterFactory.getSingleton().showUser("bufferings").getId());
    return filter;
  }
}

myTopicって名前のTopicに流し込むようにしたので、今度はそれを処理する定義を書く。単語の数を数えてみた。これがWordCountStoreってストアに保存される。

@Configuration
@EnableKafkaStreams
public class KStreamConfig {

  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public StreamsConfig kStreamsConfigs(KafkaProperties kafkaProperties) {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
    // Spring BootのAutoConfigurationに入ってるやつ使えばいいかなと思って
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    // 動作確認用に1秒ごとにコミット
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000");
    return new StreamsConfig(props);
  }

  @Bean
  public KStream<String, String> kStream(KStreamBuilder kStreamBuilder) {
    KStream<String, String> stream = kStreamBuilder.stream(Serdes.String(), Serdes.String(), "myTopic");
    // @formatter:off
    stream
      // 単語に分割して
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      // 単語でGroupBy
      .groupBy((key, word) -> word, Serdes.String(), Serdes.String())
       // カウントを取る(KTable)
      .count("WordCountStore");
    // @formatter:on
    return stream;
  }

}

ついでに、KTableの中身を表示するのを書いてみた。

@RestController
public class StateStoreViewer {

  @Autowired
  private KStreamBuilderFactoryBean kStreamBuilder;

  @GetMapping
  public List<KeyValue<String, Long>> query() {
    final ReadOnlyKeyValueStore<String, Long> store = kStreamBuilder.getKafkaStreams().store("WordCountStore",
        QueryableStoreTypes.keyValueStore());

    final List<KeyValue<String, Long>> results = new ArrayList<>();
    final KeyValueIterator<String, Long> range = store.all();
    while (range.hasNext()) {
      results.add(range.next());
    }

    return results;
  }

}

ら、大量に出てきてしまった。しかも良くわからん感じ。

f:id:bufferings:20170608085740p:plain

でも、面白かったー!