ScalaPB integration
Table of contents
ScalaPB is a protocol buffer compiler (protoc) plugin for Scala. It will generate Scala case classes, parsers and serializers for your protocol buffers. ScalaPB also supports a thin wrapper around grpc-java, and provides you with an interface that is based on Scala's standard library Future, while streaming is based on the Observer pattern. If you integrate ScalaPB with Armeria, you can leverage the followings:
- Using both gRPC server and gRPC client features such as gRPC-over-HTTP/1 and gRPC-Web protocol powered by Armeria
- Browsing the list of available RPC operations and invoking a service operation via a web console
- Converting either a Protocol Buffers or JSON to and from an scalapb.GeneratedMessage in annotated service
First, you need the armeria-scalapb_2.12
or armeria-scalapb_2.13
dependency to use the above features:
libraryDependencies += "com.linecorp.armeria" %% "armeria-scalapb" % "1.31.3"
Running a gRPC service
You need to register a ScalaPB gRPC stub to a GrpcService
using a GrpcServiceBuilder
and add it to the ServerBuilder
:
import com.linecorp.armeria.common.scalapb.ScalaPbJsonMarshaller
import com.linecorp.armeria.server.Server
import com.linecorp.armeria.server.docs.DocService
import com.linecorp.armeria.server.grpc.GrpcService
// Creates GrpcService with your gRPC stub generated by ScalaPB.
val grpcService =
GrpcService
.builder()
// Add your ScalaPB gRPC stub using `bindService()`
.addService(YourServiceGrpc.bindService(
new YourServiceImpl, ExecutionContext.global))
// Register `ScalaPbJsonMarshaller` for supporting gRPC JSON format.
.jsonMarshallerFactory(_ => ScalaPbJsonMarshaller())
.enableUnframedRequests(true)
.build()
// Creates Armeria Server for ScalaPB gRPC stub.
Server.builder()
.http(httpPort)
.https(httpsPort)
.service(grpcService)
// Add DocService for browsing the list of gRPC services and
// invoking a service operation from a web form.
// See https://armeria.dev/docs/server-docservice for more information.
.serviceUnder("/docs", new DocService())
.build()
Please see gRPC service for more information.
Calling a gRPC service
You can also call a gRPC service using a ScalaPB gRPC client.
ScalaPbJsonMarshaller
should be registered with GrpcClientBuilder.jsonMarshallerFactory()
to
support gRPC JSON serialization format.
import com.linecorp.armeria.client.grpc.GrpcClients
import com.linecorp.armeria.common.grpc.GrpcSerializationFormats
import com.linecorp.armeria.common.scalapb.ScalaPbJsonMarshaller
val client =
GrpcClients.builder("http://127.0.0.1:8080/")
.serializationFormat(GrpcSerializationFormats.JSON)
// Register 'ScalaPbJsonMarshaller' for enabling gRPC JSON serialization format
.jsonMarshallerFactory(_ => ScalaPbJsonMarshaller())
.build(classOf[HelloServiceBlockingStub])
val request = HelloRequest("Armerian World")
val reply = helloService.hello(request)
assert(reply.message == "Hello, Armerian World!")
Please see gRPC client for more information.
Supporting ScalaPB in annotated services.
Converting an HTTP request to a ScalaPB's GeneratedMesage
- A Protocol Buffers is automatically converted to a
GeneratedMessage
only when the content type is one of followings:application/probuf
application/octet-stream
application/x-protobuf
- no media type negotiated
- A JSON is automatically converted to a
GeneratedMessage
only when the content type is eitherapplication/json
or ends with+json
.
If you have the following proto file,
syntax = "proto3";
package com.example.testing;
option java_package = "com.example.testing";
message SimpleRequest {
string payload = 1;
int32 size = 2;
}
message SimpleResponse {
string message = 1;
int32 status = 2;
}
the SimpleRequest
generated by ScalaPB could be used for a parameter of your service method.
import com.example.testing.SimpleRequest
import com.linecorp.armeria.server.annotation.{ConsumesJson, ConsumesProtobuf, Post}
class GreetingService {
// If a content type is not set, Protoco Buffers' parser is used by default.
@Post("/no-content-type")
def noContentType(request: SimpleRequest): String = s"Hello, ${request.payload}!"
// Convert the Protocol Buffers in an HTTP payload into the 'SimpleRequest'
@Post("/protobuf")
@ConsumesProtobuf
def consumeProtobuf(request: SimpleRequest): String = s"Hello, ${request.payload}!"
// Convert the JSON in an HTTP payload into a 'SimpleRequest'
@Post("/json")
@ConsumesJson
def consumeJson(request: SimpleRequest): String = "Hello, Armeria!"
}
Returning a ScalaPB's GeneratedMesage
A GeneratedMesage
can be converted to either Protocol Buffers or JSON.
import com.example.testing.SimpleResponse
import scala.concurrent.Future
class GreetingService {
// Convert 'SimpleResponse' into Protocol Buffers wire format
@Get("/protobuf")
@ProducesProtobuf
def produceProtobuf: SimpleResponse = SimpleResponse("Hello, Armeria!")
// Convert 'SimpleResponse' into JSON format
@Get("/json")
@ProducesJson
def produceJson: SimpleResponse = SimpleResponse("Hello, Armeria!")
// Convert 'SimpleResponse' into Protocol Buffers wire format with Scala Future
@Get("/protobuf+async")
@ProducesProtobuf
def produceProtobufFuture: Future[SimpleResponse] =
Future { SimpleResponse("Hello, Armeria!") }
}
However, a sequence of GeneratedMesage
s can be only converted to JSON array because Protocol Buffers
wire format is not self-delimiting.
The following collection types can be converted to JSON array:
scala.List
scala.Vector
scala.Set
- Reactive Stream Publisher
java.util.List
java.util.Set
java.util.stream.Stream
Injecting an ExecutionContext to an annotated service method
An ExecutionContext
could be automatically injected as a method parameter to execute an asynchronous service
on top of an Armeria's event loop or blocking task executor.
import com.linecorp.armeria.server.ServiceRequestContext
import com.linecorp.armeria.server.annoation.Blocking
import scala.concurrent.Future
class MyAsyncService {
@Post("/async")
def asyncService(req: SimpleRequest)(implicit ec: ExecutionContext): Future[String] =
Future {
// This callback will be executed in an Armeria's event loop
assert(ServiceRequestContext.current() != null)
"Hello, Armeria!"
}
@Blocking
@Post("/blocking-task")
def blockingAsyncService(req: SimpleRequest)(implicit ec: ExecutionContext): Future[String] =
Future {
// This callback will be executed in an Armeria's blocking task executor
assert(ServiceRequestContext.current() != null)
// Perform a long running task
"Hello, Armeria!"
}
}
}