How to Create a Reactive Stream in Java

Introduction

Java 9 introduced the Flow API (java.util.concurrent.Flow), a set of interfaces that correspond to the Reactive Streams specification and support asynchronous, non-blocking data processing. Reactive streams are backpressure-aware: a subscriber tells the publisher how many items it is ready to receive, so a fast producer cannot overwhelm a slow consumer.

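The Flow API introduced in Java 9 is the foundation for building reactive streams. It defines four interfaces: Flow.Publisher, Flow.Subscriber, Flow.Subscription, and Flow.Processor. The JDK also ships one ready-made publisher, SubmissionPublisher, which this guide uses.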
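The guide's main example does not use a Processor, so here is a minimal sketch of one, assuming the common approach of extending SubmissionPublisher so the class can act as both subscriber and publisher. The names MappingProcessor, ProcessorSketch, and upperCaseAll are hypothetical and chosen only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical processor: subscribes to items of type T and republishes mapped items of type R.
class MappingProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MappingProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.upstream = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item));    // forward the transformed item downstream
        upstream.request(1);           // then ask upstream for the next item
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable); // pass the failure on to downstream subscribers
    }

    @Override
    public void onComplete() {
        close();                       // pass completion on to downstream subscribers
    }
}

class ProcessorSketch {
    // Publishes the input through the processor and returns what the final subscriber received.
    static List<String> upperCaseAll(List<String> input) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        MappingProcessor<String, String> upper =
                new MappingProcessor<String, String>(String::toUpperCase);

        source.subscribe(upper);
        upper.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        input.forEach(source::submit);
        source.close(); // the processor closes itself when it receives onComplete

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(upperCaseAll(List.of("java", "flow")));
    }
}
```

Only the source publisher is closed explicitly. Closing the processor directly could race with items still travelling through it, so the sketch lets completion propagate through onComplete() instead.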

This guide will walk you through creating a simple reactive stream using the Java 9 Flow API.

Program Steps

  1. Define the Publisher: A Publisher generates the data and sends it to a subscriber.
  2. Create the Subscriber: A Subscriber processes the data received from the publisher.
  3. Manage Backpressure: Backpressure is handled through the Subscription, which lets the subscriber request only as many items as it can handle.
  4. Link the Publisher and Subscriber: Connect the subscriber to the publisher and manage the data flow.
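The steps above can be sketched with a subscriber that controls demand explicitly: it requests one item at a time and cancels the subscription once it has enough. This is an illustrative sketch, not part of the main program. TakeSubscriber, BackpressureSketch, and firstN are hypothetical names, and the limit is assumed to be at least 1.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber that takes at most `limit` items, then cancels.
class TakeSubscriber<T> implements Flow.Subscriber<T> {
    private final int limit;
    private final List<T> taken = new CopyOnWriteArrayList<>();
    private final CountDownLatch finished = new CountDownLatch(1);
    private Flow.Subscription subscription;

    TakeSubscriber(int limit) {
        this.limit = limit;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);       // initial demand: one item
    }

    @Override
    public void onNext(T item) {
        taken.add(item);
        if (taken.size() >= limit) {
            subscription.cancel();     // no further items are wanted
            finished.countDown();
        } else {
            subscription.request(1);   // signal readiness for one more item
        }
    }

    @Override
    public void onError(Throwable throwable) {
        finished.countDown();
    }

    @Override
    public void onComplete() {
        finished.countDown();          // publisher ended before the limit was reached
    }

    // Waits (up to five seconds) until the subscriber has finished, then returns the items.
    List<T> await() {
        try {
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taken;
    }
}

class BackpressureSketch {
    // Publishes 1..total and returns the items the subscriber chose to take.
    static List<Integer> firstN(int limit, int total) {
        TakeSubscriber<Integer> subscriber = new TakeSubscriber<>(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= total; i++) {
                publisher.submit(i);
            }
        }
        return subscriber.await();
    }

    public static void main(String[] args) {
        System.out.println(firstN(3, 10));
    }
}
```

Because the subscriber never has more than one outstanding request, no item is delivered after cancel() is called.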

Java Program

Example: Creating a Reactive Stream Using the Java 9 Flow API

import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.Flow;

public class ReactiveStreamExample {
    public static void main(String[] args) throws InterruptedException {
        // Step 1: Create a publisher
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // Step 2: Create a subscriber
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                System.out.println("Subscribed");
                subscription.request(1);  // Request one item at a time
            }

            @Override
            public void onNext(String item) {
                System.out.println("Received: " + item);
                subscription.request(1);  // Request the next item
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Error occurred: " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Processing complete");
            }
        };

        // Step 3: Subscribe the subscriber to the publisher
        publisher.subscribe(subscriber);

        // Step 4: Publish items
        String[] items = {"Java", "Reactive", "Streams", "In", "Java 9"};
        for (String item : items) {
            System.out.println("Publishing: " + item);
            publisher.submit(item);  // Publish data to subscribers
        }

        // Step 5: Close the publisher
        publisher.close();

        // Delivery is asynchronous, so pause briefly to let the subscriber finish (a crude wait, fine for a demo)
        Thread.sleep(1000);
    }
}

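Output

One possible run is shown below. Because items are delivered asynchronously on another thread, the Publishing and Received lines can interleave differently from run to run.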

Subscribed
Publishing: Java
Received: Java
Publishing: Reactive
Received: Reactive
Publishing: Streams
Received: Streams
Publishing: In
Received: In
Publishing: Java 9
Received: Java 9
Processing complete

Explanation

Step 1: Create a Publisher

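SubmissionPublisher is the Flow.Publisher implementation that ships with the JDK. It keeps a buffer for each subscriber and delivers items asynchronously, in most environments on ForkJoinPool.commonPool():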

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

Step 2: Create a Subscriber

A custom subscriber is created by implementing the Flow.Subscriber interface. The subscriber overrides four methods:

  • onSubscribe(): Called once, when the subscription is established and before any items are delivered.
  • onNext(): Called for each item delivered, never for more items than the subscriber has requested.
  • onError(): Called when the publisher or subscription fails; no further signals follow.
  • onComplete(): Called when the publisher has closed and no more items will arrive.

Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);  // Request one item at a time
    }

    @Override
    public void onNext(String item) {
        System.out.println("Received: " + item);
        subscription.request(1);  // Request the next item
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Processing complete");
    }
};
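The main program never triggers onError(). As a hedged illustration of how that callback can be reached, the sketch below ends the stream with SubmissionPublisher.closeExceptionally() instead of close(). ErrorSignalSketch and failWith are hypothetical names, and the sketch makes no claim about whether items submitted earlier are still delivered before the error.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ErrorSignalSketch {
    // Ends the stream with an error and returns a description of the terminal signal received.
    static String failWith(String message) {
        AtomicReference<String> terminalSignal = new AtomicReference<>("none");
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE); // effectively unbounded demand
            }

            @Override
            public void onNext(String item) {
                // Items are ignored here; only the terminal signal matters in this sketch.
            }

            @Override
            public void onError(Throwable throwable) {
                terminalSignal.set("error: " + throwable.getMessage());
                done.countDown();
            }

            @Override
            public void onComplete() {
                terminalSignal.set("complete");
                done.countDown();
            }
        });

        publisher.submit("first");
        // Signal failure rather than normal completion.
        publisher.closeExceptionally(new IllegalStateException(message));

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return terminalSignal.get();
    }

    public static void main(String[] args) {
        System.out.println(failWith("upstream failed"));
    }
}
```

A subscriber receives either onError() or onComplete() as its final signal, never both, which is why one latch covers both cases.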

Step 3: Subscribe the Subscriber to the Publisher

The subscribe() method is used to connect the subscriber to the publisher:

publisher.subscribe(subscriber);

Step 4: Publish Items

The submit() method of SubmissionPublisher publishes an item to every current subscriber. Delivery is asynchronous, and submit() blocks while any subscriber's buffer is full:

publisher.submit(item);
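When blocking the producer is not acceptable, SubmissionPublisher also provides offer(), which can drop an item instead of waiting. The sketch below assumes a deliberately small buffer and a subscriber that never requests anything, so the buffer fills and later offers are dropped. OfferSketch and countDrops are hypothetical names, and the exact number of drops depends on how the JDK rounds the buffer capacity.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

class OfferSketch {
    // Offers `total` items to a subscriber with no demand and returns how many were dropped.
    static int countDrops(int total, int maxBufferCapacity) {
        int dropped = 0;
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxBufferCapacity)) {

            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { /* never requests items */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });

            for (int i = 0; i < total; i++) {
                // The drop handler returns false, meaning "do not retry this item".
                int result = publisher.offer(i, (subscriber, item) -> false);
                if (result < 0) {
                    dropped += -result; // a negative result is the negated number of drops
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println("Dropped: " + countDrops(100, 4));
    }
}
```

Returning true from the drop handler would ask the publisher to retry the offer once, which suits cases where the handler can first free up capacity.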

Step 5: Close the Publisher

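After publishing all items, the close() method is called to signal the end of the data stream. Each subscriber's onComplete() is then invoked once the items already submitted to it have been delivered: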

publisher.close();
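The full program waits for delivery with Thread.sleep(1000), which is either too long or, on a slow machine, too short. One alternative, sketched here under the assumption that a single subscriber is attached, is to count down a CountDownLatch in the terminal callbacks and wait on it. AwaitCompletionSketch and publishAndWait are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class AwaitCompletionSketch {
    // Publishes the items, waits for the terminal signal, and returns what was received.
    static List<String> publishAndWait(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        publisher.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                received.add(item);
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown();   // release the waiting thread on failure
            }

            @Override
            public void onComplete() {
                done.countDown();   // release the waiting thread on normal completion
            }
        });

        items.forEach(publisher::submit);
        publisher.close();

        try {
            // The five-second timeout is a safety net, not an expected delay.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndWait(List.of("Java", "Reactive", "Streams")));
    }
}
```

The latch returns as soon as the subscriber has seen its terminal signal, so the program neither exits early nor waits longer than needed.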

Conclusion

The Flow API, available since Java 9, gives you a standard way to handle asynchronous, non-blocking data processing. Its key components (Publisher, Subscriber, Subscription, and Processor) let you manage the flow of data and apply backpressure. With SubmissionPublisher and a small Subscriber implementation, you can build a working reactive stream without any third-party library.
