Очередь сообщений

До сих пор мы говорили о сборе данных, сейчас обсудим как собранные данные передаются в систему анализа и дальше по цепочке.

Вспоминая общую схему потоковой системы, речь будет идти о компоненте очереди сообщений.

Необходимость очереди сообщений

Может возникнуть резонный вопрос, а зачем вообще звено очереди сообщений? Ведь если рассматривать обработку данных в пределах одной машины, данные из компонента сбора данных можно сразу передавать в звено анализа, и всё будет работать.

Ответ достаточно прост – при обработке больших массивов данных, речь чаще всего идёт о распределённых системах. Распределённые системы значительно проще строить и поддерживать в отсутствие сильных связей между компонентами (и вообще сложные системы!). Поэтому, чтобы разорвать сильную связь между компонентом сбора данных и компонентом анализа, мы вводим между ними промежуточную “прослойку”. Использование очереди сообщений позволяет нам работать на более высоком уровне абстракции – передавать сообщения, а не вызывать явно следующее звено.

Основные концепции

Определимся, что именно мы понимаем под очередями сообщений. Здесь этот термин понимается в широком смысле. Существует множество программных продуктов, реализующих абстракцию очереди сообщений (“брокеры сообщений”):

  • RabbitMQ
  • ActiveMQ
  • HornetQ
  • и др.

Из более новых:

  • NSQ
  • ZeroMQ
  • Apacke Kafka

Вообще, проект Apache Kafka имеет больше возможностей, чем просто реализация очереди сообщений, но здесь мы временно про это забудем, и ограничимся рассмотрением возможностей публикации/подписки на потоки сообщений, что достаточно близко к абстракции очереди.

Компоненты очереди сообщений

В очереди сообщений есть три главных компонента:

  • Производитель сообщений
  • Брокер сообщений
  • Потребитель сообщений

Задачи производителя и потребителя точно отражены в их названиях: производитель порождает сообщения, а потребитель их потребляет.

Обратим внимание, что вместо “очередь сообщений” на схеме написано “брокер”. Разница в том, что брокер может управлять несколькими очередями. Очереди абстрагируются брокером.

На этой схеме более очевидно, что происходит:

  • Производитель отправляет сообщение брокеру
  • Брокер получает сообщение и помещает его в очередь
  • Потребитель запрашивает сообщение у брокера
  • Брокер достаёт сообщение из очереди и передаёт потребителю

В целом всё выглядит достаточно просто и интуитивно, но обсудим подробности, которые могут быть не очень очевидны.

Изоляция производителей и потребителей

Одна из наших целей – ослабить связи между компонентами. Очередь сообщений делает это для компонентов сбора данных и анализа. Обсудим, почему именно это желательно и какие преимущества это даёт.

В зависимости от характера задачи, может оказаться, что производитель порождает сообщения быстрее, чем потребитель из может принять. Как в таком случае предотвратить “захлёбывание” компонента анализа?

Здесь отметим, что “избыточность” сообщений называется “противодавлением”, по аналогии с гидравлическими системами. Когда всё идёт хорошо, и сообщения обрабатываются быстрее, чем производятся, противодавление равно нулю. Когда же сообщения производятся быстрее, чем обрабатываются, “противодавление” растёт.

Во многих системах временный рост противодавления – абсолютно нормальное явление. Например, в пакетных системах обработки данных, потребители сообщений периодически подключаются, собирают все скопившиеся сообщения, и обрабатывают их, а между подключениями противодавление растёт. Однако в системах “почти” реального времени, рост противодавления может свидетельствовать о проблемах.

Можно предположить, что противодавлением можно управлять на стороне потребителей – если оно растёт, можно увеличить число потребителей или сделать их быстрее, чтобы они справлялись с потоком производимых сообщений. Но это не всегда может быть оправдано. В других системах, контроль противодавления осуществляется через управление производителем.

Следует отметить, что не все системы очередей сообщений поддерживают управление производителем. Тогда темп порождения сообщений остаётся целиком на совести разработчика конечного приложения. Поддержка медленных и/или периодически отключающихся потребителей в основном имеется в продуктах, в которых реализованы т.н. долговечные сообщения.

Долговечные сообщения

Долговечные сообщения могут быть интересны не только с точки зрения управления противодавлением. Представим себе, что звено сбора данных и звено анализа географически разнесены. Всё идёт хорошо, но вдруг вечером пятницы происходит авария (например, какой-нибудь экскаватор порвал оптоволоконный кабель – что уже неоднократно случалось) и связь между звеньями стала гораздо медленнее, так что весь поток сообщений перестал через неё проходить.

