Skip to main content
info

This post is also available in Korean and English.

Let’s play Reactive Streams with Armeria vol.2

· 16 min read
Um Ikhun
Um Ikhun
Ikhun is developing for open-source Armeria and Central Dogma at LINE Plus.

こんにちは。LINE Plus でオープンソースソフトウェアの Armeria と Central Dogma を開発している UM IKHUN です。前回の記事では、Reactive Streams の概念を解説しました。今回の記事では、Reactive Streams をオープンソース非同期の HTTP/2、RPC、REST クライアント/サーバーライブラリーである Armeria で使用する方法について紹介したいと思います。

What’s Armeria?

Armeria は、Java 8 および Netty、Thrift、gRPC をベースにしたオープンソース非同期 HTTP/2、RPC、REST クライアント/サーバーライブラリーです。Armeria は、軽量(Lightweight)のマイクロサービスフレームワークですが、サポートする機能は、他のフルスタック(full stack)ウェブフレームワークと比較しても劣りません。

まず、Armeria で Reactive Streams を活用したサーバーを実現するために、基本的に知っておくべきことを説明します。

サポートするプロトコル

Armeria では、HTTP/1 と HTTP/2 を両方ともサポートしており、この 2 つのプロトコルはcleartextと TLS(Transport Layer Security)暗号化通信をすべてサポートしています。HTTP/1 から HTTP/2 への互換性をサポートするためのプロトコルアップグレードについては、HTTP/2 の「connection preface」と HTTP/1 の「upgrade request」を両方ともサポートしています。

また、Armeria では gRPC と Thrift が、HTTP/1 と HTTP/2 の両方で動作します。これは Armeria ならではの特別な機能です。gRPC は HTTP/1 をサポートせず、既存の Thrift では HTTP/2 をサポートしません。しかし、Armeria ではすべてサポートしており、多様なビジネス環境で柔軟に使用できます。また、Linux 環境では、JNI(Java Native Interface)ベースのソケット IO と BoringSSL ベースの TLS により、一層スピーディに本番環境で使用できます。

では、サンプルコードを見ながら Armeria について 1 つずつ説明します。

サンプルコードで見る Armeria

Armeria は、ユーザーに優しい API です。コードが簡潔で使いやすくなっています。「Hello world」サーバーを実行したいときは、以下のように 5 行を作成するたけで済みます。

// Build your own server under 5 lines.
var server = Server.builder()
.http(8080)
.service("/", (ctx, req) -> HttpResponse.of("Hello, World!"))
.build();

server.start();

サーバーを簡単に実行できるというのは、マイクロサービス環境において各ビジネスコンポーネントを分離し、独立したサーバーとして管理するときにポイントになることです。

また、サーバーのアーキテクチャをシンプルにすることができます。HTTPS や HTTP/2 を使用するために別途のサイドカー(sidecar)である Nginx や Apache Httpd のような静的 Web サーバーを実行する必要がありません。前述のように、Linux 環境では JNI ベースのソケット IO と BoringSSL をサポートするため、別途の性能低下を考慮する必要はありません。なお、JS や CSS、画像のような静的ファイルをホストする機能も提供しています(参照)。

var server = Server.builder()
.http(8080)
.https(8443) // HTTPS support
.tlsSelfSigned()
.service("/", (ctx, req) -> HttpResponse.of("Hello, World!"))
.build();
server.start();

ネットワークホップ(hop)を追加せずに済むため、障害点を減らすことができ、リソースを節約できるので効率よく通信できます。さらに、アーキテクチャがシンプルでモニタリングが簡単になり、サーバーを柔軟に拡張できます。

その他にも有効な機能が多くあります。その 1 つが、Armeria でビルトインで提供するアノテーション(annotation)です。アノテーションを利用すると、Armeria の機能をより簡単に使用できます。例えば、以下のように hello をプリフィックス(prefix)として、name を経路変数として設定し、ルーティングするコードを簡単に作成できます(Armeria の公式文書を参照すると、その他に多様なアノテーションをより詳しく見ることができます)。

import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
import com.linecorp.armeria.server.annotation.PathPrefix;

// Use built-in annotations for mapping path and parameter injection
@PathPrefix("/hello")
class HelloService {
@Get("/:name")
public String hello(@Param String name) {
return String.format("Hello, %s!", name);
}
}

