Getting Started With Spring WebFlux

2024-11-27

Most traditional applications deal with blocking calls or, in other words, synchronous calls. This means that if we want to access a particular resource in a system where most of the threads are busy, the application blocks the new request or waits until the previous threads have finished processing their requests.

If we want to process Big Data, however, we need to do this with immense speed and agility. That’s when software developers realized that they would need some kind of multi-threaded environment that handles asynchronous and non-blocking calls to make the best use of processing resources.

Example Code

This article is accompanied by a working code example on GitHub.

What is a stream?

Before jumping on to the reactive part, we must understand what streams are. A stream is a sequence of data that is transferred from one system to another. It traditionally operates in a blocking, sequential, and FIFO (first-in-first-out) pattern.

This blocking methodology of data streaming often prevents a system from processing real-time data while streaming. Thus, a group of prominent developers realized that they would need an approach to build a “reactive” systems architecture that would ease the processing of data while streaming. Hence, they signed a manifesto, popularly known as the Reactive Manifesto.

The authors of the manifesto state that a reactive system must be asynchronous software that deals with producers who have the single responsibility to send messages to consumers. They introduced the following features to keep in mind:

  • Responsive: Reactive systems must be fast and responsive so that they can provide a consistently high quality of service.
  • Resilient: Reactive systems should be designed to anticipate system failures. Thus, they should stay responsive through replication and isolation.
  • Elastic: Reactive systems must be able to shard or replicate components based upon their requirements. They should use predictive scaling to anticipate sudden ups and downs in their infrastructure.
  • Message-driven: Since all the components in a reactive system are supposed to be loosely coupled, they must communicate across their boundaries by asynchronously exchanging messages.

Introducing Reactive Programming Paradigm

Reactive programming is a programming paradigm that helps to implement non-blocking, asynchronous, and event-driven or message-driven data processing. It models data and events as streams that it can observe and react to by processing or transforming the data. Let’s talk about the differences between blocking and non-blocking request processing.

Blocking Request

In a conventional MVC application, whenever a request reaches the server, a servlet thread is created and the work is delegated to worker threads that perform various operations like I/O, database processing, etc. While the worker threads are busy completing their processes, the servlet threads enter a waiting state, due to which the calls remain blocked. This is blocking or synchronous request processing.

Non-Blocking Request

In a non-blocking system, all the incoming requests are accompanied by an event handler and a callback. The request thread delegates the incoming request to a thread pool that manages a pretty small number of threads. Then the thread pool delegates the request to its handler function and becomes available to process the next incoming requests from the request thread.

When the handler function completes its process, one of the threads from the pool fetches the response and passes it to the callback function. Thus the threads in a non-blocking system never go into the waiting state. This increases the productivity and the performance of the application.

A single request is potentially processed by multiple threads!
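
The handler-plus-callback flow described above can be sketched with nothing but the JDK's CompletableFuture. This is only an illustration of the idea, not how WebFlux is implemented; the class NonBlockingSketch, the method names, and the "200 OK" response text are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NonBlockingSketch {

    // Hypothetical handler function: stands in for an I/O or database call.
    static String lookupUser(String id) {
        return "user-" + id;
    }

    // Delegates the handler to a small pool and registers a callback for its result.
    // The calling thread returns immediately with a future instead of waiting.
    static CompletableFuture<String> handle(String id, ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> lookupUser(id), pool)   // handler runs on a pool thread
                .thenApply(user -> "200 OK: " + user);     // callback turns the result into a response
    }

    // Demo helper only: join() blocks, which is acceptable here at the very edge of the demo.
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return handle("42", pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The thread that calls handle() is free as soon as the future is returned, and the callback may run on a different thread than the one that accepted the request.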

Backpressure

Working with reactive code, we often come across the term “backpressure”. It is an analogy derived from fluid dynamics which literally means the resistance or force that opposes the desired flow of data. In Reactive Streams, backpressure defines the mechanism to regulate the data transmission across streams.

Consider that server A sends 1000 EPS (events per second) to server B. But server B can only process 800 EPS and thus has a deficit of 200 EPS. Server B would now tend to fall behind, as it has to process the deficit data and send it downstream or maybe store it in a database. Thus, server B deals with backpressure and will soon run out of memory and fail.

So, this backpressure can be handled or managed by the following options or strategies:

  • Buffer – We can easily buffer the deficit data and process it later when the server has capacity. But with a huge load of data coming in, this buffer might grow and the server would soon run out of memory.
  • Drop – Dropping, i.e. not processing events, should be the last option. Usually, we can use the concept of data sampling combined with buffering to achieve less data loss.
  • Control – The concept of controlling the producer that sends the data is by far the best option. Reactive Streams provides various options in both push and pull-based streams to control the data that is being produced and sent to the consumer.
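
As a rough illustration of the buffer and drop strategies, the sketch below uses the JDK's SubmissionPublisher with a small bounded buffer and its non-blocking offer() method. The class BackpressureSketch, the GatedSubscriber helper, and the sizes used in main are assumptions made for this example; how many events actually fit depends on the publisher's internal buffer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackpressureSketch {

    /** Slow consumer: accepts the subscription but requests nothing until told to. */
    static class GatedSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch subscribed = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(1);
        volatile Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; subscribed.countDown(); }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Offers `total` events to a bounded buffer; returns {accepted, dropped, received}. */
    static int[] run(int total, int bufferSize) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        GatedSubscriber subscriber = new GatedSubscriber();
        AtomicInteger dropped = new AtomicInteger();
        try {
            try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, bufferSize)) {
                publisher.subscribe(subscriber);
                for (int i = 1; i <= total; i++) {
                    // offer() does not block: if the buffer is full, the handler runs;
                    // returning false means "do not retry", so the event is dropped.
                    publisher.offer(i, (sub, item) -> { dropped.incrementAndGet(); return false; });
                }
                await(subscriber.subscribed);
                subscriber.subscription.request(Long.MAX_VALUE); // the consumer finally catches up
            } // close() signals onComplete once the buffered events are delivered
            await(subscriber.done);
        } finally {
            executor.shutdown();
        }
        return new int[] {total - dropped.get(), dropped.get(), subscriber.received.size()};
    }

    private static void await(CountDownLatch latch) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run(10, 4);
        System.out.println("accepted=" + r[0] + ", dropped=" + r[1] + ", received=" + r[2]);
    }
}
```

Replacing offer() with submit() would switch from dropping to blocking the producer when the buffer is full, which is one simple form of the control strategy.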

Reactive Java Libraries

The reactive landscape in Java has evolved a lot in recent years. Before we move on further to understand the Spring WebFlux component, let’s take a look into the reactive core libraries written in Java today. Here are the most popular ones:

  • RxJava: It is implemented out of the ReactiveX project, which hosts implementations for multiple programming languages and platforms. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.
  • Project Reactor: Reactor is a framework built by Pivotal and powered by Spring. It is considered one of the foundations of the reactive stack in the Spring ecosystem. It implements Reactive API patterns which are based on the Reactive Streams specification.
  • Akka Streams: Although it implements the Reactive Streams specification, the Akka Streams API is completely decoupled from the Reactive Streams interfaces. It uses Actors to deal with the streaming data. It is considered a 3rd generation Reactive library.
  • Ratpack: Ratpack is a set of Java libraries used for building scalable and high-performance HTTP applications. It uses Java 8, Netty, and reactive principles to provide a basic implementation of the Reactive Streams API. You can also use Reactor or RxJava along with it.
  • Vert.x: Vert.x is a foundation project by Eclipse which delivers a polyglot event-driven framework for the JVM. It is similar to Ratpack and allows using RxJava or its native implementation of the Reactive Streams API.

Spring WebFlux is internally built on Project Reactor and Reactor Netty.

Intro to Java 9 Reactive Streams API

The whole purpose of Reactive Streams was to introduce a standard for asynchronous stream processing of data with non-blocking backpressure. Hence, Java 9 introduced the Reactive Streams API. It is based upon the Publisher-Subscriber model (or Producer-Consumer model) and primarily defines four interfaces:

  • Publisher: It is responsible for preparing and transferring data to subscribers as individual messages. A Publisher can serve multiple subscribers, but it has only one method, subscribe().

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

  • Subscriber: A Subscriber is responsible for receiving messages from a Publisher and processing those messages. It acts as a terminal operation in the Streams API. It has four methods to deal with the events received:
    • onSubscribe(Subscription s): Gets called automatically when the subscriber is registered with a publisher, and hands over the subscription used to request data.
    • onNext(T t): Gets called on the subscriber for every new message of generic type T that it receives.
    • onError(Throwable t): Is used to handle the next steps whenever an error occurs.
    • onComplete(): Allows performing operations once the publisher has successfully emitted all of its data.

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

  • Subscription: It represents a relationship between the subscriber and publisher. It can be used only once by a single Subscriber. It has methods that allow requesting data and cancelling the demand:

public interface Subscription {
    public void request(long n);
    public void cancel();
}

  • Processor: It represents a processing stage that consists of both a Publisher and a Subscriber.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
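
To see these interfaces working together, here is a minimal sketch using the JDK's own versions of them, which live as nested interfaces in java.util.concurrent.Flow, together with the JDK's SubmissionPublisher. The class FlowSketch and the CollectingSubscriber helper are hypothetical names for this example. The subscriber pulls one element at a time via request(1), which is the simplest form of backpressure.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSketch {

    /** Subscriber that requests exactly one element at a time and records what it receives. */
    static class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);          // nothing flows until we ask for it
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1);          // ready for the next element
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given items and returns what the subscriber received, in order. */
    static <T> List<T> publishAndCollect(List<T> items) {
        CollectingSubscriber<T> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of(1, 2, 3, 4)));
    }
}
```

Libraries such as Reactor implement the equivalent org.reactivestreams interfaces, so the same four callbacks sit underneath Mono and Flux.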

Introduction to Spring WebFlux

Spring introduced a Multi-Event Loop model to enable a reactive stack known as WebFlux. It is a fully non-blocking and annotation-based web framework built on Project Reactor which allows building reactive web applications on the HTTP layer. It provides support for popular inbuilt servers like Netty, Undertow, and Servlet 3.1 containers.

Before we get started with Spring WebFlux, we must accustom ourselves to two of the publishers which are used heavily in the context of WebFlux:

  • Mono: A Publisher that emits 0 or 1 element.

Mono<String> mono = Mono.just("John");
Mono<Object> monoEmpty = Mono.empty();
Mono<Object> monoError = Mono.error(new Exception());

  • Flux: A Publisher that emits 0 to N elements and can keep emitting elements forever. It returns a sequence of elements and sends a notification when it has completed returning all its elements.

Flux<Integer> flux = Flux.just(1, 2, 3, 4);
Flux<String> fluxString = Flux.fromArray(new String[]{"A", "B", "C"});
Flux<String> fluxIterable = Flux.fromIterable(Arrays.asList("A", "B", "C"));
Flux<Integer> fluxRange = Flux.range(2, 5);                   // 5 values starting at 2
Flux<Long> fluxLong = Flux.interval(Duration.ofSeconds(10));  // one tick every 10 seconds

// To stream data and call the subscribe method
List<String> dataStream = new ArrayList<>();
Flux.just("X", "Y", "Z")
    .log()
    .subscribe(dataStream::add);

Once the stream of data is created, it needs to be subscribed to so that it starts emitting elements. The data won’t flow or be processed until the subscribe() method is called. Also, by using the .log() method above, we can trace and observe all the stream signals. The events are logged into the console.

Reactor also provides operators to work with Mono and Flux objects. Some of them are:

  • Map – It is used to transform from one element to another.
  • FlatMap – It flattens a list of Publishers to the values that these publishers emit. The transformation is asynchronous.
  • FlatMapMany – This is a Mono operator which is used to transform a Mono object into a Flux object.
  • DelayElements – It delays the publishing of each element by a defined duration.
  • Concat – It is used to combine the elements emitted by several publishers while keeping the sequence of the publishers intact.
  • Merge – It is used to combine the publishers without keeping their sequence.
  • Zip – It is used to combine two or more publishers by waiting on all the sources to emit one element andcombining these elements into an output value.

Spring WebFlux Dependencies

Until now we have spoken a lot about Reactive Streams and WebFlux. Let’s get started with the implementation part. We are going to build a REST API using WebFlux and we will use MongoDB as our database to store data. We will build a user management service to store and retrieve users.

Let’s initialize the Spring Boot application by defining a skeleton project in Spring Initializr:

We have added the Spring Reactive Web dependency, Spring Data Reactive MongoDB to reactively connect to MongoDB, Lombok, and Spring DevTools. The use of Lombok is optional, as it’s a convenience library that helps us reduce boilerplate code such as getters, setters, and constructors, just by annotating our entities with Lombok annotations. The same goes for Spring DevTools.

Data Model

Let’s start by defining the User entity that we will be using throughout our implementation:

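import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@ToString
@EqualsAndHashCode(of = {"id", "name", "department"})
@AllArgsConstructor
@NoArgsConstructor
@Data
@Document(value = "user")
public class User {

    @Id
    private String id;
    private String name;
    private int age;
    private double salary;
    private String department;
}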

We are initially using the Lombok annotations to define getters, setters, toString(), equals() and hashCode() methods, and constructors to reduce boilerplate implementations. We have also used @Document to mark it as a MongoDB entity.

Persistence Layer – Defining Repositories

Next, we will define our Repository layer using the ReactiveMongoRepository interface.

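import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {
}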

Service Layer

Now we will define the Service that would make calls to MongoDB using the Repository and pass the data on to the web layer:

import java.util.Collections;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
@Slf4j
@RequiredArgsConstructor
@Transactional
public class UserService {

    private final ReactiveMongoTemplate reactiveMongoTemplate;
    private final UserRepository userRepository;

    public Mono<User> createUser(User user) {
        return userRepository.save(user);
    }

    public Flux<User> getAllUsers() {
        return userRepository.findAll();
    }

    public Mono<User> findById(String userId) {
        return userRepository.findById(userId);
    }

    // Updates age and salary of an existing user; completes empty if the user is not found
    public Mono<User> updateUser(String userId, User user) {
        return userRepository.findById(userId)
                .flatMap(dbUser -> {
                    dbUser.setAge(user.getAge());
                    dbUser.setSalary(user.getSalary());
                    return userRepository.save(dbUser);
                });
    }

    // Deletes the user and emits the deleted entity; completes empty if the user is not found
    public Mono<User> deleteUser(String userId) {
        return userRepository.findById(userId)
                .flatMap(existingUser -> userRepository.delete(existingUser)
                        .then(Mono.just(existingUser)));
    }

    // Finds users whose name matches the given regex, sorted by age in ascending order
    public Flux<User> fetchUsers(String name) {
        Query query = new Query()
                .with(Sort
                        .by(Collections.singletonList(Sort.Order.asc("age")))
                );
        query.addCriteria(Criteria
                .where("name")
                .regex(name)
        );

        return reactiveMongoTemplate
                .find(query, User.class);
    }
}

We have defined service methods to save, update, fetch, search and delete a user. We have primarily used UserRepository to store and retrieve data from MongoDB, but we have also used a ReactiveMongoTemplate and Query to search for users whose name matches a given regex string.

Web Layer

We have covered the middleware layers to store and retrieve data, so let’s now focus on the web layer. Spring WebFlux supports two programming models:

  • Annotation-based Reactive components
  • Functional Routing and Handling

Annotation-based Reactive Components

Let’s first look into the annotation-based components. We can simply create a UserController for that and annotate it with routes and methods:

import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
@RestController
@RequestMapping("/users")
public class UserController {

    private final UserService userService;

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> create(@RequestBody User user) {
        return userService.createUser(user);
    }

    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.getAllUsers();
    }

    @GetMapping("/{userId}")
    public Mono<ResponseEntity<User>> getUserById(@PathVariable String userId) {
        Mono<User> user = userService.findById(userId);
        return user.map(ResponseEntity::ok)
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PutMapping("/{userId}")
    public Mono<ResponseEntity<User>> updateUserById(@PathVariable String userId, @RequestBody User user) {
        return userService.updateUser(userId, user)
                .map(ResponseEntity::ok)
                .defaultIfEmpty(ResponseEntity.badRequest().build());
    }

    @DeleteMapping("/{userId}")
    public Mono<ResponseEntity<Void>> deleteUserById(@PathVariable String userId) {
        return userService.deleteUser(userId)
                .map(r -> ResponseEntity.ok().<Void>build())
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @GetMapping("/search")
    public Flux<User> searchUsers(@RequestParam("name") String name) {
        return userService.fetchUsers(name);
    }
}

This looks almost the same as a controller defined in Spring MVC. But the major difference between Spring MVC and Spring WebFlux lies in how the request and response are handled using the non-blocking publishers Mono and Flux.

We don’t need to call subscribe methods in the controller, as the internal classes of Spring would call it for us at the right time.

Do Not Block

We must make sure that we don’t use any blocking methods throughout the lifecycle of an API. Otherwise, we lose the main advantage of reactive programming!

Functional Routing and Handling

Initially, the Spring Functional Web Framework was built and designed for Spring WebFlux, but later it was also introduced in Spring MVC. We use functions for routing and handling requests. This introduces an alternative programming model to the one provided by the Spring annotation-based framework.

First of all, we will define a handler function that can accept a ServerRequest as an incoming argument and returns a Mono of ServerResponse as the response of that functional method. Let’s name the handler class UserHandler:

import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
@RequiredArgsConstructor
public class UserHandler {

    private final UserService userService;

    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        return ServerResponse
                .ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(userService.getAllUsers(), User.class);
    }

    public Mono<ServerResponse> getUserById(ServerRequest request) {
        return userService
                .findById(request.pathVariable("userId"))
                .flatMap(user -> ServerResponse
                        .ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        // bodyValue() takes a plain object; body() expects a Publisher
                        .bodyValue(user)
                )
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> create(ServerRequest request) {
        Mono<User> user = request.bodyToMono(User.class);

        return user
                .flatMap(u -> ServerResponse
                        .status(HttpStatus.CREATED)
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(userService.createUser(u), User.class)
                );
    }

    public Mono<ServerResponse> updateUserById(ServerRequest request) {
        String id = request.pathVariable("userId");
        Mono<User> updatedUser = request.bodyToMono(User.class);

        return updatedUser
                .flatMap(u -> ServerResponse
                        .ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(userService.updateUser(id, u), User.class)
                );
    }

    public Mono<ServerResponse> deleteUserById(ServerRequest request) {
        return userService.deleteUser(request.pathVariable("userId"))
                .flatMap(u -> ServerResponse.ok().bodyValue(u))
                .switchIfEmpty(ServerResponse.notFound().build());
    }
}

Next, we will define the router function. Router functions usually evaluate the request and choose the appropriate handler function. They serve as an alternative to the @RequestMapping annotation. So we will define this RouterFunction and annotate it with @Bean within a @Configuration class to inject it into the Spring application context:

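import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class RouterConfig {

    @Bean
    RouterFunction<ServerResponse> routes(UserHandler handler) {
        return route(GET("/handler/users").and(accept(MediaType.APPLICATION_JSON)), handler::getAllUsers)
                .andRoute(GET("/handler/users/{userId}").and(contentType(MediaType.APPLICATION_JSON)), handler::getUserById)
                .andRoute(POST("/handler/users").and(accept(MediaType.APPLICATION_JSON)), handler::create)
                .andRoute(PUT("/handler/users/{userId}").and(contentType(MediaType.APPLICATION_JSON)), handler::updateUserById)
                .andRoute(DELETE("/handler/users/{userId}").and(accept(MediaType.APPLICATION_JSON)), handler::deleteUserById);
    }
}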

Finally, we will define some properties as part of application.yaml in order to configure our database connection and server config.

spring:
  application:
    name: spring-webflux-guide
  webflux:
    base-path: /api
  data:
    mongodb:
      authentication-database: admin
      uri: mongodb://localhost:27017/test
      database: test

logging:
  level:
    io:
      reflectoring: DEBUG
    org:
      springframework:
        web: INFO
        data:
          mongodb:
            core:
              ReactiveMongoTemplate: DEBUG
    reactor:
      netty:
        http:
          client: DEBUG

This constitutes our basic non-blocking REST API using Spring WebFlux. It works as the publisher-subscriber model that we were talking about initially in this article.

Server-Sent Events

Server-Sent Events (SSE) is an HTTP standard that provides the capability for servers to push streaming data to the web client. The flow is unidirectional from server to client, and the client receives updates whenever the server pushes some data. This kind of mechanism is often used for real-time messaging, streaming or notification events. For multiplexed and bidirectional streaming, we usually use WebSockets. But SSE is mostly used for the following use cases:

  • Receiving live feed from the server whenever there is a new or updated record.
  • Message notification without unnecessary reloading of a server.
  • Subscribing to a feed of news, stocks, or cryptocurrency

The biggest limitation of SSE is that it’s unidirectional, and hence information can’t be passed to a server from the client. Spring WebFlux allows us to define server streaming events which can send events in a given interval. The web client initiates the REST API call and keeps it open until the event stream is closed.

The server-side event would have the content type text/event-stream. Now we can define a Server-Sent Event streaming endpoint using WebFlux by simply returning a Flux and specifying the content type as text/event-stream. So let’s add this method to our existing UserController:

    // Needs java.time.Duration, java.util.stream.Stream and reactor.util.function.Tuple2
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamAllUsers() {
        return userService
                .getAllUsers()
                .flatMap(user -> Flux
                        // pair every 2-second tick with the same user, then keep only the user
                        .zip(Flux.interval(Duration.ofSeconds(2)),
                                Flux.fromStream(Stream.generate(() -> user))
                        )
                        .map(Tuple2::getT2)
                );
    }

Here, we stream all the users in our system every 2 seconds. Note that the users are read from MongoDB once, when the client subscribes; each of them is then emitted again on every 2-second tick for as long as the connection stays open.
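
On the wire, a text/event-stream response is a sequence of small text frames: optional id and event lines, one or more data lines, and a blank line that ends the event. The sketch below formats such a frame in plain Java, independent of Spring, purely to show what the client receives; the class SseFrameSketch and its format method are hypothetical helpers, and in WebFlux this framing is done for us.

```java
class SseFrameSketch {

    /** Formats one event; id and event may be null, in which case the line is omitted. */
    static String format(String id, String event, String data) {
        StringBuilder frame = new StringBuilder();
        if (id != null) {
            frame.append("id: ").append(id).append('\n');
        }
        if (event != null) {
            frame.append("event: ").append(event).append('\n');
        }
        // A multi-line payload is sent as one "data:" line per line of text
        for (String line : data.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        return frame.append('\n').toString(); // the blank line terminates the event
    }

    public static void main(String[] args) {
        System.out.print(format("1", "user", "{\"name\":\"John\"}"));
    }
}
```

A browser's EventSource reads these frames one by one and raises a message event for each, which is why the client never has to poll the endpoint.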

WebFlux Internals

Traditionally, Spring MVC uses the Tomcat server for servlet stack applications, whereas Spring WebFlux uses Reactor Netty by default for reactive stack applications.

Reactor Netty is an asynchronous, event-driven network application framework built on top of Netty, which provides non-blocking and backpressure-ready network engines for HTTP, TCP, and UDP clients and servers.

Spring WebFlux automatically configures Reactor Netty as the default server, but if we want to override the default configuration, we can simply do that by defining properties under the server prefix.

server:
  port: 9000
  http2:
    enabled: true

We can also define other properties that start with the server prefix in the same way, overriding the default server configuration.

Conclusion

Spring WebFlux or reactive non-blocking applications usually do not make the applications run faster. The essential benefit is the ability to scale an application with a small, fixed number of threads and lower memory requirements while at the same time making the best use of the available processing power. It often makes a service more resilient under load, as it can scale predictably.

WebFlux is a good fit for highly concurrent applications, that is, applications which need to process a huge number of requests with as few resources as possible. WebFlux is also relevant for applications that need scalability or need to stream request data in real time. While implementing a microservice in WebFlux, we must take into account that the entire flow uses reactive and asynchronous programming and that none of the operations are blocking in nature.

If you want to learn about how to build a client to a reactive server, have a look at our WebClient article.

You can refer to all the source code used in the article on GitHub.

Reference https://reflectoring.io/getting-started-with-spring-webflux/#intro-to-java-9-reactive-streams-api