Как это отразится на бизнесе? Сколько данных можно потерять без катастрофических последствий? Если потеря данных недопустима, то нужна технология очереди, которая способна хранить сообщения в течение длительного времени. Это реализуется путём записи сообщений в долговременное хранилище.

Конкретные реализации долговременного хранилища очереди сообщений могут быть различными. Это может быть ФС (возможно, распределённая), или реплицируемая база данных, или даже какое-то стороннее коммерческое решение, но общий смысл один – долгосрочно надёжно хранить сообщения, которые не удаётся передать “прямо сейчас”.

Долговечные сообщения не только в какой-то мере обеспечивают устойчивость к сбоям, но и дают возможность поддержки периодически активных потребителей. Это может быть интересно и в разрезе систем потоковой обработки данных, если нам хочется обрабатывать одни и те же даныне по-разному.

Представим себе такой пример: мы разрабатываем систему навигации для водителей (типа Яндекс.Навигатор), позволяющую строить маршруты на основе текущей дорожной обстановки. Спустя некоторое время, принято решение добавить возможность воспроизведения прошлой дорожной обстановки (например, для обучения нейросети, предсказывающей дорожную обстановку в будущем, или просто для удобства конечных пользователей). В отсутствие долговечных сообщений, в системе “почти” реального времени, принятые звеном анализа сообщения удаляются из очереди сообщений, и вернуться в прошлое уже не получится. С другой стороны, если одна или несколько очередей записываются в хранилище, то потребители исторических данных могут подключаться и обрабатывать эти сообщения по мере наличия ресурсов (после чего записывать результаты обработки в постоянное хранилище).

Семантика доставки сообщений

Производитель отправляет сообщения брокеру, а потребитель запрашивает сообщения у брокера. Это очень поверхностное описание того, как работает механизм доставки сообщений. Копнём теперь глубже и посмотрим, какие гарантии дают продукты, реализующие очереди сообщений.

Есть три вида таких гарантий:

  1. Не более одного раза – сообщение может потеряться, но никогда потребитель не получит одно и то же сообщение дважды.
  2. Не менее одного раза – сообщение не может потеряться, но может быть больше одного раза получено потребителем
  3. Ровно один раз – сообщение не может потеряться и потребитель читает его ровно один раз

Какое решение лучшее? Интуитивно кажется, что “ровно один раз”, и это популярная версия ответа на этот вопрос: большинство хочет иметь сильные гарантии корректности. Однако системы, дающие такие гарантии, сложны, не слишком производительны, и имеют множество возможных точек отказа. Всё не так мрачно, но это стоит иметь ввиду.

Рассмотрим возможные точки отказа в системе, гарантирующей доставку “только один раз”:

  • Производитель – если производитель отказывает после того, как сообщение сгенерировано, но до того, как оно отправлено брокеру, сообщение будет потеряно. Или если производитель отказывает, ожидая подтверждения приёма сообщения от брокера, после восстановления он отправит повторно то же самое сообщение
  • Сеть между производителем и брокером – если выходит из строя сеть между производителем и брокером, то сообщение, отправленное производителем не дойдёт до брокера. Или если по причине ошибки сети подтверждение не дойдёт до производителя, сообщение отправится повторно.
  • Брокер – если брокер отказывает, когда сообщения ещё не сохранены в надёжном хранилище, сообщения будут потеряны. Если брокер отказывает, не успев послать подтверждение производителю, сообщение будет отправлено повторно. Аналогично на стороне потребителя.
  • Очередь сообщений – если очередь сообщений абстрагирует постоянное хранилище, то в случае отказа при попытке записи на диск, сообщения могут быть потеряны
  • Сеть между потребителем и брокером – возможна потеря сообщения или подтверждения доставки, соответственно потеря или повторная отправка сообщения
  • Потребитель – отказ потребителя может привести к необходимости повторной отправки сообщения; при наличии нескольких потребителей одно сообщение может быть доставлено нескольким потребителям повторно

Для систем, гарантирующих доставку хотя бы один раз или не больше одного раза, число сценариев отказа по понятным причинам уменьшается вдвое. Что выбрать – зависит от конкретной задачи, в любом случае приходится искать компромисс между скоростью и корректностью. Если создаётся проект для веб-аналитики, то разумным выглядит решение “не более одного раза” – потеря небольшой части сообщений скорее всего не слишком исказит статистику (повторные сообщения сделают это с большей вероятностью), а система может быть проще и производительнее (а значит, дешевле). Если же речь идёт об обнаружении случаев финансового мошенничества, то может быть имеет смысл система, дающая гарантию “ровно один раз”, чтобы не пропустить мошеннические операции с одной стороны и избежать ложно-позитивных срабатываний системы из-за повторных сообщений с другой.