Armeria には、他の種類の RPC プロトコルを同時に扱える特別な機能もあります。1 つのサーバーで REST API と gRPC、Thrift をすべて提供できるため、ビジネスの要望やアーキテクチャの変化に柔軟に対応できます。また、単一ポートで提供しているので、リソースを効率よく使用してセキュリティの面で不要な露出を最低限に抑え、管理ポイントを削減できるメリットがあります。[1]

var server = Server.builder()
.http(8080)
.service("/hello/rest", (ctx, req) -> HttpResponse.of("Hello, world!"))
.service("/hello/thrift", THttpService.of(new ThriftHelloService()))
.service("/hello/grpc", GrpcService.builder()
.addService(new GrpcHelloService())
.build())
.build();

エンタープライズソフトウェアの開発者は一般的に、認証やロギングなどをいかに効率よく処理できるかに関心を持って悩むと思います。Armeria では、separation of concernsと呼んでいるこのような部分を別途decoratorで管理できる機能を提供しています。[2]

また、標準で提供されていない decorator が必要であれば、直接簡単に実装し、特定経路やサービスにバインディングして使用することもできます。例えば、下図で実装した AuthService では、request に認証情報が含まれているときのみサービスを呼び出し、そうでなければ「401 unauthorized」エラーを発生させるようにしました。

次に、Armeria で Reactive Streams をどのようにサポートしているかを説明します。

Armeria 内の HTTP/2 ストーリム

ストリーム(stream)は流れている水のように、有機的に繋がり続ける必要があります。ある 1 か所だけ開いていて他のところは詰まっていると、すぐ溢れてしまいます。Armeria では、データの流れを有機的にコントロールするために、サーバー内では Reactive Streams のバックプレッシャーを利用してトラフィックをコントロールします。また、WINDOW_UPDATEを利用したHTTP/2 stream flow controlで、バックプレッシャーが配下のネットワークレイヤーで Armeria のサーバーと皆さんが実装したサービス、そしてデータのリポジトリまで有機的に繋げられるようにしました。

もし皆さんのサービスで使用しているサーバーを、今すぐ Armeria の Reactive サーバーに取り換えることができない事情があるなら、以下のように Armeria を Reactive プロキシサーバーとして活用することもできます(参照)。

// Use Armeria’s async & reactive HTTP/2 client.
var client = HttpClient.of("h2c://backend");
var server = Server.builder()
.http(8080) // Forward all requests reactively
.service("prefix:/", (ctx, req) -> client.execute(req))
.build();

Armeria プロキシサーバーを前に配置すると、皆さんのサーバーを厳しい外部のインターネットから安全に保護できます。

Reactive Streams と Armeria の統合

Armeria で直接 Reactive Streams をサポートするサーバーを作ると、より多様な機能を使用できます。Armeria で Reactive Streams を活用する方法とビルトイン publisher についてより詳しく説明します。

Armeria HTTP レスポンスの Publisher

Armeria のレスポンスは、ヘッダーを表現するHttpHeadersとデータを表現するHttpDataで構成されています。ここでは、RxJava のObservableを Armeria のHttpResponseに取り換える過程を、ステップごとに見てみましょう。

データの準備が終わったら、まず Observable の map 演算子を利用して dataStream を HttpData でラップします。

// 1. Fetch data from Reactive Streams Publisher
Observable<String> dataStream = Observable.just("a", "b", "c", "d", "e");

// 2. Convert string to Armeria HttpData
Observable<HttpData> httpDataStream = dataStream.map(HttpData::ofUtf8);

どのようにレスポンスするかを決めてレスポンスヘッダーを準備した後、concat 演算子を利用して、上記で準備した httpDataStream と合わせると、HttpHeaders から HttpData に繋がる 1 つのストリームが完成します。

// 3. Prepare response headers
ResponseHeaders httpHeaders = ResponseHeaders.of(HttpStatus.OK);

// 4. Concat http header and body stream
Observable<HttpObject> responseStream = Observable.concat(Observable.just(httpHeaders), httpDataStream);

完成したストリームを Observable の toFlowable 関数を利用して Reactive Streams のFlowableに変換した後、最終的に Armeria の HttpResponse でラップすれば終わりです。

// 5. Convert Observable to Armeria Response Stream
HttpResponse response = HttpResponse.of(responseStream.toFlowable(BackpressureStrategy.BUFFER));

Armeria のビルトイン Publisher

