Kafka StreamsをSpring Bootで(その2

これの続き。ちょーっとずつ進んでるのかなぁ。

bufferings.hatenablog.com

こうなった

f:id:bufferings:20170610205644p:plain

日本語のsampleストリーム

今度は日本語のsampleストリームを取ってくるようにしてみた。

  @Override
  public void run(String... args) throws Exception {
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusAdapter() {
      @Override
      public void onStatus(Status status) {
        template.send("TweetTexts", status.getText());
      }
    });
    twitterStream.sample("ja");
  }

https://github.com/bufferings/kafka-streams-demo/blob/v0.1/src/main/java/com/example/demo/TwitterToKafka.java

ので、kuromojiで分解。

  @Bean
  public KStream<String, String> kStream(KStreamBuilder kStreamBuilder) {
    KStream<String, String> stream = kStreamBuilder.stream("TweetTexts");
    // @formatter:off
    stream
      .flatMapValues(value -> tokenize(value))
      .groupBy((key, word) -> word)
      .count("TweetTextWords");
    // @formatter:on
    return stream;
  }

  private static final Tokenizer tokenizer = new Tokenizer();

  private List<String> tokenize(String value) {
    // @formatter:off
    return tokenizer.tokenize(value).stream()
      // 辞書にあって名詞で2文字以上のにしてみた
      .filter(token -> token.isKnown() && token.getPartOfSpeechLevel1().equals("名詞") && token.getBaseForm().length() >= 2)
      .map(token -> token.getBaseForm())
      .collect(Collectors.toList());
    // @formatter:on
  }

https://github.com/bufferings/kafka-streams-demo/blob/v0.1/src/main/java/com/example/demo/KStreamConfig.java

全部返しても見づらいから

強引に上位100件だけ返すようにしてみた。本当に作るときはこんなことしないけどね。

  @GetMapping
  public List<KeyValue<String, Long>> query() {
    final ReadOnlyKeyValueStore<String, Long> store = kStreamBuilder.getKafkaStreams().store("TweetTextWords",
        QueryableStoreTypes.keyValueStore());

    final List<KeyValue<String, Long>> results = new ArrayList<>();
    final KeyValueIterator<String, Long> range = store.all();
    while (range.hasNext()) {
      results.add(range.next());
    }

    // 強引に上位100件をとってみた
    Collections.sort(results, (r1, r2) -> {
      return r2.value.intValue() - r1.value.intValue();
    });
    return results.subList(0, Math.min(results.size(), 100));
  }

https://github.com/bufferings/kafka-streams-demo/blob/v0.1/src/main/java/com/example/demo/StateStoreViewer.java

ま、今回も面白かった。次はAvro試してみたいなー。