Introduction to Reactive Programming
This introduction to reactive programming discusses the Observable and Observer model, the most common operators, and a worked example.
Apr. 08, 20 · Java Zone ·
Reactive programming is about dealing with data streams and the propagation of change.
Reactive systems are applications whose architectural approach makes them responsive, resilient, elastic, and message-driven.
- Responsive: Systems should respond in a timely manner.
- Message Driven: Systems should use asynchronous message communication between components to ensure loose coupling.
- Elastic: Systems should stay responsive under high load.
- Resilient: Systems should stay responsive when some components fail.
According to the “Reactive Manifesto”:
“Reactive Systems are more flexible, loosely-coupled and scalable. This makes them easier to develop and amenable to change. They are significantly more tolerant of failure and when failure does occur they meet it with elegance rather than disaster. Reactive Systems are highly responsive, giving users effective interactive feedback.”
There are Reactive libraries available for many programming languages that enable this programming paradigm.
Such libraries from the “ReactiveX” family are:
“...used for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.”
(definition by ReactiveX)
So, it’s possible to avoid the “callback hell” problem and to abstract away other issues concerning threads and low-level asynchronous computations. That makes our code more readable and focused on business logic.
The Observable x Observer Model
Simply put, an Observable is any object that emits a stream of events, and an Observer is an object that reacts to them. The Observer subscribes to an Observable to listen for whatever items it emits, so it is notified whenever the Observable's state changes. Depending on the library used, the Observer is also called a subscriber or reactor.
The Observer stands ready to react appropriately whenever the Observable emits items, at any point in time. This pattern facilitates concurrent operations because the Observer doesn’t need to block while waiting for the Observable to emit items.
The Observer contract expects the implementation of some subset of the following methods:
- onNext: Called on our Observer whenever the Observable emits an event. It takes the emitted object as a parameter, so we can perform some action on it.
- onCompleted: Called after the last call to onNext, indicating that the sequence of events associated with the Observable is complete and that no errors were encountered.
- onError: Called when the Observable fails to generate the expected data, for example because of an unhandled exception.
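The contract above can be sketched in plain Java without any library. This is only an illustration: `SimpleObserver`, `emitParsed`, and `ObserverContractDemo` are hypothetical names, not RxJava types, and RxJava's own Observer and Observable do considerably more (subscription handling, scheduling, and so on).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, hand-rolled observer type for illustration only.
interface SimpleObserver<T> {
    void onNext(T item);            // called once per emitted item
    void onCompleted();             // called once, after the last onNext
    void onError(Throwable error);  // called instead of onCompleted on failure
}

final class ObserverContractDemo {

    // A tiny "observable": parses each string and pushes the number to the observer.
    // If parsing fails, the sequence ends with onError and onCompleted is never called.
    static void emitParsed(List<String> raw, SimpleObserver<Integer> observer) {
        try {
            for (String text : raw) {
                observer.onNext(Integer.parseInt(text));
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return;
        }
        observer.onCompleted();
    }

    // Subscribes an observer that records every notification it receives.
    static List<String> run(List<String> raw) {
        List<String> log = new ArrayList<>();
        emitParsed(raw, new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add("onNext " + item); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable error) {
                log.add("onError " + error.getClass().getSimpleName());
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [onNext 1, onNext 2, onNext 3, onCompleted]
        System.out.println(run(Arrays.asList("1", "2", "3")));
        // prints [onNext 1, onError NumberFormatException]
        System.out.println(run(Arrays.asList("1", "oops", "3")));
    }
}
```

Note that the second run never reaches the item "3": once onError has been signaled, the sequence is over.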
Operators
An operator is a function that is applied to every element the source Observable emits, and that emits each resulting element in another Observable.
So, operators operate on an Observable and return another Observable. This way, operators can be combined one after another in a chain to build data flow operations on the events.
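To make the "Observable in, Observable out" idea concrete, here is a minimal plain-Java sketch. `MiniObservable` is a hypothetical type invented for this illustration; it only pushes items and leaves out completion, errors, and unsubscription, all of which a real library such as RxJava handles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical minimal observable: subscribing pushes every item to the consumer.
final class MiniObservable<T> {
    private final Consumer<Consumer<T>> source;

    private MiniObservable(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    // Creates an observable that emits the given items in order.
    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        return new MiniObservable<>(downstream -> {
            for (T item : items) {
                downstream.accept(item);
            }
        });
    }

    // Operator: returns a NEW observable that emits fn(item) for each source item.
    <R> MiniObservable<R> map(Function<T, R> fn) {
        return new MiniObservable<>(downstream ->
                source.accept(item -> downstream.accept(fn.apply(item))));
    }

    // Operator: returns a NEW observable that emits only items passing the predicate.
    MiniObservable<T> filter(Predicate<T> keep) {
        return new MiniObservable<>(downstream ->
                source.accept(item -> {
                    if (keep.test(item)) {
                        downstream.accept(item);
                    }
                }));
    }

    // Nothing is emitted until someone subscribes.
    void subscribe(Consumer<T> observer) {
        source.accept(observer);
    }
}

final class OperatorChainDemo {
    static List<String> run() {
        List<String> out = new ArrayList<>();
        MiniObservable.just(1, 2, 3, 4, 5)
                .filter(n -> n % 2 == 1)       // keeps 1, 3, 5
                .map(n -> "item-" + (n * 10))  // turns them into strings
                .subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [item-10, item-30, item-50]
    }
}
```

Because every operator returns a new observable and leaves its source untouched, chains can be built up step by step and reused.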
The behavior of each operator is usually illustrated with marble diagrams, such as those on Rx Marbles.
Reactive operators closely resemble those of functional programming, which makes them quicker to understand for anyone already familiar with that style.
Some of the most used core operators in ReactiveX libraries are:
- map: transforms the items emitted by an Observable by applying a function to each of them.
- flatMap: transforms the objects emitted by an Observable into Observables (“nested Observables”), then flattens the emissions from those into a single Observable.
- filter: emits only the items from an Observable that pass a predicate test.
- just: converts objects into an Observable that emits those objects.
- takeWhile: discards items emitted by an Observable after a specified condition becomes false.
- distinct: suppresses duplicate objects emitted by an Observable.
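For readers who know `java.util.stream`, the semantics of these operators can be previewed with their stream namesakes. This is only an analogy: Java streams are pull-based and run synchronously, while Observables push items to subscribers, and the class name `OperatorAnalogies` is made up for this sketch. `Stream.of` plays the role of `just` here, and `takeWhile` requires Java 9 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stream-based analogies for the operators listed above.
// These are java.util.stream calls, not RxJava operators.
final class OperatorAnalogies {

    // map: apply a function to each item
    static List<Integer> mapped() {
        return Stream.of(1, 2, 3).map(n -> n * 10).collect(Collectors.toList());
    }

    // flatMap: turn each item into a nested sequence, then flatten the results
    static List<String> flatMapped() {
        return Stream.of("a b", "c d")
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    // filter: keep only the items that pass the predicate
    static List<Integer> filtered() {
        return Stream.of(1, 2, 3, 4).filter(n -> n % 2 == 0).collect(Collectors.toList());
    }

    // takeWhile: stop at the first item that fails the condition
    static List<Integer> takenWhile() {
        return Stream.of(1, 2, 5, 1).takeWhile(n -> n < 3).collect(Collectors.toList());
    }

    // distinct: suppress duplicates
    static List<Integer> deduplicated() {
        return Stream.of(1, 1, 2, 1, 3).distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(mapped());       // prints [10, 20, 30]
        System.out.println(flatMapped());   // prints [a, b, c, d]
        System.out.println(filtered());     // prints [2, 4]
        System.out.println(takenWhile());   // prints [1, 2]
        System.out.println(deduplicated()); // prints [1, 2, 3]
    }
}
```

In the `takeWhile` case the trailing 1 is dropped even though it satisfies the condition, because the sequence was already cut off at 5.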
There is also the important concept of backpressure, which provides solutions for when an Observable is emitting items more quickly than an Observer can consume them.
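One common backpressure strategy is to let the consumer state how many items it is ready for. The sketch below shows that idea with the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), not with the RxJava 1.x API used elsewhere in this article. `RangePublisher` and `BatchingSubscriber` are hypothetical names, and the publisher is a simplified, single-threaded illustration that skips parts of the Reactive Streams rules (for example, it silently ignores non-positive requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureDemo {

    // Hypothetical publisher: emits 1..count, but never more than was requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) {
                        return; // simplification: invalid requests are ignored
                    }
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the loop below picks it up
                    }
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Hypothetical slow consumer: asks for items in small batches and logs what happens.
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; }
        @Override public void onNext(Integer item) { log.add("onNext " + item); }
        @Override public void onError(Throwable t) { log.add("onError"); }
        @Override public void onComplete() { log.add("onComplete"); }

        void requestMore(long n) {
            log.add("request " + n);
            subscription.request(n);
        }
    }

    static List<String> run() {
        BatchingSubscriber subscriber = new BatchingSubscriber();
        new RangePublisher(3).subscribe(subscriber);
        subscriber.requestMore(2); // only two items may be delivered now
        subscriber.requestMore(2); // the third item arrives, then completion
        return subscriber.log;
    }

    public static void main(String[] args) {
        // prints [request 2, onNext 1, onNext 2, request 2, onNext 3, onComplete]
        System.out.println(run());
    }
}
```

Other strategies, such as buffering or dropping items when the consumer falls behind, trade memory or completeness for throughput instead of slowing the producer down.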
“Show Me the Code!”
After some background theory, let’s get to the fun part!
Let’s take a hands-on approach and see the magic in motion!
The examples use the RxJava (version 1.3.8) library:
<!-- maven dependency -->
<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.3.8</version>
</dependency>
Here is a simple inline “Hello World” example that creates an observable and subscribes to it immediately:
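// simplest way (in-line); needs "import rx.Observable;" in the enclosing class file
public static void helloWorld1() {
    // just(...) emits the single item; println is invoked as the onNext callback
    Observable.just("Hello Reactive World 1!").subscribe(System.out::println);
}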
The observer callbacks can be supplied implicitly, as lambdas or method references passed to subscribe, or more explicitly, by implementing the Observer methods.
The Observable and the Observer can also be created as separate objects and connected afterwards through subscribe.
Since only one item is emitted, the source can also be modeled as a single object (RxJava provides a Single type for this case).
Of course, we can set <nerd mode on> and implement a Star Wars battle using Reactive Programming (source code here). The output of that example varies between runs, since the troopers' ID numbers are random.
Published at DZone with permission of Tiago Albuquerque. See the original article here.