怖くないR2DBC

注意:R2DBCはまだ本番では使わないようにね(2018-11-18現在)

昨日R2DBCを初めて触ってみて「ふーん、いまいちよく分かってないなぁ」と思いながら寝たんだけど。

bufferings.hatenablog.com

朝起きたら「あ、そういうことか」ってなったので、考えの整理。

## 全体概要

f:id:bufferings:20181118081104p:plain

実装で気にするのはConnectionFactoryを生成する部分くらいで、あとはr2dbc-spiを見れば良さそう。

r2dbc-spiは各RDBMSに対して実装しやすいようにシンプルなインターフェースになってるのでこれを直接使うとしんどい。利用者としてはこれを直接触るのではなく、r2dbc-clientみたいに人にとって使いやすい形にラッピングしてくれるやつを使う。

r2dbc-spiはSpring Frameworkとは全然関係なくて、reactive-streamsだけに依存しててPublisherを返す。r2dbc-clientやspring-data-r2dbcはMonoFluxを使ってる。

## 今日の予定

  1. PostgresqlConnectionFactoryを見てみる
  2. r2dbc-spiを見てみる

r2dbc-clientとspring-data-r2dbcはまた今度見る。

## 1. PostgresqlConnectionFactoryを見てみる

https://github.com/r2dbc/r2dbc-postgresql/blob/v1.0.0.M5/src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

PostgresqlConnectionConfigurationを受け取ってインスタンスを生成。

    public PostgresqlConnectionFactory(PostgresqlConnectionConfiguration configuration) {
        this(Mono.defer(() -> {
            Objects.requireNonNull(configuration, "configuration must not be null");

            return ReactorNettyClient.connect(configuration.getHost(), configuration.getPort()).cast(Client.class);
        }), configuration);
    }

    PostgresqlConnectionFactory(Mono<? extends Client> clientFactory, PostgresqlConnectionConfiguration configuration) {
        this.clientFactory = Objects.requireNonNull(clientFactory, "clientFactory must not be null");
        this.configuration = Objects.requireNonNull(configuration, "configuration must not be null");
    }

ClientFactoryMonoになってて、Connectionの生成部分ではMonoを返してる。

    @Override
    public Mono<PostgresqlConnection> create() {
        return this.clientFactory
            .delayUntil(client ->
                StartupMessageFlow
                    .exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase().orElse(null), this.configuration.getUsername())
                    .handle(PostgresqlServerErrorException::handleErrorResponse))
            .map(client -> new PostgresqlConnection(client, new DefaultCodecs(client.getByteBufAllocator()), DefaultPortalNameSupplier.INSTANCE, new IndefiniteStatementCache(client)));
    }

へー。こんな風に書くのか(まだリアクティブ周りよく分かってない。勉強しなきゃ。

あ、そういえば、r2dbc-postgresqlPublisherじゃなくてMonoFlux使ってるのか。ふーん。

### 使い方

使い方はr2dbc-clientのREADMEにこう書いてる。

PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration.builder()
    .host("<host>")
    .database("<database>")
    .username("<username>")
    .password("<password>")
    .build();

R2dbc r2dbc = new R2dbc(new PostgresqlConnectionFactory(configuration));

このR2dbcはr2dbc-client側のクラスなんだけど、ConnectionFactoryを受け取って中で必要に応じてコネクションを生成して処理するんだろうな。

## 2. r2dbc-spiを見てみる

https://github.com/r2dbc/r2dbc-spi/tree/v1.0.0.M5/r2dbc-spi/src/main/java/io/r2dbc/spi

シンプル。だいたいこんな感じか。

f:id:bufferings:20181118091943p:plain

ConnectionStatementPublisherを返してる。

それとこの図には書いてないけどStatementと同じ並びにBatchインターフェースがある。

### Result

Resultには2つのメソッドがあって

    Publisher<Integer> getRowsUpdated();

    <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> f);

getRowsUpdatedは更新された行数をPublisherで返す。

mapRow (とそのメタデータ)を受け取って、自分の好きな型に変換してPublisherで返す。

### 使い方

直接使うとこんな感じになる。r2dbc-postgresqlMonoFluxを使ってるからこんな感じに書けるけど、もし純粋にr2dbc-spiに対して書くならPublisherを使わなきゃだから、もうちょっとごちゃっとしそう?

  private Flux<String> sample() {
    var connectionFactory = getPostgresqlConnectionFactory();
    return connectionFactory.create()
        .flatMapMany(connection ->
            connection.createStatement("SELECT city FROM weather")
                .execute()
                .flatMap(result ->
                    result.map((row, metadata) ->
                        row.get("city", String.class)
                    )
                )
        );
  }

PostgresqlConnectionFactorycreate()で生成したMono<PostgresqlConnection>から結果のFluxを返すためにflatMapManyを使用。

コネクションからステートメントを生成して実行。

その結果のFlux<PostgresqlResult>に対するflatMapRowからStringを生成してFlux<String>返してる。

最初にも書いた通り、実際にはこれを直接使うんじゃなくて、r2dbc-clientのようなラッパーを使う。

ふむふむ。ちょっと理解が進んだ。