上記の過程がやや複雑に、あるいは長くて退屈に感じられたかもしれません。そこで、Armeria では Web でストリームデータを送信する方法の標準であるJSON Text Sequences(RFC 7464)と HTML5 の規格であるServer-sent eventsに対するビルトイン Publisher を提供します。この publisher を利用すると、Reactive Streams をより便利に Web で送信できます。

// Fetch data from Reactive Streams Publisher
Publisher<String> dataStream = Flux.just("a", "b", "c", "d", "e");

// Convert Publisher to JSON Text Sequences with Armeria HttpResponse
// with "application/json-seq" MIME type
HttpResponse httpResponse = JsonTextSequences.fromPublisher(dataStream);

// Convert Publisher to Server-sent Events with Armeria HttpResponse
// with "text/event-stream" MIME type
HttpResponse httpResponse = ServerSentEvents
.fromPublisher(dataStream, SeverSentEvent::ofData);

また、ビルトイン publisher をより簡単に使用できるように RxJava 統合をサポートしています。アノテーションのサービスに@ProducesJsonSequences アノテーションを追加し、Observable をそのまま返すと、Armeria で当該プロトコルで自動的に変換します。

import io.reactivex.Observable;
import com.linecorp.armeria.server.annotation.ProducesJsonSequences;

class RxJavaService {
@Get("/json-streaming")
// Generate JSON Text Sequences
@ProducesJsonSequences
public Observable<String> json() {
// Just return RxJava Observable!
return Observable.just("a", "b", "c");
}
};

以下のように「JsonTextSequences」でエンコードすると、JSON 文字列の最初と最後のところにレコード分離文字(separator)とラインフィード(line feed)が追加されます。また、現在のプロトコルによって HTTP/1 や HTTP/2 に送信される際、違う動作になるべきです。Armeria では現在繋がっているプロトコルに従って、適したタイプのデータを送信します。HTTP/2 の場合は、Data frameの中に JSON データを分けて送り、HTTP/1 の場合はchunked transfer encodingを利用して分けて送ります。

Spring WebFlux の統合

Armeria は、さまざまなライブラリーおよびフレームワークとの連携をサポートします。Reactive Streams を使用するために、すでにSpring WebFluxを使用している場合は、コードを別途修正せずに「armeria-spring-boot-webflux-starter」を依存性に追加(参照)するだけで、Armeria へのマイグレーションを完了できます。このような方法で、WebFlux のネットワークレイヤーであるReactor-Nettyを Armeria の Reactive エンジンに取り換えることができます。

単にエンジンを取り換えるだけなら「ただ WebFlux だけ使用すればいいんじゃないか」という疑問が湧くかもしれません。エンジンを取り換えることに、果たしてどのようなメリットがあるのでしょう。

エンジンを取り換えると、ArmeriaServerConfiguratorにより Spring で Armeria の機能を追加できるため、既存の Spring ではサポートしない Armeria ならではの機能を活用できます。例えば、以下のように REST API で構成される既存の Spring Webflux サーバーに gRPC や Thrift 機能を追加し、従来と同じく単一ポートでサービスできます。

@Configuration
public class ArmeriaConfiguration {
// Configure the server by providing an ArmeriaServerConfigurator bean.
@Bean
public ArmeriaServerConfigurator armeriaServerConfigurator() {
// Customize the server using the given ServerBuilder. For example:
return builder -> {
// Add DocService that enables you to send gRPC and Thrift requests from web browser.
builder.serviceUnder("/docs", new DocService());
// Log every message which the server receives and responds.
builder.decorator(LoggingService.newDecorator());
// Write access log after completing a request.
builder.accessLogWriter(AccessLogWriter.combined(), false);
// You can also bind asynchronous RPC services such as Thrift and gRPC:
builder.service(THttpService.of(…));
builder.service(GrpcService.builder()…build());
};
}
}

なお、他にも前述で説明したように decorator 機能を活用し、さらに機能が豊富なサービスを構築できます。

gRPC のストリームサポート

gRPC は、StreamObserverを利用してストリームをサポートします(参照)。Reactive Streams で発行(publish)されるデータをStreamObserverで簡単に送信できます。サンプルで見てみます。

まず以下のように protobuf を利用してデータをどのようにやり取りするか、インターフェースを定義します。

syntax = "proto3";
package users;
option java_package = "users";

service UserService {
// Returns all user information
rpc getAllUsers(UserRequest) returns (stream User) {}

// Push to stream of users
rpc pushToUsers(stream User) returns (Result) {}
}

