Гарантированная отправка сообщений

На конференции TeachLeadConf после доклада (видео) мне был задан вопрос про гарантию отправки сообщений в брокер. Если коротко, то вопрос можно сформулировать так: "Как гарантировать отправку сообщений, сохранив их порядок?" Оригинальная формулировка была не столь понятна для меня, поэтому пришлось ответить позже в закрытом чате конференции. Поскольку вопрос достаточно популярный, я решил дать более развернутый и публичный ответ.

Гарантированная отправка сообщений

К вопросам обеспечения гарантий доставки и сохранения порядка я бы добавил еще исключение дубликатов и/или обеспечение идемпотентной обработки. Эти требования являются критичными для широкого спектра задач. Например, в финансовой сфере помещение денег на счет и их снятие в дальнейшем сильно отличается от снятия с последующим помещением, не говоря о том, что эти операции не должны быть задублированы по ошибке.

Не нужно думать, что в современных распределенных системах все эти проблемы может решить какой-то особенный инструмент или способ отправки сообщений. Данную задачу нужно решать в комплексе - на уровне способа отправки и на уровне самого приложения. Причём, на мой взгляд, вопросов к самому приложению в этом плане должно быть гораздо больше, чем к возможному инструментарию.

Требования к способу отправки сообщений

При подобной постановке вопроса, очевидно, что из всего многообразия вариантов межпроцессного взаимодействия (IPC), выбор будет сделан в пользу брокеров сообщений на основе очередей. Поскольку в вопросе обозначена важность порядка следования сообщений, то в качестве брокера следует рассматривать какую-то разновидность Log-based, например, Kafka или Redpanda.

На сегодняшний день эти брокеры являются популярным выбором для большинства, поэтому дальнейшее изложение в первую очередь будет относиться к ним. Конфигурационные параметры продюсеров Kafka и Redpanda будут приводиться без указания конкретного брокера, т.к. оба имеют практически идентичный набор настроек.

Пожалуй, я не совру, если скажу, что вне зависимости от типа брокера все клиентские драйверы имеют защиту от кратковременных (сетевых) сбоев, буферизируя сообщения на стороне отправителя до тех пор, пока не получат от брокера подтверждения записи. Речь о так называемом in-flight-буфере. Но клиентская буферизация имеет ряд особенностей, которые оказывают влияние не только на гарантию доставки сообщений, но и на сохранение их порядка.

Размер буфера

Ясно, что размер буфера ограничен, и если сбой будет длительным, потеря данных неизбежна. При переполнении буфера в зависимости от выбранной стратегии придётся отбрасывать самые старые или самые новые сообщения. Драйверы некоторых брокеров предоставляют возможность выбора поведения в подобной ситуации.

Размер буфера Kafka-клиента регулируется параметром "buffer.memory". Предполагается, что при переполнении буфера отправитель будет заблокирован в течение настроенного таймаута ("max.block.ms"), после чего, если буфер не успеет освободиться, отправка завершится исключением. Иначе говоря, будут отброшены самые новые сообщения.

Можно установить размер буфера с запасом, минимизировав вероятность потерь при отправке. Для этого нужно оценить интенсивность потока данных, размер сообщений, а также потенциально возможную/допустимую продолжительность сбоя. Это частично решит проблему с гарантией отправки, но не решит проблему с порядком. Иначе говоря, важен не только размер буфера, но и то, как он используется во время отправки.

Количество одновременных отправок

У Kafka-клиента есть параметр "max.in.flight.requests.per.connection", управляющий количеством сообщений, которые производитель может отправить брокеру, не получая подтверждения. Высокое значение этого параметра приведет к повышенному использованию памяти, но одновременно к увеличению пропускной способности. Однако слишком высокое значение уменьшит пропускную способность вследствие снижения эффективности пакетной обработки. Значение 1 гарантирует запись сообщений в брокер в порядке их отправки даже в случае переотправок из-за ошибок. Значение по умолчанию равно 5!

Нетрудно догадаться, что значения больше 1 могут спровоцировать перестановку в порядке следования сообщений. Когда это может произойти, будет раскрыто далее.

