これの続き。ちょーっとずつ進んでるのかなぁ。
こうなった
日本語のsampleストリーム
今度は日本語のsampleストリームを取ってくるようにしてみた。
@Override public void run(String... args) throws Exception { TwitterStream twitterStream = new TwitterStreamFactory().getInstance(); twitterStream.addListener(new StatusAdapter() { @Override public void onStatus(Status status) { template.send("TweetTexts", status.getText()); } }); twitterStream.sample("ja"); }
ので、kuromojiで分解。
@Bean public KStream<String, String> kStream(KStreamBuilder kStreamBuilder) { KStream<String, String> stream = kStreamBuilder.stream("TweetTexts"); // @formatter:off stream .flatMapValues(value -> tokenize(value)) .groupBy((key, word) -> word) .count("TweetTextWords"); // @formatter:on return stream; } private static final Tokenizer tokenizer = new Tokenizer(); private List<String> tokenize(String value) { // @formatter:off return tokenizer.tokenize(value).stream() // 辞書にあって名詞で2文字以上のにしてみた .filter(token -> token.isKnown() && token.getPartOfSpeechLevel1().equals("名詞") && token.getBaseForm().length() >= 2) .map(token -> token.getBaseForm()) .collect(Collectors.toList()); // @formatter:on }
全部返しても見づらいから
強引に上位100件だけ返すようにしてみた。本当に作るときはこんなことしないけどね。
@GetMapping public List<KeyValue<String, Long>> query() { final ReadOnlyKeyValueStore<String, Long> store = kStreamBuilder.getKafkaStreams().store("TweetTextWords", QueryableStoreTypes.keyValueStore()); final List<KeyValue<String, Long>> results = new ArrayList<>(); final KeyValueIterator<String, Long> range = store.all(); while (range.hasNext()) { results.add(range.next()); } // 強引に上位100件をとってみた Collections.sort(results, (r1, r2) -> { return r2.value.intValue() - r1.value.intValue(); }); return results.subList(0, Math.min(results.size(), 100)); }
ま、今回も面白かった。次はAvro試してみたいなー。