会員全体にメールまたはプッシュメッセージを送信したいときは、まずすべてのユーザー情報を取得する必要があります。もし会員数があまりにも多い場合、データを一度に送信することは難しいかもしれません。そのような場合は、以下のような方法で大量のユーザー情報を返すストリームサーバーを gRPC で構築することができます。

  1. ProjectReactor の Flux を Publisher として使用し、リポジトリからストリームでデータを取得します。
  2. データを、gRPC の StreamObservser に変換して外部へ送信します。
// Implement interfaces generated by gRPC
public final class UserServiceImpl extends UserServiceImplBase {
@Override
public void getAllUsers(UserRequest request, StreamObserver<User> responseObserver) {
final Flux<User> userPublisher = userRepo.findAll();
publisher.subscribe(responseObserver::onNext,
responseObserver::onError,
responseObserver::onCompleted);
}
}

StreamObserver にも Reactive Streams と同様に onNext、onError、onCompleted 関数が存在するため、それぞれの API に委譲するだけで終わります。

今度は受信する側で、すべてのユーザー情報をストリームで受けることを考えてみましょう。ストリームで受けるには、Processorを活用します。Processor は別途の API を持っておらず、Subscriber と Publisher、この 2 つのインターフェースを継承しているだけです。Processor は、データを購読し、購読したデータを再度発行する際に役に立ちます。StreamObserver の onNext で入ってきた新しいユーザー情報を Processor の onNext 関数で送信でき、それを再度購読して必要な作業を追加で行うことができます。

@Override
public StreamObserver<User> pushToUsers(StreamObserver<Result> responseObserver) {
Processor<User, User> processor = EmitterProcessor.create();
Publisher<User> publisher = processor;
Subscriber<User> subscriber = processor;
// Push one-by-one by subscribing publisher
...

return new StreamObserver<User>() {
// subscribe user data
@Override public void onNext(User user) { processor.onNext(user); }
@Override public void onError(Throwable throwable) { processor.onError(throwable); }
@Override public void onCompleted() {
responseObserver.onNext(Result.newBuilder().setStatus(200).build());
responseObserver.onCompleted();
}
};
}

作成した gRPC コードを Armeria で実行してみます。

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.grpc.GrpcService;

// Add your grpc service to Armeria GrpcService
var grpcService = GrpcService.builder()
.addService(new UserServiceImpl())
.build();

var server = Server.builder()
.http(8080)
.serviceUnder("/grpc", grpcService)
.serviceUnder("/docs", new DocService())
.build();

このようにして作ったストリームサーバーを利用し、小さいメモリーでも大量のデータをストリームで処理することができます。

マイクロサービスのための Armeria の機能

Armeria により Reactive Streams を活用すると、大量のデータとトラフィックを柔軟に処理できます。また、RPC でマイクロサービス間の通信も簡単に処理できます。

他にも以下のように、マイクロサービスに必要なさまざまな機能を提供しています。

  • クラウド環境でサーバーの位置を把握するため、Kubernetes タイプの DNS(Domain Name System)と ZooKeeper を活用したサービス検索(discovery)を提供(参照)しています。
  • 検索されたサービスは、クライアント側のロードバランシングで直接負荷を分散してサーバーと通信します。それにより障害点を減らすことができます。
  • L7ではないクライアントが、直接サーバーの状態をチェックできます。
  • ログを確認する際、分散されたサーバーにアクセスせずに確認できるように、Kafka でアクセスログを送信します。
  • Micrometer を活用して必要な数値を設定・収集し、Prometheus や Netflix の Atlas のようなアプリケーションモニタリングツールで送信できます。

おわりに

2 回に渡って Reactive Streams と Armeria について紹介しました。一緒に見てみたように Armeria を利用すると、Reactive Streams と高性能、非同期、RPC、HTTP/2 をサポートするうえ障害に柔軟な、より安全なサーバーを構築できます。

Armeria では、記事で触れた機能以外にもさらに多くの機能を提供しています。それに関しては、Armeria の公式ホームページと以下のリンクをご参照ください。


Footnotes

  1. gRPC サーバーを Armeria で起動する方法Thrift サーバーを Armeria で起動する方法をご参照ください。

  2. Armeria で標準提供するさまざまな decorator は、公式文書と以下の Javadoc でご確認できます。

Like Armeria?
Star us ⭐️

×