В мире веб-разработки реактивное программирование становится все более популярным подходом. Основано на разработке асинхронных, неблокирующих компонентов и потоков данных, что позволяет создавать высокопроизводительные отзывчивые приложения. Концепция реактивного программирования открывает новые возможности для построения масштабируемых систем, способных справляться с огромными потоками данных.
В этой статье мы рассмотрим основные концепции реактивного программирования, популярные Java-фреймворки для его реализации, а также сравним их производительность на примере демо приложения.
Реактивные потоки
В Java 9 была введена спецификация реактивных потоков, определяющая четыре основных интерфейса:
- Publisher — источник потенциально неограниченного числа событий;
- Subscriber — подписчик, т.е. тот, кто будет потреблять события;
- Subscription — активный объект подписки, который будет возвращать события либо отменяться;
- Processor — по сути Publisher + Subscriber в одном флаконе.
Существует несколько библиотек, реализующих концепцию работы с реактивными потоками таких как RxJava, Project Reactor, Smallrye Mutiny, Akka Streams. Все они предоставляют готовые реализации реактивных потоков, а также различные операторы для их обработки, такие как map, filter, reduce и другие.
Рассмотрим работу реактивного потока на примере библиотеки Project Reactor.
С помощью Flux.just создаем реактивный поток из элементов от 0 до 5. После этого применяем ряд операций над нашим потоком:
- задерживаем появление каждого элемента на 5 миллисекунд с помощью delayElements;
- с помощью filter отбрасываем четные элементы;
- еще раз задерживаем появление каждого элемента на 5 миллисекунд;
- возводим каждый из оставшихся элементов в квадрат с помощью map;
- снова задерживаем появление элементов на 5 миллисекунд;
- суммируем все оставшиеся элементы, превращая наш поток из нескольких элементов в поток из одного элемента.
Реактивные системы
Само по себе наличие реактивных потоков не делает наше приложение полностью реактивным. Существует манифест реактивных систем, в котором описаны принципы и характеристики, придерживаясь которых можно построить такую систему. Реактивными считаются системы:
1) Основанные на обмене сообщениями. Реактивные системы используют асинхронный обмен сообщениями. Это позволяет вынести границы, организовать наблюдаемость и back pressure (обратное давление);
2) Отзывчивые. То есть способные отвечать своевременно. Отзывчивые системы ориентированы на быстрый отклик;
3) Устойчивые. Системы, которые в случае каких-либо ошибок, отказов или чего-то иного, полностью не разрушаются, а изолируют ошибки внутри компонентов, сохраняя работоспособность системы в целом;
4) Гибкие. Способные реагировать на изменяющиеся нагрузки. Когда нагрузка высокая, система не падает. Просто начинает потреблять больше ресурсов, а также чуть медленнее отвечает, однако при этом работает. Если же нагрузка упала, то система просто потребляет меньше ресурсов, однако работает в штатном режиме.
Типичными примерами полностью реактивных систем являются онлайн-чаты, мессенджеры, системы мониторинга и уведомлений, стриминговые сервисы, а также другие системы, в которых присутствует бесконечный поток данных.
Для создания полностью асинхронной и реактивной системы необходимо, чтобы все ее компоненты взаимодействовали реактивным образом. Это означает использование таких технологий, как Websockets, Server-Sent Events (SSE) или же RSocket.
Использование брокеров сообщений для взаимодействия между микросервисами, а также реактивный доступ к базам данных.
Механизм Back Pressure
Механизм Back Pressure позволяет получателю регулировать поступающую в систему нагрузку, предотвращая ее перегрузку и падение.
Например, у нас есть сервис А, который способен обрабатывать 7 500 событий в секунду. И есть сервис B, который шлет 10 000 событий в секунду в сервис A. В итоге сервис A не справляется с нагрузкой.
Каким образом Back Pressure решает эту проблему?
Существует три основные стратегии:
1) Контроль скорости поставщика. Наиболее предпочтительная стратегия, когда получатель ограничивает скорость источника, сообщая ему максимально допустимую нагрузку. Однако эта стратегия не всегда реализуема, например, в системах, где мы не можем контролировать поведение пользователей.
В нашем примере сервис A сообщил бы сервису B, что он готов принимать по 7 500 событий. И сервис B в свою очередь должен был ограничить свои скорость до 7 500 событий в секунду.
2) Буферизация. Все лишние сообщения, которые получатель не может обработать сразу, складываются в буфер. Таким образом, получатель не перегружается и может забирать события из буфера в допустимом для него темпе. Однако буферы имеют ограниченный размер, и если источник постоянно генерирует больше событий, чем может обработать получатель, буфер в конечном итоге переполнится.
3) Отбрасывание. Все события, которые невозможно обработать, просто отбрасываются. Пример — debounce, техника обработки событий, при которой игнорируются повторные вызовы в течение некоторого времени. Например, в автозаполнении поля ввода.
Демо-приложение
Для демонстрации принципов реактивного программирования было разработано демо-приложение «Трейдер» для отслеживания, а также торговли акциями Apple и Tesla. Приложение использует реактивные потоки для получения данных о ценах акций, применяет стратегию Back Pressure реализует логику покупки/продажи в зависимости от текущей цены. Код приложения демонстрирует использование реактивных библиотек, таких как Project Reactor, создание асинхронных компонентов и применение базовых операторов.
Как это выглядит:
Справа — график стоимости акций. Слева — количество акций, которое у нас сейчас на руках. А сверху — баланс.
Рассмотрим код.
StocksProducer
Ядром приложения является компонент StocksProducer, который используем для получения стоимостей акций. Этот компонент генерирует бесконечный поток чисел — будущих цен акций.
Для предотвращения перегрузки системы применяется механизм Back Pressure — появление каждого нового числа задерживается на 100 миллисекунд. Сгенерированный поток чисел далее преобразуется в поток объектов, представляющих акции с соответствующими ценами. Таким образом, реализуется генератор стоимостей акций.
Следует отметить, что без delayElements бесконечный поток данных сразу же привел бы к ошибке и падению системы.
Controller
Следующим компонентом выступает контроллер, отправляющий поток событий со стоимостями акций в браузер и отслеживающий динамику двух акций — Apple и Tesla. Полученные от API данные о ценах используются для принятия решений о покупке или продаже акций в зависимости от заданных пороговых значений.
На уровне контроллера два отслеживаемых потока данных с ценами объединяются в единый поток для более удобной работы с ними. Далее применяется механизм Back Pressure в виде оператора sample. Он отбирает последний элемент потока — цену акции, поступившую за определенный интервал времени, в данном случае 500 миллисекунд.
Преобразованный таким образом поток цен используется компонентом, реализующим логику торговли акциями. На основе текущих значений цен принимается решение о необходимости купить или продать каждую из отслеживаемых акций.
Frontend
На стороне клиента, во фронтенд-части приложения, создается слушатель для получения потока событий с данными о ценах от сервера. Визуализация происходит путем отображения графиков изменения стоимости акций.
Популярные фреймворки
На рынке Java существует несколько популярных фреймворков для реактивного программирования с разными особенностями и возможностями:
- Spring WebFlux — основан на Project Reactor. Органично интегрируется в экосистему Spring, поддерживает различные протоколы, такие как WebSocket, RSocket, SSE. Также имеет реактивные версии таких компонентов, как Spring Security и Spring Data. Код можно писать на Java и Kotlin;
- Vert.x — отличается быстрым запуском, малым потреблением памяти и развитой экосистемой асинхронных компонентов. Построен на событийно-ориентированной архитектуре с использованием вертиклов (легковесных потоков). Поддерживает множество языков, включая Java, JavaScript, Ruby, Scala и Kotlin.
- Micronaut — позиционируется как фреймворк для микросервисов с внедрением зависимостей на этапе компиляции. Имеет встроенные возможности Service Discovery, Circuit Breaker, Health Check, трассировки запросов и брокеры с сообщениями «из коробки». Поддерживает Java, Groovy и Kotlin;
- Quarkus — основан на Vert.x и ориентирован на Serverless, Kubernetes и микросервисы. Имеет интеграцию с брокерами сообщений и Kubernetes. Код пишется на Java и Kotlin. Quarkus позиционируется как фреймворк для создания нативных исполняемых файлов с очень быстрым стартом;
- Akka — значительно отличается от других фреймворков, так как построен на акторной модели программирования. Имеет возможности кластеризации, шардирования и распределенных вычислений «из коробки». Изначально Akka разрабатывался для языка Scala и менее популярен в Java-сообществе. С 2022 года часть функциональности Akka стала платной.
Демо-приложение поиска билетов
Для сравнения производительности различных фреймворков было разработано еще одно демо-приложение для поиска и бронирования авиабилетов с тремя основными функциями: асинхронный поиск билетов по заданным параметрам, получение списка перелетов и бронирование выбранного рейса.
Архитектура приложения состоит из трех слоев:
- HTTP-слой с контроллерами для обработки запросов;
- Слой сервисов, содержащий бизнес-логику;
- Слой базы данных для хранения и извлечения данных о рейсах, аэропортах, самолетах и местах.
Код HTTP-слоя и слоя сервисов выглядит практически идентично на всех фреймворках, за исключением некоторых мелких отличий, таких как отсутствие контроллеров в Vert.x. Вместо них используются вертиклы.
Различия появляются только в слое взаимодействия с БД для Spring Webflux + R2DBC. Из-за отсутствия автоматического разрешения связей между сущностями требуется выполнять дополнительные запросы и затем превращать самостоятельно в объекты. Придется самому создавать рейс, объекты аэропортов, парсить поля из JSON, координаты, места и т.д.
А вот в версии с Hibernate Reactive (Vert.x, Micronaut, Quarkus, Webflux) за счет автоматического разрешения связей код оказывается проще и не сильно отличается от классического Spring.
Теперь перейдем к слою контроллеров. Код здесь выглядит нестандартно, так как взаимодействие с базой происходит в виде стримов, из стрима достается поисковый запрос. Следующей операцией этот запрос передается в метод поиска рейсов для получения результатов. Отдельным запросом можем получать места в самолете:
Тестирование
Для тестирования приложения будем использовать JMeter.
- Нагрузку увеличиваем в течение 50 секунд до максимума в 1500 пользователей;
- Продолжительность теста 1 минута.
Конфигурация выглядит следующим образом:
Тестировать будем два запроса:
- Запрос на поиск (POST с JSON в теле);
- Запрос на получение рейсов.
Используем дополнительно HTTP Header Manager для того, чтобы указать заголовок Content-type.
Поскольку запрос возвращает json с идентификатором результатов поиска, необходимо еще добавить и JSON-экстрактор, чтобы сохранить этот идентификатор для следующего запроса на получение результатов поиска.
Для получения результатов поиска используем GET /flights/search?requestId=xxx.
Результаты сохраним в influxDb.
Результаты тестирования
Лучшую производительность показал фреймворк Vert.x. Spring WebFlux (WebFlux 1) с использованием R2DBC оказался самым медленным. Всему виной неоптимальный запрос для получения связей.
В качестве оптимизации решили выбирать места в перелетах отдельным запросом.
В результате производительность возросла вдвое (WebFlux 2), но это все еще медленнее, чем Webflux + Hibernate Reactive из-за дополнительных оптимизаций внутри него.
Сравнительные таблицы результатов:
Заключение
Реактивное программирование предлагает мощный подход к созданию высокопроизводительных и масштабируемых веб-приложений, способных обрабатывать огромные потоки данных. Но оно может быть не всегда оправдано и быть излишне сложным для простых приложений с малым количеством асинхронных операций и распределенных компонентов, поэтому для начала нужно проанализировать, подходит ли конкретно вашему проекту такой подход.