Анализ потоковых данных

Теперь поговорим о том, ради чего в первую очередь всё затевалось. Сперва обсудим общие подходы к анализу данных, затем более подробно поговорим о математическом аппарате, используемом для анализа. Здесь интересуют:

  • анализ данных в движении
  • архитектура потоковой обработки
  • основные функции фреймворков с точки зрения анализа

Основной смысл компонента анализа в том, чтобы получать данные от компонента сбора данных и делать их доступными для других компонентов потоковой архитектуры. В этой точке системы, данные готовы к потреблению и ожидают, когда над ними произойдут мистические преобразования, которые сделают их полезными с точки зрения задач, решаемых системой. Сначала посмотрим на общую архитектуру, затем обсудим какие именно волшебные преобразования можно совершать над данными.

Пока отложим обсуждение того, куда после анализа попадают результаты, и сосредоточимся на получении и обработке.

Анализ данных в движении

Для понимания обсуждаемой функциональности, необходмо понимать концепцию данных в движении (in-flight data) и непрерывных запросов. Под данными в движении понимаются все кортежи в системе, от источника (очереди сообщений) до клиента на выходе. Под данными “в покое” понимаются данные, записанные в постоянное хранилище. Данные “в движении”, соответственно – это данные, которые находятся в процессе обработки, или только что обработаны.

Наша цель – как можно более быстро получить (“вытянуть”) данные из компонента очереди сообщений. В идеале, в потоковой системе, анализ данных должен выдерживать тот темп, в котором данные поступают от компонента сбора данных. Основное отличие от “традиционной” (непотоковой) системы, например, построенной на основе “традиционной” СУБД (реляционной или нереляционной, напр. Hadoop, HBase, Cassandra и др.) в том, что в “традиционной” системе мы делаем меняющиеся запросы к (почти) статичным данным и получаем ответы. В потоковой системе, всё наоборот – под фиксированный “запрос” (алгоритм обработки) подаются динамические данные. Такая модель называется моделью непрерывного запроса.

Представим, что система управляет новостным агентством и анализирует, пользуется ли статья популярностью или не сломались ли ссылки. Это нужно, чтобы своевременно корректировать маркетинговые компании или исправлять сайт. При использовании технологий на основе СУБД можно было бы подойти к задаче следующим образом:

  1. Собрать данные с сайта
  2. Загрузить данные в СУБД
  3. Выполнить запрос к СУБД и узнать, работают ли ссылки и пользуются ли статьи популярностью
  4. Предпринять соответствующие действия (например, оповестить администратора сайта, отдел маркетинга, скорректировать алгоритмы рекламных сетей)
  5. Повторять каждые x минут. А скорее, часов или даже дней.

Сравним с подходом в потоковой системе:

  1. Собрать поток данных
  2. Запустить запрос на активном потоке данных
  3. Предпринять соответствующие действия

Думаю, понятно, что традиционная система заметно проигрывает потоковой в отзывчивости. С современным стремлением и бизнеса и конечных потребителей получить “всё и сразу”, возможность реагировать на изменения в режиме реального времени может быть важным конкурентным преимуществом.

В потоковой системе пользователь (или приложение) регистрирует запросто, который выполняется при каждом поступлении данных или с заранее заданным (коротким) интервалом. Результат выполнения сразу по мере доступности отправляется клиенту.

Ещё раз отметим ключевые различия:

  • В традиционной системе, активная сторона – пользователь, желающий получить ответ на вопрос. Он отправляет пассивной стороне (системе) запрос, та возвращает ответ. Ответ всегда основан на данных, загруженных в систему на момент регистрации запроса, т.е. набор данных по сути статический.
  • В потоковой системе, запрос запускается и “непрерывно” (по таймеру или по мере поступления данных) выполняется для поступающих данных. Ответ на запрос проталкивается следующему звену (пользователю или приложению). Здесь условно-пассивной стороной является запрос и клиент, создавший этот запрос, а активной – собственно, система получения и анализа данных.

Таким образом происходит инверсия потока упралвения.

Отдельно следует отметить различие в обработке сбоев:

  • Когда традиционная система не работает, данные не изменяются.
  • Когда компонент анализа в потоковой системе не работает, приложения продолжают генерировать данные. После сбоя может понадобиться наверстать упущенное.
  • Если традиционная система отказывает в момент выполнения запроса, то повторное выполнение запроса – на совести приложения (или пользователя)
  • Если потоковая система отказывает, после восстановления непрерывные запросы могут быть продолжены с прерванного места (хотя это не всегда необходимо). Часто с точки зрения клиента, всё выглядит так, как будто никакого сбоя вообще не было.