Возможность переотправок при сбоях

Обычно для обеспечения надёжного функционирования распределенной системы используются различные техники обеспечения устойчивости к сбоям. Один из таких подходов - это шаблон "Повтор" (Retry). Он же используется в Kafka-клиенте в сценарии отправки сообщений. Если отправка сообщения завершается неудачей, предпринимается столько попыток, сколько указано в параметре "retries".

Если для параметра "retries" задано ненулевое значение, а для "max.in.flights.requests.per.connection" значение, превышающее 1, то брокер, которому не удалось записать первый пакет сообщений, сможет успешно записать второй (уже отправленный), а затем вернуться к первому и успешно повторить попытку его записи. В результате этого порядок будет изменен на противоположный!

Поэтому, если обеспечение упорядоченности критически важно, следует задать "max.in.flight.requests.per.connection=1". Это гарантирует, что во время повтора не будут отправляться другие сообщения, поскольку это может привести к изменению порядка. Но и тут есть особенности, которые связаны со следующей настройкой - исключение дубликатов.

Исключение дубликатов сообщений

Параметр "enable.idempotence", имеющий по умолчанию значение "true", заставляет отправителя сообщения проверять, что сообщение записано брокером в топик только один раз. Если данная настройка выключена ("false"), то возможна множественная запись, например, в случае повторной отправки из-за сетевого сбоя. Сообщение было отправлено и записано, но из-за сетевого сбоя ответ не дошел до клиента, в следствие чего была предпринята попытка повторной отправки.

Если параметр "enable.idempotence" равен "true", в каждое сообщение добавляется идентификатор отправителя (Producer ID) и порядковый номер сообщения. Брокер, получив такое сообщение, сверяет его с уже имеющимися и отбрасывает устаревшие сообщения (сообщения с меньшим порядковым номером, чем у предыдущего).

Для полного и гарантированного исключения дубликатов придется установить "acks=all". Это будет означать, что успешность отправки должны подтвердить все реплики брокера. Между тем это также означает, что отправка сообщений будет производиться дольше и производительность отправителя будет снижена.

Таким образом, при включенном "enable.idempotence" порядок сообщений будет сохранен даже при условии, что "max.in.flight.requests.per.connection" установлен больше 1.

Резюме по настройкам

Для гарантированной отправки и сохранения порядка следования сообщений необходимо установить следующие настройки Kafka-клиента.

  • Включены подтверждения о доставке - "acks=1" или "acks=all".
  • Значение "buffer.memory" должно соответствовать бизнес-требованиям.
  • Значение "retries" больше 0, но и 2147483647, скорей всего, будет перебором. Либо уточнить значение "delivery.timeout.ms" (по умолчанию 2 минуты).
  • Если "enable.idempotence=true", то "max.in.flight.requests.per.connection>=1"; иначе "max.in.flight.requests.per.connection=1".

Требования к приложению

Как бы хорошо ни работал инструмент доставки сообщений, приложение также должно быть готово к различным ситуациям. Сбои на этапе отправки, сбои на этапе обработки, повторная доставка сообщений (redelivery), дупликация сообщений и т.п. Всё это может происходить не по вине брокера, но и из-за сбоев работы компонентов системы, сети или неправильного конфигурирования. Ниже я обозначу только вектор, в котором следует рассматривать поставленные в самом начале вопросы.

Реализация продюсера

Для гарантированной отправки сообщений при выполнении бизнес-сценария в рамках ACID-транзакции следует использовать шаблон Outbox. Пожалуй, это основной подход, который одновременно обеспечивает и гарантию отправки, и порядок следования. Реализация данного шаблона имеет свои сложности, но о них лучше поговорить в другой раз. Более подробно этот шаблон описан в книге Криса Ричардсона "Microservices Patterns: With examples in Java". Он же является автором сайта microservices.io и фреймворка Eventuate Tram, в котором предлагает свою реализацию Outbox.

Реализация консюмера

В идеале обработчик должен быть готов к повторной доставке сообщений (redelivery), возможно, за счет идемпотентности обработки идентичных сообщений и/или контроля того, что сообщение уже было обработано ранее. Это требование достаточно размытое, поскольку есть множество способов его реализации, каждый из которых требует отдельного рассмотрения.

