ARCHITECTURE

Display realtime crypto prices with Vaadin and Spring Reactive

14 March, 2022
Vaadin + Spring + Reactive + Crypto

In this article, we’re going to create a simple application to display crypto currencies trades price in realtime using Vaadin Flow and Spring Reactive WebSockets API.

The WebSocket protocol (RFC 6455) provides a standardized way to establish a full-duplex, two-way communication channel between client and server over a single TCP connection. For this application, we are going to use Finnhub WebSockets API as it has a free tier that streams realtime trades for US stocks, forex and crypto.

The frontend choice is Vaadin Flow, version 23.0.1 at the time of writing, as it provides a simple, fast and powerful way to build UIs. Vaadin also supports server push over WebSockets, exactly what we need for displaying prices in realtime without the user explicitly requesting for updates.

Communication with prices WebSocket API will be performed using the fully non-blocking Spring Reactive WebClient. It has a functional, fluent API based on Reactor, which enables declarative composition of asynchronous logic without the need to deal with threads or concurrency. You can find a more detailed introduction about Reactive programming on Spring Reactive and Reactive Streams.

Is not the purpose of this article to deep dive into Spring Reactive and WebSocket specification. If you are interested in learning more about them, please refer to their corresponding documentation.

Show me the code

Let’s get straight into our tutorial and code.

The application stack is based on Maven 3.2+, Java 11+, Vaadin Flow 23, Spring Framework 5 and Spring Boot 2.6. You also need to have Lombok setup on your IDE.

WebSocket and Reactive configuration in Spring

Take a look at the most important blocks of the configuration in AppConfiguration class.

Define a WebSocketClient bean based on ReactorNettyWebSocketClient implementation.

    @Bean
    WebSocketClient webSocketClient() {
        return new ReactorNettyWebSocketClient();
    }

We also need a WebClient bean for fetching the list of available Exchanges and CryptoCurrencies from a REST API.

    @Bean
    WebClient finnhubWebClient(@Value("${api.finnhub.base.endpoint}") String baseUrl,
            @Value("${api.finnhub.token}") String token,
            WebClient.Builder webClientBuilder) {
        return webClientBuilder.baseUrl(baseUrl)
                .defaultHeader("X-Finnhub-Token", token)
                .build();
    }

Then define the reactive Sink and Flux beans needed for handling the WebSocket session. It’s really important to set autoCancel=false when creating priceSink in order to avoid Sink completion when last subscriber cancels.

@Bean
Many requestSink() {
    return Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
}

@Bean
Flux requestFlux() {
    return requestSink().asFlux();
}

@Bean
Many priceSink() {
    // set autoCancel=false to avoid Sink completion when last subscriber cancels
    return Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
}

@Bean
Flux priceFlux() {
    return priceSink().asFlux();
}

WebSocket session handler and API interaction

Let’s jump to the backend service that consumes the WebSocket API using the reactive WebClient API. This service class implements WebSocketHandler interface. Its handle method takes WebSocketSession and returns Mono<Void> to indicate when application handling of the session is complete. The session is handled through two streams, one for inbound and one for outbound messages.

First of all, open the WebSocket session after bean construction.

    @PostConstruct
    void init() {
        // open websocket session
        webServiceSubscription = webSocketClient.execute(URI.create(webserviceEnpoint), this).subscribe();
    }

The service implements handle() method that receives the WebSocket session as a parameter. Received messages are sent to priceSink and requests are consumed from requestFlux.

@Override
public Mono handle(WebSocketSession session) {
    session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .log()
            .map(this::map)
            .filter(tradeResponse -> Types.TRADE.getType().equals(tradeResponse.getType()))
            .map(TradeResponse::getTrades)
            .filter(Objects::nonNull)
            .flatMap(trades -> Flux.fromStream(trades.stream()))
            .subscribe(priceSink::tryEmitNext);

    return session.send(requestFlux.map(this::map).map(session::textMessage))
            .then();
}

Client subscription to reactive flux