Приведём несколько примеров успеха:

  • Маркетинг на основе поведения. В 2015 году, компания McDonald’s с помощью платформы VMob (сейчас переименована в Plexure) внедрила в Нидерландах, Швеции и Японии персонализированную маркетинговую платформу, рассылающую специальные предложения пользователям на основе местоположения, погоды, запомненных привычек и предпочтений. В одном исследовании приводится статистика, по которой, в результате внедрения этого решения, конверсия увеличилась на 700% (в 8 раз!).
  • Повышение эффективности дорожного движения. Компания Veovo использует дорожные датчики транспортных средств и другие источники данных о состоянии дорожного движения для оптимизации распределения нагрузки на дорожную сеть. Реальные “истории успеха” о решении различных транспортных проблем можно найти на https://veovo.com/experiences/traffic-management/.
  • Обнаружение мошенничества. Согласно отчёту компании FICO, после внедрения в США анализа данных в реальном времени, потери от мошенничества с кредитными картами снизились на 70% (поскольку FICO продаёт решения для обнаружения мошенничества, эту цифру возможно следует воспринимать с некоторой долей скептицизма, но сложно отрицать значительный положительный эффект)

Архитектуры распределённой обработки потоков

Компонент анализа может работать и на одном узле, но реальность такова, что объем и скорость поступления данных рано или поздно выходят за рамки возможностей такого подхода. Представим, например, что нас интересует не популярность статьи, а анализ работы газовой турбины с целью обнаружения признаков неполадок. По данным компании General Electric, одна турбина порождает около 1 ТБ данных в час. Это примерно 2.2 гигабита в секунду. Понятно, что единичные системы обработки и хранения не справятся с таким потоком информации, поэтому сразу будем обсуждать общие подходы к распределённым системам.

На рынке существует немало технологий для обработки потоков. Из популярных продуктов с открытым исходным кодом можно отметить:

  • Spark Streaming
  • Storm
  • Flink
  • Samza

Все перечисленные проекты являются проектами фонда Apache. Пока не будем вдаваться в детали каждого из этих проектов, но кратко обсудим, какое место они занимают в обобщённой потоковой архитектуре.

Обсудим общую архитектуру.

Драйвер приложения
Присутствует не во всех потоковых системах. Это клиентский код, который определяет, как программируется потоковая обработка и взаимодействует с потоковым диспетчером. Например, в фреймворке Spark Streaming, клиенский код разбивается на две логические части: драйвер и потоковый алгоритм (называемый задачей – task). Драйвер отправляет задачу потоковому диспетчеру и, возможно, собирает результаты после обработки и управляет временем жизни задачи.
Потоковый диспетчер
Этот компонент несёт ответственность за передачу потоковой задачи одному или нескольким потоковым процессорам. В некоторых случаях он также управляет ресурсами, необходимыми потоковым процессорам.
Потоковый процессор
Здесь выполняется основной алгоритм. Конкретный вид потокового процессора зависит от платформы, но суть одинаковая – это компонент, выполняющий алгоритм обработки.
Источники данных
Представляют входные (и возможно выходные) данные потоковой задачи. Одни платформы позволяют подавать на вход задачи данные из нескольких источников, другие ограничиваются единственным источником. Иногда данные собирает драйвер, иногда они записываются в другой источник данных, чтобы ими могла воспользоваться другая система или задача. Примерами источников могут быть Twitter, Интернет вещей, сеть, файл, и т.п.

Вернёмся к примеру газовой турбины и посмотрим, как эта задача ложится на общую архитектуру:

Spark Streaming

Система Spark Streaming построена поверх Apache Spark – универсальной системы распределённых вычислений. Эта платформа поддерживает несколько разных языков (Java, Scala, Python, R) и кроме, собственно, Streaming, на её основе построены:

  • MLib – средство для машинного обучения
  • SparkR – интеграция с языком статистического анализа R
  • GraphX – средство распределённой обработки графовых данных

Архитектура Streaming в общих чертах выглядит следующим образом:

