Sending a streaming response with back pressure

Suppose we want to serve a static file whose size is larger than your available memory. If we load the file into the memory at once, we will definitely get an OutOfMemoryError.

To prevent such error and send large files without consuming too much memory:

  1. Divide the file into chunks.
  2. Load the first chunk into memory.
  3. Send the chunk to the client.
  4. Wait until the chunk is written to the sending socket buffer. (See below for explanation.)
  5. Load the second chunk and repeat the steps 3 and 4 until we send all chunks.

Waiting for the chunk to be written is to avoid loading data into memory when the client is not ready to receive. This is called back pressure. See Let’s Play with Reactive Streams on Armeria - 1 to learn the conditions under which an OutOfMemoryError is raised and back pressure which can help you with the error.

Sending a file with back pressure

To send a file with back pressure, use HttpFile, which loads and sends file chunks one by one to the client with back pressure.

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.file.HttpFile;

HttpFile bigFile = HttpFile.of(new File("/var/www/big_file.dat"));
Server.builder()
      .service("/big_file.dat", bigFile.asService())
      .build();

Sending a streaming response using HttpResponseWriter

To send large data other than files such as database, you need to implement back pressure yourself. Let's start off with implementing a minimal HttpFile.

Prepare to send a streaming response with HttpResponseWriter returned by HttpResponse.streaming().

import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpResponseWriter;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.server.ServerBuilder;

// ⚠️ This code has a problem. Do not copy/paste and use it.
ServerBuilder sb = ...;
sb.service("/big_file.dat", (ctx, req) -> {
    HttpResponseWriter response = HttpResponse.streaming();
    // We must write the response headers first.
    response.write(ResponseHeaders.of(200));
    response.write(produceChunk(0));
    response.write(produceChunk(1));
    response.write(produceChunk(2));
    ... // Write more chunks until we send all chunks.
    // Call close() to end the response.
    response.close();
    return response;
}

...
private HttpData produceChunk(int index) {
    // Divide the file by the pre-defined chunk size(e.g. 8192 bytes)
    // and read it using index.
    // If index is 0, 0 to 8192 bytes from the file is read.
    // If index is 1, 8193 to 16384 is read and so on.
}

With the code above, the server would encounter OutOfMemoryError. We still need to take care of preventing loading data chunks into memory before a chunk is sent to the client. To solve the problem, implement back pressure with StreamWriter.whenConsumed():

sb.service("/big_file.dat", (ctx, req) -> {
    HttpResponseWriter response = HttpResponse.streaming();
    response.write(ResponseHeaders.of(200));
    response.whenConsumed().thenRun(() -> {
        // Produce the first chunk when the ResponseHeaders is
        // written to the socket.
        response.write(produceChunk(0));
        response.whenConsumed().thenRun(() -> {
            // Produce the second chunk when the first chunk is
            // written to the socket.
            response.write(produceChunk(1));
            ...
        });
    });
    return response;
});

StreamWriter.whenConsumed() returns a CompletableFuture that is complete when the chunk written to the HttpResponseWriter is finally written to the socket. This enables you to add the next task by adding a callback (thenRun() in the code example). The next task in the example is set to produce the next chunk.

Of course, we would need to use recursion to be free from infinite implementation of the callback.

sb.service("/big_file.dat", (ctx, req) -> {
    HttpResponseWriter response = HttpResponse.streaming();
    response.write(ResponseHeaders.of(200));
    streamingResponse(response, 0);
    return response;
});

private void streamingResponse(HttpResponseWriter response, int index) {
    if (isEndOfFile()) {
        // Close the response when we send all chunks.
        response.close();
        return;
    }
    response.whenConsumed().thenRun(() -> {
        if (response.tryWrite(produceChunk(index))) {
            streamingResponse(response, index + 1);
        } else {
            // The response is completed unexpectedly.
        }
    });
}

So far, we have implemented a simple version of HttpFile. Now, we can implement a streaming response with back pressure for any type of source (e.g. database) by simply changing the produceChunk() method to fetch data from the source.

You have other alternatives to implement back pressure with; there are libraries such as Reactor and RxJava. If you opt to use such alternative, return the response using HttpResponse.of():

sb.service("/big_data.dat", (ctx, req) -> {
    Flux<HttpData> flux = ... // Fetch data from other source with backpressure.
    return HttpResponse.of(ResponseHeaders.of(200), flux);
});