In this tutorial, we will discuss reactive programming and how to achieve it in Java.
Reactive Programming Overview
Reactive Streams Specification in Java
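The Reactive Streams specification defines four interfaces:
- Publisher
- Subscriber
- Subscription
- Processor
Libraries and APIs that implement the specification include:
- Project Reactor
- RxJava
- JDK 9 Flow Reactive Stream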
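As a rough illustration of how the four roles fit together, the sketch below uses the JDK's java.util.concurrent.Flow API (Java 9+), whose nested interfaces mirror Publisher, Subscriber, Subscription, and Processor one-to-one. The class names FlowInterfacesSketch and MapProcessor are made up for this example, and SubmissionPublisher is used only as a convenient ready-made Publisher. Treat it as a minimal sketch, not as a description of how Project Reactor or RxJava work internally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical demo class: source Publisher -> Processor -> Subscriber.
final class FlowInterfacesSketch {

    /** A Processor is a Subscriber (towards upstream) and a Publisher (towards downstream). */
    static final class MapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<T, R> mapper;
        private Flow.Subscription upstream;

        MapProcessor(Function<T, R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            upstream.request(1);           // ask for the first item
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item));    // republish the transformed item downstream
            upstream.request(1);           // then ask for the next one
        }

        @Override
        public void onError(Throwable t) {
            closeExceptionally(t);         // forward the error downstream
        }

        @Override
        public void onComplete() {
            close();                       // forward completion downstream
        }
    }

    /** Publishes the input, upper-cases it in a Processor, and collects the results. */
    static List<String> run(List<String> input) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            MapProcessor<String, String> upper = new MapProcessor<>(String::toUpperCase);

            // Subscriber: the final consumer of the stream.
            upper.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { received.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });

            source.subscribe(upper);       // wire the Processor to the source before publishing
            input.forEach(source::submit);
        }                                  // closing the source signals onComplete downstream

        try {
            // Delivery is asynchronous, so wait for the terminal signal.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("mono", "flux")));
    }
}
```

The Subscription never appears as a class of its own here: SubmissionPublisher creates it and hands it to each Subscriber in onSubscribe(), which is where both the Processor and the final Subscriber state how many items they want.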
Project Reactor
Mono and Flux Implementations
- Mono
- Flux
Mono: Returns 0 or 1 element.
A Mono produces at most one value and then completes.
Flux: Returns 0…N elements.
A Flux can produce multiple values and can even be endless.
Mono vs Flux
Reactive Stream Workflow
Let's walk through the Reactive Streams workflow step by step:
1. The Subscriber calls the Publisher's subscribe() method to register with the Publisher.
2. The Publisher creates a Subscription instance and passes it to the Subscriber through onSubscribe(), confirming that the subscription succeeded.
3. The Subscriber calls request(n) on the Subscription to tell the Publisher how many items it is ready to receive.
4. The Publisher calls onNext(data) once per item to send data to the Subscriber, up to the number of items requested. For example, if there are 10 items and the Subscriber has requested at least 10, the Publisher calls onNext(data) 10 times.
5. Once the Publisher has sent all the data, it calls onComplete() to notify the Subscriber that the stream has ended. If an error occurs while sending the data, the Publisher calls onError() instead to pass the error details to the Subscriber.
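The call order in this workflow can be traced with a small hand-written Publisher. The sketch below uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. WorkflowTrace, ListPublisher, and the fixed demand of 10 are assumptions made up for this illustration. The Publisher is deliberately simplified (synchronous, single-threaded, no validation of n), so it is not a specification-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class that records each signal of the workflow in order.
final class WorkflowTrace {

    /** Simplified synchronous Publisher: emits the items, then completes or fails. */
    static final class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;
        private final Throwable failure;   // if non-null, signalled instead of onComplete()
        private final List<String> trace;

        ListPublisher(List<String> items, Throwable failure, List<String> trace) {
            this.items = items;
            this.failure = failure;
            this.trace = trace;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            trace.add("subscribe");                        // step 1
            subscriber.onSubscribe(new Flow.Subscription() {   // step 2
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    trace.add("request(" + n + ")");       // step 3
                    // Step 4: emit at most n items per request.
                    for (long i = 0; i < n && next < items.size() && !done; i++) {
                        subscriber.onNext(items.get(next++));
                    }
                    // Step 5: exactly one terminal signal once everything was sent.
                    if (next == items.size() && !done) {
                        done = true;
                        if (failure != null) {
                            subscriber.onError(failure);
                        } else {
                            subscriber.onComplete();
                        }
                    }
                }

                @Override
                public void cancel() {
                    done = true;                           // stop emitting
                }
            });
        }
    }

    /** Subscribes a tracing Subscriber and returns the recorded signals. */
    static List<String> run(List<String> items, Throwable failure) {
        List<String> trace = new ArrayList<>();
        new ListPublisher(items, failure, trace).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                trace.add("onSubscribe");
                s.request(10);                             // demand chosen arbitrarily for the demo
            }
            @Override public void onNext(String item) { trace.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { trace.add("onError(" + t.getMessage() + ")"); }
            @Override public void onComplete() { trace.add("onComplete"); }
        });
        return trace;
    }

    public static void main(String[] args) {
        run(List.of("Apple", "Banana"), null).forEach(System.out::println);
    }
}
```

Running main prints subscribe, onSubscribe, request(10), onNext(Apple), onNext(Banana), and onComplete, one per line. Passing a non-null Throwable as the second argument ends the trace with onError(...) instead of onComplete.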
Mono and Flux Examples
Add the reactor-core dependency to your Maven pom.xml:
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.1</version>
</dependency>
Mono Example
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

public class MonoFluxDemo {

    @Test
    public void testMono() {
        // Mono.just() wraps a single, already-available value; log() traces every signal
        Mono<String> stringMono = Mono.just("java guides")
                .log();

        // Nothing is emitted until subscribe() is called
        stringMono.subscribe(element -> System.out.println(element));
    }
}
Output:
14:36:23.902 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:36:23.909 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
14:36:23.910 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
14:36:23.911 [main] INFO reactor.Mono.Just.1 - | onNext(java guides)
java guides
14:36:23.911 [main] INFO reactor.Mono.Just.1 - | onComplete()
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

public class MonoFluxDemo {

    @Test
    public void testMono() {
        // then() discards the value of the first Mono and continues with the given Mono,
        // which here signals an error instead of a value
        Mono<?> stringMono = Mono.just("java guides")
                .then(Mono.error(new RuntimeException("Exception occurred while emitting the data")))
                .log();

        // The second lambda is the error consumer; it receives the Throwable from onError()
        stringMono.subscribe(
                element -> System.out.println(element),
                throwable -> System.out.println(throwable.getMessage()));
    }
}
Output:
14:41:36.788 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:41:36.795 [main] INFO reactor.Mono.IgnoreThen.1 - onSubscribe(MonoIgnoreThen.ThenIgnoreMain)
14:41:36.796 [main] INFO reactor.Mono.IgnoreThen.1 - request(unbounded)
14:41:36.798 [main] ERROR reactor.Mono.IgnoreThen.1 - onError(java.lang.RuntimeException: Exception occurred while emitting the data)
14:41:36.798 [main] ERROR reactor.Mono.IgnoreThen.1 -
java.lang.RuntimeException: Exception occurred while emitting the data
....
Exception occurred while emitting the data
Flux Example
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

public class MonoFluxDemo {

    @Test
    public void testFlux() {
        // Flux.just() emits each of the given values in order, then completes
        Flux<String> stringFlux = Flux.just("Apple", "Banana", "Orange", "Mango")
                .log();

        // The consumer is invoked once per onNext() signal
        stringFlux.subscribe(element -> System.out.println(element));
    }
}
Output:
14:44:22.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:44:22.218 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Apple)
Apple
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Banana)
Banana
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Orange)
Orange
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Mango)
Mango
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onComplete()
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

public class MonoFluxDemo {

    @Test
    public void testFlux() {
        // Emit four values, append "watermelon", then terminate the stream with an error
        Flux<String> stringFlux = Flux.just("orange", "banana", "mango", "apple")
                .concatWithValues("watermelon")
                .concatWith(Flux.error(new RuntimeException("Exception while emitting data")))
                .log();

        // The second lambda is the error consumer; it receives the Throwable from onError()
        stringFlux.subscribe(
                element -> System.out.println(element),
                throwable -> System.out.println(throwable.getMessage()));
    }
}
Output:
14:45:18.406 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:45:18.411 [main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(orange)
orange
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(banana)
banana
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(mango)
mango
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(apple)
apple
14:45:18.413 [main] INFO reactor.Flux.ConcatArray.1 - onNext(watermelon)
watermelon
14:45:18.413 [main] ERROR reactor.Flux.ConcatArray.1 - onError(java.lang.RuntimeException: Exception while emitting data)
14:45:18.413 [main] ERROR reactor.Flux.ConcatArray.1 -
java.lang.RuntimeException: Exception while emitting data
at net.javaguides.springbootwebfluxdemo.MonoFluxDemo.testFlux(MonoFluxDemo.java:10)
.....
Exception while emitting data
Related Java Reactive Programming Examples
Spring Boot WebFlux MongoDB CRUD REST API Tutorial
In this tutorial, you will learn how to build CRUD REST APIs using Spring Boot, Spring WebFlux, and MongoDB NoSQL database.
Testing Spring WebFlux Reactive CRUD Rest APIs using WebTestClient
Spring WebFlux Functional Endpoints CRUD REST API Example
Unit Testing Spring WebFlux CRUD REST API using JUnit and Mockito
In this tutorial, we will learn how to unit test Spring WebFlux controller (Reactive CRUD REST APIs) using JUnit and Mockito frameworks.