В основной программе, которая здесь выполняет роль потокового диспетчера и драйвера, находится так называемый потоковый контекст StreamingContext, который берёт на себя большинство функций диспетчера потоков. Не вдаваясь в детали, StreamingContext содержит логику, необходимую для отслеживания поступающих данных, подготовки потоковых задач, планирования их размещения на исполнителях и выполнения. Spark Streaming, поскольку построен на основе Spark, оперирует не потоками, а пакетными работами. В случае Streaming, пакет представляет собой данные за период времени (например, данные в скользящем окне) и может планироваться до нескольких раз в секунду. Под задачей понимается логика программы, упакованная и переданная исполнителям. Основная идея та же, что и в Hadoop MapReduce. Исполнители Spark могут работать на различном числе машин (от одной до тысяч), именно на них выполняется задача (алгоритм обработки потока). Исполнители получают данные из внешнего источника и взаимодействуют с “контекстом” StreamingContext, который является частью драйвера.

Apache Storm

Storm – система потоковой обработки, работающая по принципу “один кортеж за раз”. Она спроектирована для обработки потоков данных в режиме реального времени.

Обзорно, архитектура Storm может быть представлена следующим образом:

В отличие от Hadoop и Spark Streaming, Storm оперирует понятием “топология” вместо “задача”. Разница в том, что “задача” рано или поздно заканчивается, а “топология” описывает бесконечные потоки данных между обработчиками. В конечном итоге обе системы обеспечивают развёртывание программы на узлах-исполнителях.

Топология подаётся на вход компоненту, называемому Nimbus. Этот компонент решает, как развернуть топологию на супервизорах, назначает супервизорам задания и ведёт мониторинг отказов. Топология получает данные из источников данных (в том числе от других супервизоров).

В терминах Storm, источники данных называются Spout (струя), а обработчики, выполняемые на супервизорах – Bolt (в данном случае “разряд”).

Apache Flink

Apache Flink – потокоориентированная система. В ней всё рассматривается как поток. Программа для Flink состоит из двух типов строительных блоков: потоки и преобразования. Поток можно рассматривать как промежуточный результат, получаемый в результате движения данных от источника к “стоку” (приёмнику). Преобразователем является любая операция, принимающая один или несколько потоков и порождающая на выходе один или несколько потоков (всё – поток).

Приложение для Flink, составленное из этих частей, само по себе рассматривается как поток данных, начинающийся с поступления данных из источника(ов), содержащий одно или несколько преобразований, и заканчивающийся сохранением данных в одном или нескольких стоках. Следует отметить, что такая архитектура позволяет легко производить композицию различных приложений (сток одного назначается источником другого).

Приложения Flink работают в распределённой среде, состоящей из одного главного узла и нескольких исполнителей.

Поверхностно, архитектуру можно обрисовать следующим образом:

Apache Samza

Потоковая модель Samza несколько отличается, в том плане, что использует поэтапную модель обработки потока. Для этого используются технологии Apache YARN и Apache Kafka.

YARN (Yet Another Resource Negotiatior) – это диспетчер кластера, который управляет ресурсами, планирует и мониторит задачи. Он отвечает за выделение ресурсов (процессоров, памяти, дисков, сети, etc) всем приложениям, работающем в кластере. Компонент планирования и мониторинга задач отвечает за выполнение задачи на кластере.

В рамках Samza, YARN управляет распределением ресурсов между исполнителями Samza.

Вход и выход заданий опосредован технологией Kafka, чётко укладывающейся в рамки обсуждения очередей сообщений. Пока можно считать, что Kafka – это высокоскоростное хранилище, откуда читаются данные и куда записываются результаты обработки.

Ключевые функции систем потоковой обработки

В рамках анализа потоковой архитектуры можно использовать различные системы потоковой обработки. Обратим внимание на несколько ключевых функций, которые следует принимать во внимание при сравнении таких систем и выборе подходящей для решения задачи.

Семантика доставки сообщений

Семантика доставки сообщений похожа на семантику доставки сообщений в очередях. Определения будут такими же, но выводы будут отличаться.

В качестве напоминания, различные семантики:

  • Не более одного раза – сообщение может потеряться, но никогда не будет обработано дважды
  • Не менее одного раза – сообщение не может потеряться, но может быть обработано несколько раз
  • Ровно один раз – сообщение не может потеряться, и никогда не будет обработано дважды

