ScalaPB integration

ScalaPB is a protocol buffer compiler (protoc) plugin for Scala. It will generate Scala case classes, parsers and serializers for your protocol buffers. ScalaPB also supports a thin wrapper around grpc-java, and provides you with an interface that is based on Scala's standard library Future, while streaming is based on the Observer pattern. If you integrate ScalaPB with Armeria, you can leverage the followings:

  • Using both gRPC server and gRPC client features such as gRPC-over-HTTP/1 and gRPC-Web protocol powered by Armeria
  • Browsing the list of available RPC operations and invoking a service operation via a web console
  • Converting either a Protocol Buffers or JSON to and from an scalapb.GeneratedMessage in annotated service

First, you need the armeria-scalapb_2.12 or armeria-scalapb_2.13 dependency to use the above features:

build.sbt
libraryDependencies += "com.linecorp.armeria" %% "armeria-scalapb" % "1.13.3"

Running a gRPC service

You need to register a ScalaPB gRPC stub to a GrpcService using a GrpcServiceBuilder and add it to the ServerBuilder:

import com.linecorp.armeria.common.scalapb.ScalaPBJsonMarshaller
import com.linecorp.armeria.server.Server
import com.linecorp.armeria.server.docs.DocService
import com.linecorp.armeria.server.grpc.GrpcService

// Creates GrpcService with your gRPC stub generated by ScalaPB.
val grpcService =
      GrpcService
        .builder()
        // Add your ScalaPB gRPC stub using `bindService()`
        .addService(YourServiceGrpc.bindService(
          new YourServiceImpl, ExecutionContext.global))
        // Register `ScalaPBJsonMarshaller` for supporting gRPC JSON format.
        .jsonMarshallerFactory(_ => ScalaPBJsonMarshaller())
        .enableUnframedRequests(true)
        .build()

// Creates Armeria Server for ScalaPB gRPC stub.
Server.builder()
      .http(httpPort)
      .https(httpsPort)
      .service(grpcService)
      // Add DocService for browsing the list of gRPC services and
      // invoking a service operation from a web form.
      // See https://armeria.dev/docs/server-docservice for more information.
      .serviceUnder("/docs", new DocService())
      .build()

Please see gRPC service for more information.

Calling a gRPC service

You can also call a gRPC service using a ScalaPB gRPC client. ScalaPbJsonMarshaller should be registered with GrpcClientOptions.GRPC_JSON_MARSHALLER_FACTORY to support gRPC JSON serialization format.

import com.linecorp.armeria.client.Clients
import com.linecorp.armeria.client.grpc.GrpcClientOptions
import com.linecorp.armeria.common.scalapb.ScalaPbJsonMarshaller

val client =
  Clients.builder("gproto+http://127.0.0.1:8080/")
         // Register 'ScalaPBJsonMarshaller' for enabling gRPC JSON serialization format
         .option(GrpcClientOptions.GRPC_JSON_MARSHALLER_FACTORY
                                  .newValue(_ => ScalaPbJsonMarshaller()))
         .build(classOf[HelloServiceBlockingStub])

val request = HelloRequest("Armerian World")
val reply = helloService.hello(request)
assert(reply.message == "Hello, Armerian World!")

Please see gRPC client for more information.

Supporting ScalaPB in annotated services.

Converting an HTTP request to a ScalaPB's GeneratedMesage

  • A Protocol Buffers is automatically converted to a GeneratedMessage only when the content type is one of followings:
    • application/probuf
    • application/octet-stream
    • application/x-protobuf
    • no media type negotiated
  • A JSON is automatically converted to a GeneratedMessage only when the content type is either application/json or ends with +json.

If you have the following proto file,

syntax = "proto3";
package com.example.testing;
option java_package = "com.example.testing";

message SimpleRequest {
  string payload = 1;
  int32 size = 2;
}

message SimpleResponse {
  string message = 1;
  int32 status = 2;
}

the SimpleRequest generated by ScalaPB could be used for a parameter of your service method.

import com.example.testing.SimpleRequest
import com.linecorp.armeria.server.annotation.{ConsumesJson, ConsumesProtobuf, Post}

class GreetingService {
  // If a content type is not set, Protoco Buffers' parser is used by default.
  @Post("/no-content-type")
  def noContentType(request: SimpleRequest): String = s"Hello, ${request.payload}!"

  // Convert the Protocol Buffers in an HTTP payload into the 'SimpleRequest'
  @Post("/protobuf")
  @ConsumesProtobuf
  def consumeProtobuf(request: SimpleRequest): String = s"Hello, ${request.payload}!"

  // Convert the JSON in an HTTP payload into a 'SimpleRequest'
  @Post("/json")
  @ConsumesJson
  def consumeJson(request: SimpleRequest): String = "Hello, Armeria!"
}

Returning a ScalaPB's GeneratedMesage

A GeneratedMesage can be converted to either Protocol Buffers or JSON.

import com.example.testing.SimpleResponse
import scala.concurrent.Future

class GreetingService {

  // Convert 'SimpleResponse' into Protocol Buffers wire format
  @Get("/protobuf")
  @ProducesProtobuf
  def produceProtobuf: SimpleResponse = SimpleResponse("Hello, Armeria!")

  // Convert 'SimpleResponse' into JSON format
  @Get("/json")
  @ProducesJson
  def produceJson: SimpleResponse = SimpleResponse("Hello, Armeria!")

  // Convert 'SimpleResponse' into Protocol Buffers wire format with Scala Future
  @Get("/protobuf+async")
  @ProducesProtobuf
  def produceProtobufFuture: Future[SimpleResponse] =
    Future { SimpleResponse("Hello, Armeria!") }
}

However, a sequence of GeneratedMesages can be only converted to JSON array because Protocol Buffers wire format is not self-delimiting. The following collection types can be converted to JSON array:

  • scala.List
  • scala.Vector
  • scala.Set
  • Reactive Stream Publisher
  • java.util.List
  • java.util.Set
  • java.util.stream.Stream

Injecting an ExecutionContext to an annotated service method

An ExecutionContext could be automatically injected as a method parameter to execute an asynchronous service on top of an Armeria's event loop or blocking task executor.

import com.linecorp.armeria.server.ServiceRequestContext
import com.linecorp.armeria.server.annoation.Blocking
import scala.concurrent.Future

class MyAsyncService {
  @Post("/async")
  def asyncService(req: SimpleRequest)(implicit ec: ExecutionContext): Future[String] =
    Future {
      // This callback will be executed in an Armeria's event loop
      assert(ServiceRequestContext.current() != null)
      "Hello, Armeria!"
    }

  @Blocking
  @Post("/blocking-task")
  def blockingAsyncService(req: SimpleRequest)(implicit ec: ExecutionContext): Future[String] =
    Future {
      // This callback will be executed in an Armeria's blocking task executor
      assert(ServiceRequestContext.current() != null)
      // Perform a long running task
      "Hello, Armeria!"
    }
  }
}