Глобальный порядок следования

В оригинальном вопросе явно не затрагивается тема обеспечения глобального порядка следования сообщений, но скажу об этом пару слов. Во-первых, я с трудом представляю задачу, где бы требовалось обеспечить упорядоченность всех сообщений в системе. Чаще всего нас интересует порядок сообщений, соотнесенных с каким-то объектом. Например, пользователем, пациентом, банковским счетом и т.п. Во-вторых, обеспечение глобального порядка очень сложная тема, как теоретически, так и практически, т.к. это, как минимум, требует процесса согласования всех узлов в кластере. В-третьих, упорядоченный поток данных, который невозможно партиционировать, невозможно обрабатывать параллельно. Следовательно, это требование накладывает серьезные ограничения на пропускную способность системы.

P.s. Если вам интересна данная тематика, присоединяйтесь к моей новостной ленте в Telegram или здесь. Буду рад поделиться опытом. ;-)

3 комментария

считаю, что грешно использовать очереди сообщений в операциях, требующих соблюденя целостности данных. для этого надо использовать трпнзакционную модель, обеспечивающую выполнение правила: всё или ничего
и никак иначе. вы навертели кучу проверочного функционала на все подсистемы и всё равно не добьетесь гарантированного выполнения, только просадите систему по ресурсам, что приведет к нестабильности её работы и снизит общую надежность. очереди сообщений тем и хороши, что легковесны и освобождены от тяжести бремени повышенной надежности. а вы их пихаете во все дыры. потому что "модно-мололежно". резюмируя. для гарантии и надежности используйте транзакционный механизм с подтвкрждением (РСУБД), для скорости, множественной публикации и маршрутизации - очереди.

Здравствуйте!

Спасибо за активность и развернутую обратную связь!

Соглашусь с вами в том моменте, что преждевременное усложнение - это крайне плохо. В идеале хорошо, если агрегат доменной модели целиком хранится в рамках одного хранилища, которое поддерживает транзакции. В остальном есть нюансы, о которых скажу ниже.

В первую очередь хотелось бы сказать, что моя заметка - это ответ на поставленный вопрос. Считайте, что архитектурное решение уже принято без нас с вами, остаётся решить технические детали. Почему было принято такое решение, где компоненты системы должны общаться асинхронно через брокер, да еще с высокими гарантиями доставки, остаётся за рамками поставленного вопроса. Думаю, что на то были веские причины.

Во-вторых, как бы нам с вами не хотелось, но мир движется в сторону распределенных систем и гетерогенному хранению данных. И это факт. Поэтому вопросы согласования данных в распределенных системах оправданно актуальны. В связи с этим инженеры вынуждены разбираться во всех этих нюансах.

В-третьих, в более сложных случаях мы вынуждены идти на декомпозицию системы, организуя ее в виде набора взаимодействующих служб. И иногда нам нужны и гарантии доставки, и порядок событий, и многое другое. Это трудно отрицать, т.к. в противном случае мы скатываемся к примитивизму, где есть только чёрное и белое, правильное и неправильное. Если бы все было так просто. Мы принимаем то или иное архитектурное решение на основании совокупности факторов, не зная которые давать оценочные суждения о удачности решения, как минимум, неправильно. Всегда ли вы можете использовать только ACID-хранилище, в которой помещается весь агрегат? Нет, не всегда, и я это могу сказать с абсолютной уверенностью. Есть предметные области, где важны гарантии согласованности, но документы хранятся в гетерогенном хранилище. Например, медицина. Часть документа хранится в РСУБД, а часть может храниться в BLOB-хранилище. И тут нет возможности использовать только ACID, тут придется использовать либо 2PC (распределенные транзакции), либо механизм повествований (saga).

Наконец, вы сказали, что без ACID нельзя добиться гарантий, согласованности и надёжности. Дело в том, что можно. Шаблон Transactional Outbox (и Inbox), ссылка на который дана в статье, как раз позволяет этого добиться. Конечно, есть нюансы в виде eventual consistency, но они также решаемы.