In order to start receiving prices data, clients must subscribe to priceFlux. Furthermore, if this is the first subscriber for a given symbol, requestSink must emit a new request. WebSocket handler will consume it and send a subscribe message to the API. All this logic is implemented on PriceFluxSubscriptionContext.subscribe() method.

    public SubscriptionResult subscribe(String symbol, CryptoPricesSubscriber subscriber) {
        var symbolSubscribers = subscribers.computeIfAbsent(symbol, key -> new CopyOnWriteArrayList());
        symbolSubscribers.add(subscriber);

        var emitResult = EmitResult.OK;

        if (symbolSubscribers.size() == 1) {
            // emit request only if this is the first subscriber
            emitResult = requestSink.tryEmitNext(SymbolRequest.subscribe(symbol));
        }

        if (emitResult.isSuccess()) {
            subscriber.subscribe(priceFlux, symbol);
            return SubscriptionResult.ok();
        }

        return SubscriptionResult.error();
    }

Invoking PriceFluxSubscriptionContext.unsubscribe() method removes the client subscription. If there’s no more subscribers for a symbol, requestSink emits an unsubscribe request so the WebSocket API aborts sending prices for that symbol.

public void unsubscribe(String symbol, CryptoPricesSubscriber subscriber) {
    var symbolSubscribers = subscribers.get(symbol);

    if (symbolSubscribers != null) {
        symbolSubscribers.remove(subscriber);
        if (symbolSubscribers.isEmpty()) {
            // emit an unsubscribe request if symbol has no more subscribers
            requestSink.tryEmitNext(SymbolRequest.unsubscribe(symbol));
        }
    }

}

Frontend setup and view

Finally, let’s take a look at frontend code and how the view in order to display cryptocurrencies prices in realtime.

First of all, enable Vaadin server push support in our application by adding @Push annotation.

...
@Push
public class ReactiveCryptoApplication extends SpringBootServletInitializer implements AppShellConfigurator {
   ...
}

TradesListView class implements CryptoPricesSubscriber interface, whose methods subscribe and unsubscribe allow the interaction with PriceFluxSubscriptionContext.

Clicking Subscribe button kicks off the subscription.

    subscribeButton.addClickListener(e -> {
        ...
        var result = subscriptionContext.subscribe(getSymbol(), this);
        ...
    });

Then PriceFluxSubscriptionContext invokes subscribe() method from client. The latter subscribes to priceFlux in order to start consuming elements from this flux and display prices as the arrive. Note that UI updating logic is executed inside UI.access() method. The reason is that user session requires locking when making changes to a UI from another thread and pushing them to the browser.

@Override
public void subscribe(Flux cryptoPrices, String symbol) {
        progressBar.setVisible(true);
        subscribeButton.setVisible(false);
        unsubscribeButton.setVisible(true);
        exchangesComboBox.setEnabled(false);
        symbolsComboBox.setEnabled(false);

        var priceSubscription = priceFlux.subscribe(trade -> getUI().ifPresent(ui -> ui.access(() -> {
            if (trade.getSymbol().equals(symbol)) {
                progressBar.setVisible(false);
                tradesPanel.add(trade);
            }
        })));

        // keep for later unsubscription
        priceSubscriptionMaybe = Optional.of(priceSubscription);
}

Clicking Unsubscribe button stops the interaction between client and backend.

unsubscribeButton.addClickListener(e -> subscriptionContext.unsubscribe(getSymbol(), this));

Running the application

First, you’re going to need a free account at https://finnhub.io in order the get access to their API and get the authetication token.
Then, open application.properties file and replace your_token string with the corresponding token you obtained.

api.finnhub.endpoint=wss://ws.finnhub.io?token=your_token
api.finnhub.token=your_token

You can run the application from the command line with Maven by running.

mvnw spring-boot:run

Now open localhost:8080 on your browser and you should see the application page.

Crypto Reactive Realtime

Summary

We’ve presented a simple example on how to consume and interact with a WebSocket in a reactive way. Support provided by Spring Framework and Project Reactor makes this task a bit simpler. And thanks to Vaadin push support, we were able to push price data to UI without any further interaction required from user’s side.

As always, the full example can be found in our GitHub repository, and an online demo here.

Felipe Lang
By Felipe Lang

System Engineer. Crafting software for more than 20 years. SOLID, KISS and DRY FTW!

Join the conversation!
Profile Picture

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.