При изучении систем сообщений, может выясниться, что подходящая по всем прочим параметрам система (например Kafka или ActiveMQ) не даёт гарантий “только один раз”. Это не фатально – часто системы очередей сообщений предоставляют достаточно метаданных, чтобы можно было реализовать семантику однокаратной доставки самостоятельно через координацию производителей и потребителей (это однако усложнит систему за счёт “протекающей” абстракции – абстракция очереди сообщений будет частично “размазана” между производителем и потребителем):

  • Со стороны производителя. Не пытаться отправить сообщения повторно. Для этого нужно каким-то образом запоминать, какие сообщения производители отправляют брокерам. Если ответ от брокера не поступил или сетевое соединение оборвалось, то после восстановления можно получить данные от брокера и проверить, было ли получено сообщение, для которого не пришло подтверждение. Такой подход (в разумных пределах) гарантирует, что сообщения отправляются производителем только один раз
  • Со стороны потребителя. Сохранять метаданные последнего сообщения. Нужно хранить такие данные о сообщениях, которые позволят однозначно определить, что потребитель не обрабатывает сообщения повторно. Обычно у сообщений есть некий идентификатор, позволяющий их однозначно определить. Метаданные должны сохраняться в надёжное постоянное хранилище.

Отдельно следует отметить, что хранить может быть полезно не только идентификатор сообщения, но и метаданные, позволяющие определить полезную нагрузку этого сообщения. Тогда кроме гарантий однократной доставки, можно почти “бесплатно” получить возможность аудита данных (проверки корректности)

Безопасность

В рамках этого курса мы не слишком акцентируем внимание на безопасности, но про неё не следует забывать. Конкретно при обсуждении очередей сообщений, следует не только следить за безопасностью данных при передачи и хранении, но также и за тем, разрешено ли конкретным производителям данных порождать соответствующие сообщения, а конкретным потребителям – потреблять. Иначе возможна ситуация, что скомпрометированный производитель значительно искажает данные, или скомпрометированный потребитель получает все данные из системы.

Как минимум, нужно продумать все эти моменты, и если в команде есть безопасники, подключить их к этому.

Моменты, на которые стоит обратить внимание:

  • На этапе сбора данных:
    • Можно ли аутентифицировать производителя?
    • Разрешено ли производителю порождать сообщения, которые он порождает?
  • При передаче сообщений:
    • Как обеспечить безопасность передачи (сильное шифрование; насколько сильное?)
  • При хранении сообщений:
    • Как обеспечить безопасность хранения (сильное шифрование, криптографические подписи)
  • На этапе очереди сообщений:
    • Если очередь распределённая, могут ли брокеры аутентифицировать друг друга?
  • На этапе анализа данных:
    • Можно ли аутентифицировать потребителя?
    • Разрешено ли потребителю потреблять сообщения, которые он потребляет?

Отказоустойчивость

Теперь обсудим, что произойдёт с данными, когда случится что-то нехорошее. Не если случится – когда. Рассчитывать на безотказную работу ни в коем случае нельзя.

Что случится, если один из брокеров выйдет из строя? Если брокер использует долговременное хранилище, то можно надеяться, что риску подвержены только сообщения, которые брокер не успел туда записать. Есть несколько способов снижения связанных с этим рисков:

  • Производитель может ожидать подтверждения записи сообщений на диск
  • Можно реплицировать сообщения нескольким брокерам. Риск остаётся, но с ростом числа брокеров в репликации риск падает экспоненциально (вероятность одновременного отказа двух одинаковых узлов – это вторая степень вероятности отказа одного узла)
  • Можно хранить в оперативной памяти как можно меньше данных. Этот подход чреват заметными потерями производительности, т.е. опять компромисс между производительностью и надёжностью

Что случится в случае разрыва сетевых соединений? При использовании систем сообщений, поддерживающих репликацию, данные в относительной безопасности, поскольку хранятся на нескольких узлах (брокерах). После восстановления связности сети, брокер снова присоединится к кластеру и синхронизируется с другими брокерами. При выборе продукта, следует найти ответы как минимум на следующие вопросы:

  • При отказе сети, выбирается ли в качестве реплики другой брокер?
  • Что происходит при восстановлении сети?
  • Есть ли возможность настроить временную задержку (“таймаут”), при которой считается, что сеть отказала?
  • Что произойдёт с данными, если сеть не восстановится?
  • Что произойдёт с данными, если брокер отключится от кластера в момент передачи ему сообщения от производителя?

