Spring WebFlux

Maven Dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Overview

Spring offers two parallel web stacks:

  • Spring Web MVC is built on the Servlet API and runs inside a Servlet container. It uses blocking components. JDBC is blocking by nature as well.

  • Spring WebFlux can use the same @Controller / @RequestMapping annotations, but it also has Router Functions, which follow a functional programming style. It does not depend on the Servlet API: in Spring Boot it runs on Netty by default, and it can also run on Servlet containers such as Tomcat or Jetty using non-blocking I/O.

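Spring WebFlux and Spring Web MVC are alternatives: an application runs on one stack or the other. If both starters are on the classpath, Spring Boot auto-configures Spring MVC.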
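To make the two WebFlux programming styles concrete, here is a minimal sketch that declares one endpoint per style. The class names, paths, and greeting text are hypothetical and chosen only for illustration; it assumes spring-boot-starter-webflux is on the classpath.

```java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

// Hypothetical example class; not part of the original notes.
public class GreetingEndpoints {

    // Annotation style: same annotations as Spring MVC, but the handler returns a Mono.
    @RestController
    public static class GreetingController {
        @GetMapping("/hello/{name}")
        public Mono<String> hello(@PathVariable("name") String name) {
            return Mono.just("Hello, " + name);
        }
    }

    // Functional style: routes are ordinary objects assembled in code.
    public static RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route()
                .GET("/hi/{name}", request ->
                        ServerResponse.ok().bodyValue("Hi, " + request.pathVariable("name")))
                .build();
    }
}
```

In a Spring Boot application the RouterFunction would typically be exposed as a @Bean from a @Configuration class so that WebFlux picks it up.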

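Spring Framework 5 builds its reactive support on two Project Reactor types: 🟠 Mono<T>, which emits at most one item, and 🟠 Flux<T>, which emits zero or more items.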
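The two types are Mono<T> (at most one item) and Flux<T> (any number of items). The sketch below creates one of each and shows that a pipeline does nothing until it is subscribed to. The class name and sample values are made up for illustration; only reactor-core is assumed.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of the original notes.
public class ReactiveTypesDemo {

    // Records every emitted value so the order of events is observable.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Mono<String> one = Mono.just("Diana")
                .doOnNext(value -> log.add("mono:" + value));
        Flux<String> many = Flux.just("Diana", "Bruce", "Clark")
                .doOnNext(value -> log.add("flux:" + value));

        // Declaring the pipelines emitted nothing; the log is still empty here.
        log.add("declared");

        // Subscribing triggers emission (synchronously for these in-memory sources).
        one.subscribe();
        many.subscribe();
        return log;
    }

    public static void main(String[] args) {
        // prints [declared, mono:Diana, flux:Diana, flux:Bruce, flux:Clark]
        System.out.println(run());
    }
}
```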

🟠 Mono<T>/🟠 Flux<T> operations

🟠 Mono<T> operations:

Modifier and Type Method Description

T block()

Subscribe to this 🟠 Mono<T> and block indefinitely until a next signal is received. Returns that value, or null if the 🟠 Mono<T> completes empty.

Mono<Person> personMono = personRepository.getById(1);
Person person = personMono.block();
System.out.println(person);

Not recommended: it blocks the calling thread until the value arrives, which defeats the purpose of a non-blocking pipeline.

Disposable subscribe(Consumer<? super T> consumer)

Subscribe a ⚪ Consumer<T> to this 🟠 Mono<T> that will consume the whole sequence.

personMono.subscribe(System.out::println);

Recommended over block(): the value is handled in a callback instead of being waited for.

<R> Mono<R> map(Function<? super T,? extends R> mapper)

Transform the item emitted by this 🟠 Mono<T> by applying a synchronous function to it.

personMono.map(Person::getFirstName).subscribe(System.out::println);
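Two variants of the operations above are worth a small sketch: block(Duration), which bounds the wait instead of blocking indefinitely, and subscribe(consumer, errorConsumer), which also handles a failure. The class and method names are hypothetical, and the sources are in-memory so that the callbacks run before each method returns; a real repository would usually complete later.

```java
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of the original notes.
public class MonoOperationsDemo {

    // block(Duration) throws a RuntimeException if no signal arrives within the timeout.
    static String blockWithTimeout(Mono<String> source, Duration timeout) {
        return source.block(timeout);
    }

    // subscribe(consumer, errorConsumer) reacts to either the value or the error.
    static String subscribeWithErrorHandler(Mono<String> source) {
        AtomicReference<String> outcome = new AtomicReference<>("empty");
        source.map(String::toUpperCase)
              .subscribe(
                  value -> outcome.set("value:" + value),
                  error -> outcome.set("error:" + error.getMessage()));
        // Safe to read immediately only because the demo sources emit synchronously.
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(blockWithTimeout(Mono.just("Diana"), Duration.ofSeconds(1))); // Diana
        System.out.println(subscribeWithErrorHandler(Mono.just("Diana")));               // value:DIANA
        System.out.println(subscribeWithErrorHandler(
                Mono.error(new IllegalStateException("boom"))));                         // error:boom
    }
}
```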

🟠 Flux<T> operations:

Modifier and Type Method Description

T blockFirst()

Subscribe to this 🟠 Flux<T> and block indefinitely until the upstream signals its first value or completes. Returns that value, or null if the 🟠 Flux<T> completes empty.

Flux<Person> personFlux = personRepository.findAll();
Person person = personFlux.blockFirst();
System.out.println(person);

Not recommended: it blocks the calling thread until the first value arrives, which defeats the purpose of a non-blocking pipeline.

Disposable subscribe(Consumer<? super T> consumer)

Subscribe a ⚪ Consumer<T> to this 🟠 Flux<T> that will consume all the elements in the sequence.

personFlux.subscribe(System.out::println);

Recommended over blockFirst(): each element is handled in a callback instead of being waited for.

<V> Flux<V> map(Function<? super T,? extends V> mapper)

Transform the items emitted by this 🟠 Flux<T> by applying a synchronous function to each item.

personFlux.map(Person::getFirstName).subscribe(System.out::println);

Flux<T> filter(Predicate<? super T> p)

Evaluate each source value against the given ⚪ Predicate<T>. Values that pass the test are emitted; the others are dropped.

personFlux.filter(person -> person.getFirstName().equals("Diana"))
    .map(Person::getFirstName)
    .subscribe(System.out::println);

Mono<List<T>> collectList()

Collect all elements emitted by this 🟠 Flux<T> into a ⚪ List that is emitted by the resulting 🟠 Mono<List<T>> when this sequence completes, emitting an empty ⚪ List if the sequence was empty.

Mono<List<Person>> listMono = personFlux.collectList();
listMono.subscribe(list -> list.forEach(System.out::println));

Mono<T> next()

Emit only the first item emitted by this 🟠 Flux<T> into a new 🟠 Mono<T>. If called on an empty 🟠 Flux<T>, it returns an empty 🟠 Mono<T> and does not signal an exception.

Mono<T> single()

Expect and emit a single item from this 🟠 Flux<T> source or signal 🟢 NoSuchElementException for an empty source, or 🟢 IndexOutOfBoundsException for a source with more than one element.

personFlux.filter(person -> person.getId() == id).single().doOnError(throwable -> {
    System.err.println("Error occurred in the Mono<Person> during single");
    System.err.println(throwable.toString());
}).subscribe(System.out::println, throwable -> {
    System.err.println("Error occurred in the Mono<Person> during subscribe");
    System.err.println(throwable.toString());
});

When no person matches the id, this prints:

Error occurred in the Mono<Person> during single
java.util.NoSuchElementException: Source was empty
Error occurred in the Mono<Person> during subscribe
java.util.NoSuchElementException: Source was empty
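The difference between next() and single() is easiest to see side by side on empty, one-element, and multi-element sources. The sketch below is a hypothetical demo class using in-memory Flux instances; it calls block() and blockOptional() only to keep the demo synchronous, not as a recommended pattern.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Optional;

// Hypothetical demo class; not part of the original notes.
public class NextVersusSingleDemo {

    // next(): the first element, or an empty Mono when the Flux is empty.
    static Optional<String> first(Flux<String> source) {
        return source.next().blockOptional();
    }

    // single(): exactly one element, otherwise an error.
    // For the demo, an error is turned into the simple name of its class.
    static String exactlyOne(Flux<String> source) {
        return source.single()
                .onErrorResume(error -> Mono.just(error.getClass().getSimpleName()))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(first(Flux.empty()));                  // Optional.empty
        System.out.println(first(Flux.just("Diana", "Bruce")));   // Optional[Diana]
        System.out.println(exactlyOne(Flux.just("Diana")));       // Diana
        System.out.println(exactlyOne(Flux.empty()));             // NoSuchElementException
        System.out.println(exactlyOne(Flux.just("Diana", "Bruce"))); // IndexOutOfBoundsException
    }
}
```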

Testing

Instead of blocking inside the test:

@Test
void testGetByIdNotFound() {
    Mono<Person> personMono = personRepository.getById(8);
    assertFalse(personMono.hasElement().block());
}

You can declare the expected signals with:

⚪ StepVerifier

Provides a declarative way of creating a verifiable script for an async ⚪ Publisher<T> sequence, by expressing expectations about the events that will happen upon subscription. The verification must be triggered after the terminal expectations (completion, error, cancellation) have been declared, by calling one of the verify() methods.

@Test
void testGetByIdNotFoundStepVerifier() {
    Mono<Person> personMono = personRepository.getById(8);
    // expectNextCount(0) asserts that no element is emitted before completion
    StepVerifier.create(personMono).expectNextCount(0).verifyComplete();
}
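StepVerifier can also script the expected values of a Flux and an expected error. The following is a minimal sketch, assuming reactor-test is on the classpath; the class name and the in-memory names() source are hypothetical stand-ins for a repository.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.NoSuchElementException;

// Hypothetical demo class; not part of the original notes.
public class StepVerifierSketch {

    // Stand-in for a repository call that returns a Flux.
    static Flux<String> names() {
        return Flux.just("Diana", "Bruce", "Clark");
    }

    // Expects exactly one matching value followed by completion.
    static Duration verifyValues() {
        return StepVerifier.create(names().filter(name -> name.startsWith("D")))
                .expectNext("Diana")
                .verifyComplete();
    }

    // Expects single() to fail because the filtered source is empty.
    static Duration verifyError() {
        return StepVerifier.create(names().filter(name -> name.startsWith("X")).single())
                .expectError(NoSuchElementException.class)
                .verify();
    }
}
```

Both methods throw an AssertionError if the sequence does not match the script; on success they return the time the verification took.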