Гарантированная отправка сообщений
На конференции TeachLeadConf после доклада (видео) мне был задан вопрос про гарантию отправки сообщений в брокер. Если коротко, то вопрос можно сформулировать так: "Как гарантировать отправку сообщений, сохранив их порядок?" Оригинальная формулировка была не столь понятна для меня, поэтому пришлось ответить позже в закрытом чате конференции. Поскольку вопрос достаточно популярный, я решил дать более развернутый и публичный ответ.
К вопросам обеспечения гарантий доставки и сохранения порядка я бы добавил еще исключение дубликатов и/или обеспечение идемпотентной обработки. Эти требования являются критичными для широкого спектра задач. Например, в финансовой сфере помещение денег на счет и их снятие в дальнейшем сильно отличается от снятия с последующим помещением, не говоря о том, что эти операции не должны быть задублированы по ошибке.
Не нужно думать, что в современных распределенных системах все эти проблемы может решить какой-то особенный инструмент или способ отправки сообщений. Данную задачу нужно решать в комплексе - на уровне способа отправки и на уровне самого приложения. Причём, на мой взгляд, вопросов к самому приложению в этом плане должно быть гораздо больше, чем к возможному инструментарию.
Требования к способу отправки сообщений
При подобной постановке вопроса, очевидно, что из всего многообразия вариантов межпроцессного взаимодействия (IPC), выбор будет сделан в пользу брокеров сообщений на основе очередей. Поскольку в вопросе обозначена важность порядка следования сообщений, то в качестве брокера следует рассматривать какую-то разновидность Log-based, например, Kafka или Redpanda.
На сегодняшний день эти брокеры являются популярным выбором для большинства, поэтому дальнейшее изложение в первую очередь будет относиться к ним. Конфигурационные параметры продюсеров Kafka и Redpanda будут приводиться без указания конкретного брокера, т.к. оба имеют практически идентичный набор настроек.
Пожалуй, я не совру, если скажу, что вне зависимости от типа брокера все клиентские драйверы имеют защиту от кратковременных (сетевых) сбоев, буферизируя сообщения на стороне отправителя до тех пор, пока не получат от брокера подтверждения записи. Речь о так называемом in-flight-буфере. Но клиентская буферизация имеет ряд особенностей, которые оказывают влияние не только на гарантию доставки сообщений, но и на сохранение их порядка.
Размер буфера
Ясно, что размер буфера ограничен, и если сбой будет длительным, потеря данных неизбежна. При переполнении буфера в зависимости от выбранной стратегии придётся отбрасывать самые старые или самые новые сообщения. Драйверы некоторых брокеров предоставляют возможность выбора поведения в подобной ситуации.
Размер буфера Kafka-клиента регулируется параметром "buffer.memory". Предполагается, что при переполнении буфера отправитель будет заблокирован в течение настроенного таймаута ("max.block.ms"), после чего, если буфер не успеет освободиться, отправка завершится исключением. Иначе говоря, будут отброшены самые новые сообщения.
Можно установить размер буфера с запасом, минимизировав вероятность потерь при отправке. Для этого нужно оценить интенсивность потока данных, размер сообщений, а также потенциально возможную/допустимую продолжительность сбоя. Это частично решит проблему с гарантией отправки, но не решит проблему с порядком. Иначе говоря, важен не только размер буфера, но и то, как он используется во время отправки.
Количество одновременных отправок
У Kafka-клиента есть параметр "max.in.flight.requests.per.connection", управляющий количеством сообщений, которые производитель может отправить брокеру, не получая подтверждения. Высокое значение этого параметра приведет к повышенному использованию памяти, но одновременно к увеличению пропускной способности. Однако слишком высокое значение уменьшит пропускную способность вследствие снижения эффективности пакетной обработки. Значение 1 гарантирует запись сообщений в брокер в порядке их отправки даже в случае переотправок из-за ошибок. Значение по умолчанию равно 5!
Нетрудно догадаться, что значения больше 1 могут спровоцировать перестановку в порядке следования сообщений. Когда это может произойти, будет раскрыто далее.
Возможность переотправок при сбоях
Обычно для обеспечения надёжного функционирования распределенной системы используются различные техники обеспечения устойчивости к сбоям. Один из таких подходов - это шаблон "Повтор" (Retry). Он же используется в Kafka-клиенте в сценарии отправки сообщений. Если отправка сообщения завершается неудачей, предпринимается столько попыток, сколько указано в параметре "retries".
Если для параметра "retries" задано ненулевое значение, а для "max.in.flights.requests.per.connection" значение, превышающее 1, то брокер, которому не удалось записать первый пакет сообщений, сможет успешно записать второй (уже отправленный), а затем вернуться к первому и успешно повторить попытку его записи. В результате этого порядок будет изменен на противоположный!
Поэтому, если обеспечение упорядоченности критически важно, следует задать "max.in.flight.requests.per.connection=1". Это гарантирует, что во время повтора не будут отправляться другие сообщения, поскольку это может привести к изменению порядка. Но и тут есть особенности, которые связаны со следующей настройкой - исключение дубликатов.
Исключение дубликатов сообщений
Параметр "enable.idempotence", имеющий по умолчанию значение "true", заставляет отправителя сообщения проверять, что сообщение записано брокером в топик только один раз. Если данная настройка выключена ("false"), то возможна множественная запись, например, в случае повторной отправки из-за сетевого сбоя. Сообщение было отправлено и записано, но из-за сетевого сбоя ответ не дошел до клиента, в следствие чего была предпринята попытка повторной отправки.
Если параметр "enable.idempotence" равен "true", в каждое сообщение добавляется идентификатор отправителя (Producer ID) и порядковый номер сообщения. Брокер, получив такое сообщение, сверяет его с уже имеющимися и отбрасывает устаревшие сообщения (сообщения с меньшим порядковым номером, чем у предыдущего).
Для полного и гарантированного исключения дубликатов придется установить "acks=all". Это будет означать, что успешность отправки должны подтвердить все реплики брокера. Между тем это также означает, что отправка сообщений будет производиться дольше и производительность отправителя будет снижена.
Таким образом, при включенном "enable.idempotence" порядок сообщений будет сохранен даже при условии, что "max.in.flight.requests.per.connection" установлен больше 1.
Резюме по настройкам
Для гарантированной отправки и сохранения порядка следования сообщений необходимо установить следующие настройки Kafka-клиента.
- Включены подтверждения о доставке - "acks=1" или "acks=all".
- Значение "buffer.memory" должно соответствовать бизнес-требованиям.
- Значение "retries" больше 0, но и 2147483647, скорей всего, будет перебором. Либо уточнить значение "delivery.timeout.ms" (по умолчанию 2 минуты).
- Если "enable.idempotence=true", то "max.in.flight.requests.per.connection>=1"; иначе "max.in.flight.requests.per.connection=1".
Требования к приложению
Как бы хорошо ни работал инструмент доставки сообщений, приложение также должно быть готово к различным ситуациям. Сбои на этапе отправки, сбои на этапе обработки, повторная доставка сообщений (redelivery), дупликация сообщений и т.п. Всё это может происходить не по вине брокера, но и из-за сбоев работы компонентов системы, сети или неправильного конфигурирования. Ниже я обозначу только вектор, в котором следует рассматривать поставленные в самом начале вопросы.
Реализация продюсера
Для гарантированной отправки сообщений при выполнении бизнес-сценария в рамках ACID-транзакции следует использовать шаблон Outbox. Пожалуй, это основной подход, который одновременно обеспечивает и гарантию отправки, и порядок следования. Реализация данного шаблона имеет свои сложности, но о них лучше поговорить в другой раз. Более подробно этот шаблон описан в книге Криса Ричардсона "Microservices Patterns: With examples in Java". Он же является автором сайта microservices.io и фреймворка Eventuate Tram, в котором предлагает свою реализацию Outbox.
Реализация консюмера
В идеале обработчик должен быть готов к повторной доставке сообщений (redelivery), возможно, за счет идемпотентности обработки идентичных сообщений и/или контроля того, что сообщение уже было обработано ранее. Это требование достаточно размытое, поскольку есть множество способов его реализации, каждый из которых требует отдельного рассмотрения.
Глобальный порядок следования
В оригинальном вопросе явно не затрагивается тема обеспечения глобального порядка следования сообщений, но скажу об этом пару слов. Во-первых, я с трудом представляю задачу, где бы требовалось обеспечить упорядоченность всех сообщений в системе. Чаще всего нас интересует порядок сообщений, соотнесенных с каким-то объектом. Например, пользователем, пациентом, банковским счетом и т.п. Во-вторых, обеспечение глобального порядка очень сложная тема, как теоретически, так и практически, т.к. это, как минимум, требует процесса согласования всех узлов в кластере. В-третьих, упорядоченный поток данных, который невозможно партиционировать, невозможно обрабатывать параллельно. Следовательно, это требование накладывает серьезные ограничения на пропускную способность системы.
P.s. Если вам интересна данная тематика, присоединяйтесь к моей новостной ленте в Telegram или здесь. Буду рад поделиться опытом. ;-)