Кратно ускоряем потоки данных
Как правило мы находимся в условиях, когда за короткий срок нужно выдать максимум функциональности, обеспечив при этом должный уровень производительности приложения. И если функциональным требованиям уделяют основное внимание, то вопрос производительности воспринимают как само собой разумеющееся. А что делать, если сроки сжаты до предела, уже имеется какой-то рабочий прототип, а нагрузка к моменту основного релиза обещает вырасти в десятки раз? В такой момент мы хотим найти простое решение, которое избавит нас от большей части проблем.
Обобщив свой опыт я выделил пять простых и практичных архитектурных приемов, которые позволяют кратно ускорить потоки данных.
Далее я раскрою суть и условия применимости каждого приема.
Данная статья является текстовым вариантом моего доклада с конференции TeamLeadConf 2024.
Приём №1: Расширение контекста
Контекст события ДОЛЖЕН содержать необходимую и достаточную информацию для его обработки, ЕСЛИ ожидаемый поток событий не приводит к превышению лимитов по выделенным ресурсам.
Под событием подразумеваю какой-то сигнал в информационной системе; а под контекстом - данные, которые с ним связаны. Контекст используется при обработке события. Физическое представление события, передаваемое по сети, буду называть сообщением.
Если данных контекста недостаточно, обработчик вынужден запрашивать их извне: из базы данных, у внешних служб и т.п. Обычно такое происходит, когда хотят минимизировать размер сообщения. Например, в сообщении вместо содержимого документа передается только его идентификатор.
При таком подходе возможны две проблемы. Во-первых, появляется альтернативный канал связи, что провоцирует гонку по данным и увеличивает требования к хранилищу данных (см. Linearizability и Read Your Writes). Во-вторых, время, которое тратится на восстановление нужного контекста, гарантированно снижает производительность обработчика, особенно в момент нагрузки на систему.
Резюмируя, скажу так: расширяйте контекст события, если это не приведет к недопустимой нагрузке на сетевую и дисковую подсистему. Для этого проанализируйте существующий или возможный поток данных: интенсивность возникновения событий и размер сообщений.
Приём №2: Типизация событий
События ДОЛЖНЫ направляться в разные топики, ЕСЛИ события явно ИЛИ неявно относятся к разным типам, И ЕСЛИ тип события можно определить перед отправкой.
Вроде всё понятно, но есть коварный случай – неявная типизация. Это тот случай, когда реально разные типы событий попадают в один топик и обрабатываются одним и тем же обработчиком. Казалось бы, бред! Но в реальности такое можно увидеть достаточно часто. В основном это вызвано недостаточным знанием предметной области.
Если мы недостаточно знаем предметную область, то можем сделать ложное обобщение. Например, для логически разных событий использовать универсальную структуру данных. Об этом же предостерегает и Роберт Мартин в книге "Чистая архитектура" (см. главу "Ложное дублирование").
В чем же основной недостаток иметь один топик для разных типов событий? В первую очередь - это невозможность контроля в распределении ресурсов. Разные типы событий могут иметь разную частоту возникновения, разную скорость обработки. Скорей всего, для наиболее частотных и/или медленно обрабатываемых событий вы захотите выделить больше ресурсов для их обработки.
Итак, как же выявить неявную типизацию? Можно выделить следующие отличительные признаки.
- Контекст события уже на этапе публикации содержит какой-то ключ-классификатор (один или несколько атрибутов), по которому можно судить о типе события.
- Алгоритм обработчика события имеет ветвление, которое производится по некоторым атрибутам контекста события. При этом логика обработки каждой ветки существенно разнится. В этом случае можно говорить, что типизация событий производится на стороне получателя.
- Код обработчика не имеет ветвлений, но время обработки событий существенно варьируется. В этом случае можно говорить о вариативной вычислительной сложности обработки. Как правило, такое происходит, если обработчик делегирует основную работу внешнему компоненту. Если так, то см. код внешнего компонента и п. 2.
Приём №3: Конвейеризация вычислений
Обработчик события ДОЛЖЕН быть поделен на стадии, где каждая стадия – отдельный топик, ЕСЛИ выполнение следующей стадии зависит от предыдущей, И ЕСЛИ время выполнения каждой стадии ощутимо большое.
Иначе говоря, мы рассматриваем ситуацию, когда алгоритм обработки состоит из нескольких стадий, где выходные данные одной являются входными для другой - следующей (или следующих в случае, если есть условия выбора следующей стадии). И при этом каждая стадия вносит существенный вклад в общее время обработки. В этом случае следует задуматься о конвейеризации и для каждой стадии конвейера предусмотреть отдельный тип события и топик.
Почему это должно ускорить поток данных? Тут следует начать с того, что конвейер позволяет осуществлять внеочередное исполнение. Как это происходит, лучше посмотреть на простом арифметическом примере. Допустим, наш алгоритм состоит из двух стадий: A и B; а в очереди находится два события: E1 и E2.
Если у нас один обработчик, который последовательно выполняет все стадии, общее время обработки двух событий будет равно сумме задержек на всех этапах: Ts=A(E1)+B(E1)+A(E2)+B(E2).
Если у нас конвейерная обработка, т.е. стадии A и B выполняют разные обработчики, то для разных событий они могут работать параллельно и независимо друг от друга. И в таком случае общее время обработки двух событий будет равно сумме: Tp=A(E1)+max(B(E1),A(E2))+B(E2). Не трудно увидеть, что Ts>Tp.
Отдельно стоит отметить тот случай, когда решение об окончании обработки можно принять на ранних стадиях, досрочно закончив весь цикл, откинув ненужных "хвост" конвейера.
А теперь, отбросив всю эту теорию, приведу пару практических примеров для закрепления материала (делитесь в комментариях своими более сложными и интересными кейсами).
- Проверка возможности выполнения обработки и обработка. Например, проверяем счет клиента, а уже затем формируем для него коммерческое предложение.
- Подготовка к обработке данных и обработка. Например, перед формированием отчета аккумулируем всех необходимые данные в одном месте.
Приём №4: Распараллеливание вычислений
События в топике ДОЛЖНЫ обрабатываться параллельно, ЕСЛИ не важен порядок их обработки.
Вот с такого простого и очевидного утверждения начинается долгая история.
Когда мы говорим о распараллеливании, нужно помнить про условия обеспечения эффективности параллелизма. И тут я выделяю два условия.
- Основное время уходит на полезную работу. Тут мы сводим к минимуму программные блокировки и ожидания результатов при вызове внешних компонентов. Избавляемся от необходимости в синхронизации потоков, минимизируем количество IPC, оптимизируем запросы к БД и т.п. Пожалуй, именно с выполнения этого условия и нужно начинать процесс повышения эффективности параллелизма.
- Вычислительная сложность обработки одинакова для всех событий. В самом деле, выполнение этого условия дает шансы на то, что время обработки каждого события будет плюс-минус одинаковое. Это значит, что простои обработчиков будут сведены к минимуму, поскольку они будут начинать и закачивать работу примерно в одно и тоже время.
Если же вычислительная сложность разнится, то и время обработки каждого события будет разниться, провоцируя простой самых быстрых обработчиков в группе.
Ниже я рассмотрю как эта теория соотносится с Log-based- и Queue-based-брокерами, что может повлиять на выбор брокера или на пересмотр сценариев по работе с ними.
Распараллеливание вычислений в Log-based-брокерах
В контексте распараллеливания вычислений я выделяю две важных особенности Log-based-брокеров:
- Уровень параллелизма зависит от количества партиций. Мы не можем создать обработчиков больше, чем количество партиций.
- Неравномерная загрузка обработчиков, если время обработки сильно варьируется. В разные партиции могут попасть сообщения с разной вычислительной сложностью.
Первая проблема с нехваткой обработчиков решается двумя способами, которые можно использовать совместно:
- Увеличение числа партиций (сразу много или добавляем по мере возрастания нагрузки). Существует ряд недостатков данного решения, в том числе дополнительные накладные расходы на обслуживание партиций; увеличение времени балансировки; значительные сложности при сокращении числа партиций.
- Параллельная обработка пачки сообщений в разных потоках одного процесса ОС. Иначе говоря, потребитель считывает N-сообщений и каждое обрабатывает в отдельном потоке. Когда заканчивается обработка всех N-сообщений, происходит смещение к следующей пачке. Недостатки - необходимость ожидания окончания обработки всей пачки сообщений. Подход подробно описан тут, имеет готовую реализацию. Более глобально эту проблему пытаются решить на уровне Apache Kafka в рамках KIP-932.
Вторая проблема куда более значительная для Log-based-брокеров, поскольку по замыслу они не предназначены для нагрузки подобного рода. Если от сообщения к сообщению время обработки варьируется существенно, то это может привести к ребалансировке. Один из потребителей может уйти в долгую обработку, перестав подавать признаки жизни, в результате чего брокер может счесть потребителя нерабочим и начать процесс перераспределения партиций. Это сложный и долгий процесс, который частично или полностью останавливает обработку событий топика.
В случае с Apache Kafka решить проблему можно "на глазок": установить более точные настройки для max.poll.records (сделать поменьше) и/или max.poll.interval.ms (сделать побольше). Или усложнить логику взаимодействия с брокером так, чтобы потребитель подавал брокеру признаки жизни, даже в случае долгой/зависшей обработки (см. методы pause/resume).
Иногда эту проблему удается решить частично с помощью приёма "Типизация событий" (см. выше). Например, предположим, что событием является запрос на формирование сложного финансового отчета. Если пользователь укажет слишком большой календарный период, придется анализировать слишком много данных, это увеличит время формирования отчета. Если пользователь задаст более точный фильтр, это ожидаемо сократит время обработки. Таким образом, на стороне отправителя можно сделать небольшую эвристику - типизацию отчётов, например, на "простые" ("быстрые") и "сложные" ("медленные"). Для каждого типа завести свой тип и топик. Обработчик каждого типа будет обслуживать очередь в своем темпе, со своими настройками.
Распараллеливание вычислений в Queue-based-брокерах
Пожалуй, тут я начну с совета:
Для увеличения параллелизма МОЖНО отказаться от Log-based-брокера в пользу Queue-based, ЕСЛИ не важен порядок обработки событий, И ЕСЛИ не нужен лог событий.
Этот совет особенно ценен в случае, когда время обработки событий сильно варьируется, поскольку это та нагрузка, к которой Queue-based-брокеры особенно хорошо приспособлены. Как только обработчик заканчивает обработку очередного сообщения, брокер передает ему новое. Обработчики работают non-stop, исключая простои, увеличивая пропускную способность и по-максимуму утилизируя выделенные ресурсы.
Можно сделать мини-вывод, что Queue-based-брокеры отлично подходят:
- для задач с малопредсказуемой продолжительностью выполнения;
- когда гораздо важней как можно быстрей выполнить весь объем задач.
Примеры идеальных задач для Queue-based-брокеров:
- задача формирование отчета по запросу-фильтру пользователя;
- поиск и анализ ссылок в документе;
- сборка мусора - удаление ненужных данных и т.п.
В свою очередь Log-based-брокеры хорошо подходят для потоковой обработки событий, когда важен порядок следования и обработки, нужен лог событий и возможность запуска повторной обработки.
Приём №5: Локальность данных
Пожалуй, это один из основных принципов обработки данных в распределенных системах. Самое простое определение звучит так: держите данные ближе к месту их обработки. Этот приём направлен на оптимизацию вычислений, чтобы основное время уходило на полезную работу. Такой подход особенно оправдан при частом и одновременном обращении к некоторой части данных.
Я предлагаю присмотреться к частному случаю этого принципа, т.к. обычно он более прост в реализации и, как правило, накладывает меньше архитектурных ограничений.
Данные ДОЛЖНЫ находиться в месте их использования, ЕСЛИ они изменяются редко.
Существует множество способов реализации этого приёма, выбор зависит от решаемой задачи, доступного времени и средств. Примеры реализации:
- Локальный (файловый) кэш/индекс редко меняющихся данных. Сюда же можно отнести алгоритм "Hash Join". В большинстве случаев значительно ускоряет обработку, но требует усилий по поддержанию кэша/индекса в актуальном состоянии.
- Использование общей области памяти (shared memory) или общего диска. Также значительно ускоряет обработку, но в какой-то степени увеличивает связность между взаимодействующими компонентами, добавляет зависимость от технологии взаимодействия. "Читатель" знает, куда, что и как пишет "писатель", и наоборот. Помимо этого, взаимодействующие компоненты могут не ужиться на одной машине (разные требования к окружению, ресурсам, масштабированию и т.п.); общий диск может оказаться сетевым, что еще хуже скажется на производительности.
Теперь про жесткие архитектурные ограничения. Если рассматривать принцип локальности данных (data locality) в целом, способ его реализации может существенным образом повлиять на всё решение. Поэтому важно проанализировать все достоинства и недостатки каждого варианта. В качестве иллюстрации можно рассмотреть следующие примеры.
- Модель хранения данных - это тоже способ локализации данных. Например, key-value, column-family, индексы, да даже файловая система. Мы выбираем ту модель, которая наиболее удобна для большинства сценариев приложения. И я думаю, что многие сталкивались с ситуацией, когда модель хранения не отвечает существующим требованиям. Это способствует появлению неуклюжей и запутанной логики в коде.
- Использование денормализации данных или даже документно-ориентированных хранилищ (например, MongoDB). Если необходимые данные распределены по множеству таблиц, потребуются достаточные усилия, чтобы собрать их вместе. Это создаст нагрузку на дисковую и сетевую инфраструктуру, увеличит время обработки. С другой стороны, мы получим избыточность данных и, если мы будем использовать только малую часть большого документа, это может негативно отразиться на сценариях чтения и записи.
- Использование локальной файловой системы для генерации отчета вместо того, чтобы генерировать его в оперативной памяти или сразу производить запись в BLOB-хранилище. Преимущества очевидны - минимальное потребление памяти, сокращение количества сетевых обращений. Недостатки - места на локальном диске может не хватить; приложение должно позаботиться об удалении временных файлов.
- Алгоритм "Partitioned Hash Join" - партиционирование разных потоков данных по одному ключу, чтобы иметь возможность эффективно объединять потоки на стороне обработчика. Этот подход применим только в случае, когда партиции могут быть помещены в оперативную память.
Подводя итог, скажу, что локализация данных (data locality) позволяет добиться значительных или даже прорывных успехов в сокращении времени обработки. Это очень сильный приём, но в этом же кроется и его слабость, т.к. за мощную оптимизацию приходится платить. И если на сегодняшний день эта цена может показаться не такой большой, то завтра, когда поменяются требования к системе, это может стать серьезным препятствием, чтобы двигаться дальше.
Заключение
Рассмотренные приёмы позволяют достаточно быстро добиться значительного ускорения потоков данных приложения. Не обязательно применять все, их можно использовать выборочно, оценив свои возможности и риски. Как минимум, можно подвергнуть свое решение критическому анализу и рассмотреть потоки данных приложения через призму предложенных подходов.
P.s. Если вам интересна данная тематика, присоединяйтесь к моей новостной ленте в Telegram или здесь. Буду рад поделиться опытом. ;-)