In this tutorial, we will discuss about Reactive programming and how to achieve Reactive programming in Java.
Reactive Programming Overview
Reactive programming is a programming paradigm where the focus is on developing asynchronous and non-blocking applications in an event-driven form. It is a declarative programming model that enables the processing of data streams in a more efficient, scalable, and resilient way.
Reactive programming deals with the continuous flow of data, rather than individual events or requests, and enables the handling of complex asynchronous operations in a more streamlined way. In Reactive Programming, data streams are modeled as streams of events that are pushed asynchronously to data consumers. These events trigger reactions in the data consumers, which perform some action in response.
Reactive programming is built on the idea of composing streams of events and applying functional transformations to these streams. Reactive Programming aims to provide a more efficient and scalable way of processing data streams. It enables the development of highly responsive and fault-tolerant applications that can handle large volumes of data in real time. It also enables the development of more modular and composable applications by separating concerns and enabling the composition of small, reusable components.
Reactive Programming is gaining popularity in the development of modern applications, such as web and mobile applications, IoT applications, and real-time data processing systems. Some popular frameworks for implementing Reactive Programming in Java include Reactor, RxJava, and Akka.
Reactive Streams Specification in Java
Reactive Streams Specification is a set of rules or set of guidelines that you need to follow when designing a reactive stream.
These specifications introduce four interfaces that should be used and overridden when creating a reactive stream.
Publisher
The Publisher is a data source that will always publish events.
Subscriber
The Subscriber will subscribe/consume the events from the Publisher.
Subscription
The Subscription represents the unique relationship between a Subscriber and a Publisher interface.
Processor
It represents a processing stage – which is both a Subscriber and a Publisher and MUST obey the contracts of both.
- Project Reactor
- RxJava
- JDK 9 Flow Reactive Stream
In this tutorial, we are going to use Project Reactor reactive library.
Project Reactor
The Project Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM.
Mono and Flux Implementations
Project reactor libraries provide two implementations of the Publisher interface:
- Mono
- Flux
The Mono API allows producing only one value.
Flux: Returns 0…N elements.
The Flux can be endless, it can produce multiple values.
Mono vs Flux
Mono and Flux are both implementations of the Publisher interface. In simple terms, we can say that when we're doing something like a computation or making a request to a database or an external service, and expecting a maximum of one result, then we should use Mono.
When we're expecting multiple results from our computation, database, or external service call, then we should use Flux.
Reactive Stream Workflow
Let's understand the above Reactive stream workflow:
1. The Subscriber will call subscribe() method of the Publisher to subscribe or register with the Publisher.
2. The Publisher creates an instance of Subscription and sends it to Subscriber saying that your subscription is successful.
3. Next, the Subscriber will call the request(n) method of Subscription to request data from the Publisher.
4. Next, Publisher call onNext(data) method to send data to the Subscriber. Publisher call onNext(data) n times. It means if there are 10 items then the Publisher call onNext(data) method 10 times.
5. Once the Publisher sends all the data to Subscriber, the next Publisher call onComplete() method to notify Subscriber that all the data has been sent. If there are any errors while sending the data then the Publisher call onError() method to send error details to the Subscriber.
Mono and Flux Examples
Let's create an example of Mono and Flux to understand Reactive Stream workflow in an action.
To use Mono and Flux, make sure that you add Project Reactor Core dependency:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.1</version>
</dependency>
Mono Example
Let's create an example to demonstrate the usage of Mono implementation of the Publisher interface:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class MonoFluxDemo {
@Test
public void testMono(){
Mono<?> stringMono = Mono.just("java guides")
.log();
stringMono.subscribe((element) -> {
System.out.println(element);
});
}
}
Output:
14:36:23.902 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:36:23.909 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
14:36:23.910 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
14:36:23.911 [main] INFO reactor.Mono.Just.1 - | onNext(java guides)
java guides
14:36:23.911 [main] INFO reactor.Mono.Just.1 - | onComplete()
Note that the Mono publisher called onComplete() method to notify Subscriber that all the data has been sent successfully.
Next, let me demonstrate the onError() method. If there are any errors while sending the data then the Publisher call onError() method to send error details to the Subscriber.
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
public class MonoFluxDemo {
@Test
public void testMono(){
Mono<?> stringMono = Mono.just("java guides")
.then(Mono.error(new RuntimeException("Exception occured while emitting the data")))
.log();
stringMono.subscribe((element) -> {
System.out.println(element);
}, throwable -> System.out.println(throwable.getMessage()));
}
}
Output:
14:41:36.788 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:41:36.795 [main] INFO reactor.Mono.IgnoreThen.1 - onSubscribe(MonoIgnoreThen.ThenIgnoreMain)
14:41:36.796 [main] INFO reactor.Mono.IgnoreThen.1 - request(unbounded)
14:41:36.798 [main] ERROR reactor.Mono.IgnoreThen.1 - onError(java.lang.RuntimeException: Exception occured while emitting the data)
14:41:36.798 [main] ERROR reactor.Mono.IgnoreThen.1 -
java.lang.RuntimeException: Exception occured while emitting the data
....
Exception occured while emitting the data
Flux Example
Let's create an example to demonstrate the usage of Flux implementation of the Publisher interface:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
public class MonoFluxDemo {
@Test
public void testFlux(){
Flux<String> stringFlux = Flux.just("Apple", "Banana", "Orange", "Mango")
.log();
stringFlux.subscribe((element) -> {
System.out.println(element);
});
}
}
Output:
14:44:22.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:44:22.218 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Apple)
Apple
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Banana)
Banana
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Orange)
Orange
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Mango)
Mango
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onComplete()
Next, let me demonstrate get usage of the onError() method:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
public class MonoFluxDemo {
@Test
public void testFlux(){
Flux<String> stringFlux = Flux.just("orange", "banana", "mango", "apple")
.concatWithValues("watermelon")
.concatWith(Flux.error(new RuntimeException("Exception while emitting data")))
.log();
stringFlux.subscribe((element) -> {
System.out.println(element);
}, throwable -> System.out.println(throwable.getMessage()));
}
}
Output:
14:45:18.406 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:45:18.411 [main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(orange)
orange
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(banana)
banana
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(mango)
mango
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(apple)
apple
14:45:18.413 [main] INFO reactor.Flux.ConcatArray.1 - onNext(watermelon)
watermelon
14:45:18.413 [main] ERROR reactor.Flux.ConcatArray.1 - onError(java.lang.RuntimeException: Exception while emitting data)
14:45:18.413 [main] ERROR reactor.Flux.ConcatArray.1 -
java.lang.RuntimeException: Exception while emitting data
at net.javaguides.springbootwebfluxdemo.MonoFluxDemo.testFlux(MonoFluxDemo.java:10)
.....
Exception while emitting data
Related Java Reactive Programming Examples
Spring Boot WebFlux MongoDB CRUD REST API Tutorial
In this tutorial, you will learn how to build CRUD REST APIs using Spring Boot, Spring WebFlux, and MongoDB NoSQL database.
Testing Spring WebFlux Reactive CRUD Rest APIs using WebTestClient
In this tutorial, we will learn how to write Integration tests to test Spring WebFlux reactive CRUD REST APIs using WebTestClient.
Spring WebFlux Functional Endpoints CRUD REST API Example
In this tutorial, we will new functional-style programming model to build reactive CRUD REST APIs using Spring Boot 3, Spring WebFlux, MongoDB, and IntelliJ IDEA.
Unit Testing Spring WebFlux CRUD REST API using JUnit and Mockito
In this tutorial, we will learn how to unit test Spring WebFlux controller (Reactive CRUD REST APIs) using JUnit and Mockito frameworks.
Comments
Post a Comment
Leave Comment