В случае семантики “не более одного раза”, возможны две возможные ошибки:

  • потеря (отбрасывание) сообщения – некоторые сообщения могут отбрасываться обработчиком (некорректные, либо если обработчик не успевает их обработать)
  • отказ обработчика – в таком случае, сообщения будут теряться, пока обработчик “лежит”

Семантика “не более одного раза” – самая простая из того, что может предложить система.

Семантика “не менее одного раза” более сложная, поскольку потоковая система должна отслеживать каждое сообщение и подтверждение получения. Если потоковый диспетчер решит, что сообщение не было обработано, то сообщение должно быть отправлено снова. При таком подходе, потоковая задача может получить одно и то же сообщение несколько раз, поэтому в идеале, задача должна быть идемпотентной, т.е. многократная обработка одного и того же сообщения должна давать тот же результат, что и обработка этого сообщения единожды.

Семантика “ровно один раз” ещё увеличивает сложность системы. Кроме учёта всех отправленных сообщений, система должна ещё и обнаруживать дубликаты. Это, конечно, упрощает работу задачи. Однако, вообще, хотя идемпотентность задачи не обязательна, лучше всё равно делать потоковые задачи идемпотентными, тогда проще о них думать и находить проблемные места.

Какие именно гарантии нужны в каждом конкретном случае зависит, понятно, от характера решаемой задачи.

Например, возвращаясь к примеру с газовой турбиной, которая генерирует терабайт данных в час. Потенциально системе нужно вести мониторинг тысяч таких турбин. Насколько необходимо исключить возможность потери всех сообщений? Все ли данные нужны алгоритму прогнозирования отказов? Если алгоритм способен работать на частичных данных, возможно имеет смысл выбрать стратегию “не более одного раза” как самую простую (а значит наименее затратную).

А Если характер задачи таков, что на основе потокового запроса принимается решение о финансовой операции? Например, система обслуживает рекламную сеть и выставляет клиентам счета в режиме реального времени. В таком случае требуется семантика “ровно один раз”.

Управление состоянием

Если потоковый алгоритм более сложный, чем чистая обработка текущего сообщения (без зависимостей между сообщениями или от внешних данных), то возникает необходимость хранения некоего состояния, и, весьма вероятно, потребуется служба управления состоянием, предоставляемая выбранной системой.

Представим, что в рамках анализа маркетинга в интернет-магазине, мы хотим знать, сколько страниц в час просматривает каждый посетитель. Оставим пока в стороне то, что эту задачу можно было бы решить при помощи пакетной обработки, и подумаем над тем, как реализовать эту задачу в рамках потоковой модели.

Состояние здесь хранится в том потоковом обработчике, где производится подсчёт посещений. Если средство потокового анализа не поддерживает управление состоянием, то один из возможных вариантов – хранить данные в памяти и производить сброс раз в час. Это будет работать, если полная потеря данных при крахе обработчика не является проблемой. Ну и конечно рано или поздно по закону Мерфи, обработчики начнут сыпаться на 59-й минуте каждого часа. В зависимости от характера бизнеса, такой риск может оказаться приемлемым. Однако, обычно, всё не так просто, и приходится думать ещё и об управлении состоянием.

Различные системы потоковой обработки предоставляют механизмы управления состоянием. Если посмотреть на разные концы спектра сложности и функциональности, то на одном будет хранение в памяти, которое мы только что обсуждали, а на другом – хранение в реплицированном постоянном хранилище с эффективным выполнением запросов.

Сложные системы позволяют делать более интересные вещи, чем простые, например, соединять два потока данных. Представим себе, что мы управляем системой показа рекламы и учёта кликов по и показов рекламных объявлений. В результате имеем два потока данных – клики и показы.

Если данные о показах и кликах поступают в двух отдельных потоках, нам хотелось бы соединять клики с соответствующими им показами (для целей анализа). Понятно, что клики будут отставать от показов, возможно значительно. Тем не менее, если использовать систему потоковой обработки, которая хранит состояние в постоянном хранилище с возможностью запросов, мы можем сохранять там показы, а когда поступают соответствующие клики, возвращать один результат. Мы вернёмся к этой увлекательной теме в следующий раз.

Отказоустойчивость