Что произойдёт при отказе хранилища? Хотя такая ситуация грозит катастрофической потерей данных, любой, кто имел дело с СХД, знает, что с этим можно бороться. Если задействовано несколько брокеров, следует найти ответы на вопросы:

  • Существуют ли реплики потерянных в результате отказа СХД данных?
  • Если реплицируемые данные не успели записать на диск до момента отказа: будут ли эти данные потеряны?
  • Как восстановить брокера после отказа СХД?

Оценивая пригодность конкретного продукта для решения задач бизнеса, следует поставить эти вопросы и найти на них ответы.

Примеры применения в конкретных задачах

Посмотрим, как можно применить базовые концепции в рамках конкретных (умеренно фантазийных) сценариев.

Финансы: обнаружение мошенничества

Компания “ООО”Петров и ко"" предоставляет службу обнаружения мошенничества в реальном времени. Для этого собираются сведения об операциях по кредитным картам из всех доступных источников, к данным применяются различные хитроумные алгоритмы (являющиеся предметом коммерческой тайны) и в момент совершения покупки клиентам этой системы отправляется решение об одобрении или отклонении операции.

Вот некоторые вопросы, которыми необходимо задаться при проектировании такой системы:

  • Как повлияет на бизнес длительное отсутствие связи между звеньями сбора данных и анализа?
    • Ответ: катастрофически. Компания не сможет обеспечивать сервис, который продаёт, и это может нести не только репутационные потери, но и прямые финансовые потери со стороны заказчиков.
  • Данные за сколько дней можно потерять без последствий?
    • Ответ: нельзя. Вообще, с учётом характера приложения, потери данных – практически недопустимы.
  • Должны ли храниться старые данные?
    • Ответ: Скорее всего, да. Во-первых, эти данные могут быть важны при отладке и доработке алгоритмов анализа. Во-вторых, в случае претензий со стороны заказчиков, исторические данные могут быть крайне важны для решения спорных моментов. В-третьих, заказчикам могут быть интересны подробные отчёты о работе службы за какой-то преиод времени.
  • Какая семантика доставки сообщений нужна в этом случае?
    • Ответ: Скорее всего, “ровно один раз”. Как минимум, недопустима потеря сообщений. Можно ли обойтись семантикой “не менее одного раза”? Возможно. Но это сильно усложнит алгоритмы анализа или как минимум часть системы анализа, отвечающую за получение сообщений от брокера.

Интернет вещей: “социальные” автоматы для продажи лимонада

Пусть компания “ООО”Сидоров и ко"" принадлежат автоматы для продажи прохладительных напитков. Допустим, руководство приняло стратегическое решение активной рекламы через отправку твитов и push-уведомлений со специальными предложениями находящимся поблизости потребителям. К тому же, если в одном из автоматов закончился товар, он должен рекомендовать другой автомат поблизости, который может предложить выгодную (в первую очередь компании) сделку.

  • Как повлияет на бизнес длительное отсутствие связи между звеньями сбора данных и анализа? Если та же система используется для управления запасами в торговых автоматах, как это изменит ответ на вопрос?
  • Данные за сколько дней можно потерять без последствий?
  • Должны ли храниться старые данные?
  • Какая семантика доставки сообщений нужна в этом случае?
    • Что случится, если одно сообщение будет обработано несколько раз? Если сообщение будет пропущено?

Электронная коммерция: рекомендация товаров

Компания “ООО”Петров и ко"" занимается торговлей модной одеждой и ищет способы повысить конверсию посетителей сайта в покупателей. Отдел маркетинга уверяет, что социальные методики крайне действенные. Вас нанимают спроектировать и разработать систему, которая будет показывать посетителям сайта, что другие посетители недавно положили в свою корзину или купили. Например, если я открыл страницу товара “джинсы”, а другие покупатели вместе с такими же джинсами клали в корзину ботинки или рубашку, то я должен увидеть рекомендацию в ключе

Десять человек только что купили этот/эти/эту <наименование товара, ссылка на страницу товара, изображение товара, etc> вместе с джинсами, которыми Вы интересовались

или

Пять человек добавили в корзину вместе с этими джинсами ещё и <наименование товара, ссылка на страницу товара, изображение товара, кнопка “добавить в корзину”, etc>

К системе можно предъявить следующие требования:

  • Отслеживать все покупки в (почти) реальном времени

  • Отслеживать корзины всех покупателей в (почти) реальном времени

  • На странице каждого товара показывать, какие товары недавно куплены вместе с ним

  • На странице каждого товара показывать, какие товары находятся вместе с ними в корзинах других покупателей

  • Как повлияет на бизнес длительное отсутствие связи между звеньями сбора данных и анализа?

  • Данные за сколько дней можно потерять без последствий?

  • Должны ли храниться старые данные?

  • Какая семантика доставки сообщений нужна в этом случае?