Было бы прекрасно, если бы всё всегда работало и ничего не ломалось. К сожалению, в реальности всё обычно ровно наоборот: вопрос не в том, сломается что-то или нет, а в том, когда это произойдёт. Способность системы в целом продолжать работу в условиях отказа компонентов или ресурсов – прямое следствие реализованных в ней механизмов отказоустойчивости.

В компоненте анализа данных (соответствующей общей архитектуре, которую мы рассматриваем) можно назвать семь возможных точек отказа:

  1. Входной поток данных. Очередь сообщений не контролируется подсистемой анализа, но очередь может отказать, и система анализа не должна “падать” вслед за очередью из-за недоступности данных/ресурсов.
  2. Сеть передачи входного потока. Аналогично, система должна корректно обрабатывать перебои связи
  3. Потоковый обработчик. Здесь выполняется наш код, и выполнение должно контролироваться системой анализа. Если в коде алгоритма что-то пойдёт не так, скажем, из-за программной или аппаратной ошибки, то потоковый диспетчер должен перезапустить обработчик или перенести обработку на другой узел.
  4. Связь с местом назначения. Если место назначения для результата одного или нескольких обработчиков вдруг становится недоступно, потоковый диспетчер должен управлять потоком данных, чтобы процессоры не “захлебнулись” от резко возросшего противодавления и не “упали” ввиду недоступности ресурса или сети.
  5. Место назначения. Не контролируется потоковым диспетчером, но отказы должны корректно обрабатываться
  6. Потоковый диспетчер. Отказ потокового диспетчера оставит нас без координатора. Здесь тоже нужна отказоустойчивая система (например, с резервированием и автосогласованием замены)
  7. Драйвер приложения. Если драйвер только отправляет задачи диспетчеру, это не слишком интересно с точки зрения отказоустойчивости (подход “попробуйте ещё раз” здесь сработает). Если же он объединён с потоковым диспетчером, то он подвержен тем же рискам.

Какие возможны решения для данных проблем? Для начала условимся, что отказы со стороны входящего потока и места назначения (и связи с ними) должны корректно обрабатываться и связь должна автоматически восстанавливаться при первой возможности – в остальном компонент анализа никак не может повлиять на эти части системы. Если объединить оставшиеся элемент, то получится две основных моды отказов:

  • Потеря данных. Сюда относится потеря данных при передаче, и авария обработчика (теряются данные в памяти)
  • Утрата контроля. Авария потокового диспетчера и/или драйвера.

О способах предотвращения потери данных мы уже говорили, когда обсуждали очередь сообщений. Здесь подходы будут в целом такими же.

Что касается утраты контроля, в системах потоковой обработки, все методы обработки ошибок сводятся к той или иной форме репликации и координации. Механизмы отказоустойчивости обычно проектируются с расчётом на k одновременных отказов, поэтому можно услышать выражение “k-отказоустойчивая система”.

В общем случае существует два подхода к репликации и координации:

  • Резервирование
  • Восстановление откатом

Оба основаны на предположении, что алгоритм обработки детерминированный, т.е. если два правильно работающих одинаковых обработчика получают одинаковые входные данные в одном и том же порядке, то они порождают одинаковые результаты в одном и том же порядке.

В случае похода на основе резервирования, потоковый диспетчер реплицирует задачу на нескольких независимых узлах и координирует реплики, посылая всем одни и те же входные данные в одном и том же порядке. Этот подход требует кратное число дополнительных ресурсов, но восстановление возможно с минимальным временем простоя. Для приложений, которые должны работать в приближенном к реальному времени (“мягком” реальном времени), дополнительные затраты на ресурсы могут быть оправданы.

В случае восстановления откатом, потоковый обработчик периодически сохраняет состояние вычисления в так называемую контрольную точку, которая потом передаётся на другой узел или сохраняется в постоянное хранилище, а операции между контрольными точками журналируются. В случае краха, потоковый диспетчер восстанавливает состояние из контрольной точки и повторяет операции из журнала – тогда задача будет переведена в состояние, предшествовавшее аварии. По сравнению с первым подходом, у этого решения ниже накладные расходы, но восстановление обходится дороже (т.е. занимает больше времени). Этот подход хорошо работает в случаях, когда отказоустойчивость важна, но с редкими умеренными задержками можно смириться.

В различных системах потоковой обработки, если вообще предлагается какой-то вариант отказоустойчивости, обычно предлагается вариант одного из этих двух подходов.