Потоковые данные. Получение данных.

Большие данные

Сейчас всё больше говорят о “Big Data”, феномене, появившемся в основном в конце первого десятилетия 21 века.

Этот феномен, прежде всего, связан с развитием информационных технологий, и по сути, отражает, во-первых, то, что объём информации, порождаемый процессами и индивидами, стабильно растёт, и, во-вторых, повышение интереса к обработке и анализу этих данных и наличие к этому технологических возможностей.

В качестве определяющих характеристик для “Big Data” традиционно называют “три V”: объём (volume), скорость (velocity) и разнообразие (variety).

  • Объём: информационный объём данных значительно превышает возможности простых систем хранения и требует распределённого хранилища.
  • Скорость: данные порождаются с высокой скоростью, нередко требующей распределённых систем для первичной обработки и сохранения.
  • Разнообразие: данные, представляющие интерес, могут быть полностью или частично структурированы и весьма разнообразны по содержанию и форме.

“Традиционный” подход к “Big Data” использует средства массово-параллельной обработки и распределённого хранения. Прежде всего для этого используются распределённые базы свободно структурированных данных категории NoSQL, алгоритм MapReduce и реализующими их фреймворками и библиотеками проекта Hadoop.

Необходимость параллельных и распределённых вычислений

На текущий момент, технологии микрочипов достигли того уровня развития, когда физические ограничения затрудняют технологическое развитие.

Не вдаваясь в теоретические подробности, дальнейшее повышение тактовой частоты вычислительных устройств на основе кремния крайне затруднено, и технологически крайне затратно. Как следствие, повышение скорости обработки данных на устройствах с последовательной моделью исполнения практически невозможно.

Потенциал для дальнейшего наращивания скорости обработки – в использовании параллельной модели исполнения, когда независимые последовательные устройства обрабатывают различные фрагменты данных.

Определённая трудность этого подхода в том, что при наличии зависимостей между данными, эффективность параллельной обработки снижается. В идеальном случае абсолютно независимых данных можно ожидать линейного роста производительности с ростом числа параллельно работающих устройств. На практике же, абсолютно независимых данных, представляющих интерес, достаточно мало. Как следствие, фактический рост производительности с наращиванием числа узлов описывается законом Амдала:

\[S(p) = \paren{\alpha+\frac{1-\alpha}{p}}^{-1},\] где \(\alpha\) – доля вычислений, которые могут быть произведены только последовательно ввиду наличия зависимостей.

В этой связи важным оказывается проектирование систем таким образом, чтобы параллельные ресурсы использовались максимально эффективно.

MapReduce

MapReduce – это модель распределённых вычислений, оригинально представленная компанией Google. Используется для высокопараллельных вычислений.

Работа MapReduce состоит из двух шагов:

  1. Map – шаг предварительной обработки входных данных. Для этого один из узлов распределённой системы (называемый главным, master node), получает входные данные задачи, разбивает их на части, и предаёт другим узлам (называемым рабочими, worker nodes).
  2. Reduce – свёртка предварительно обработанных данных. Главный узел получает ответы от рабочих узлов и на их основе формирует результат.

Основное преимущество MapReduce – в возможности распределённо производить операции предварительной обработки. Более того, если операция свёртки ассоциативна и коммутативна, операции свёртки тоже можно производить распределённо.

Кроме того, использование MapReduce даёт некоторую устойчивость к сбоям: если на одном из рабочих узлов возникает ошибка, главный узел может передать задачу другому узлу.

Более подробно модель MapReduce можно описать следующей схемой:

Hadoop

Hadoop – проект фонда Apache foundation, свободный набор утилит, библиотек и фреймворк, разработанный в рамках модели MapReduce.

На текущий момент, в Hadoop входят четыре модуля:

  • Common – инфраструктурные библиотеки и утилиты, используемые другими модулями
  • HDFS – распределённая отказоустойчивая файловая система
  • YARN – система планирования заданий и управления кластером
  • MapReduce – фреймворк для программирования и выполнения вычислений в рамках модели MapReduce.

Hadoop считается основополагающей технологией “Big Data”.

Hadoop разрабатывается на языке Java.

Основная цель Hadoop – обеспечить максимально высокую горизонтальную масштабируемость кластера, состоящего из (сравнительно) недорогих узлов.

Пакетная обработка

Долгое время основным подходом к обработке “Big Data” являлась так называемая “пакетная обработка”, чему немало способствовала структура MapReduce.

Основной смысл пакетной обработки в том, что данные сначала сохраняются в каком-то хранилище, потом – обрабатываются. Структура MapReduce предполагает, что главный узел имеет доступ ко всем данным, которые необходимо обработать – на этом основана возможность “справедливого” (равномерного) распределения задач по рабочим узлам.

На практике, у этого подхода есть несколько существенных недостатков:

  1. Необходимость хранения большого объёма необработанных данных
  2. Большие задержки между получением данных и их обработкой
  3. Невозможность непосредственной обработки “неограниченных” данных, т.е. данных, которые невозможно однозначно разделить на блоки.
  1. делает затруднительным применение этого подхода в системах, приближенных к “реальному времени”, т.е. системах, в которых результат нужно получить “почти сразу” после получения данных.

  2. усложняет получение однозначно корректных результатов в системах, предполагающих непрерывную работу.

Большинство реальных данных, представляющих интерес, представляют из себя не набор аккуратных блоков, а практически непрерывный поток событий.

Потоковая обработка

Потоковая обработка данных отказывается от той части модели MapReduce, которая предполагает наличие доступа к конечному набору данных: вместо этого данные рассматриваются как потенциально бесконечная последовательность событий, распределённая во времени. Такие последовательности событий математическим описываются формализмом временных рядов.

Потоковая обработка позволяет строить системы “мягкого реального времени”.

Системы реального времени, напомним, разделяются на

  • Жёсткого реального времени – нулевая терпимость к задержкам, превышающим микросекунды - миллисекунды
  • Мягкого реального времени – низкая терпимость к задержкам, превышающим миллисекунды - секунды
  • “Почти реального времени” – высокая терпимость к задержкам, превышающим секунды-минуты

Системы жёсткого реального времени обычно связаны с угрозой жизни и здоровью: они важны в медицине, автомобильной промышленности и т.п.

Системы мягкого реального времени допускают “отставание” от реального времени, но такие задержки могут приводить к финансовым и другим потерям. Такие системы используются в системах резервирования билетов, биржевых торгов, VoIP и пр.

Системы “почти реального времени” обычно применяются в случаях, когда задержки нежелательны, но не приводят прямо к финансовым потерям. К таким системам может относиться потоковое видео, домашняя автоматика, социальные сети и пр.

Система потоковой обработки данных
Вычислительная часть системы работает в режиме мягкого или “почти” реального времени, но клиенты потребляют данные не в реальном времени из-за сетевых задержек или просто потому, что приложение не запущено. Таким образом, имеем систему нежёсткого реального времени, которая должна делать данные доступными, когда клиентское приложение “хочет” их видеть.

Понятие системы потоковой обработки данных позволяет не задумываться о том, мягкое или “почти” реальное время в нашей системе и использовать одни и те же подходы в обоих случаях.

Общая архитектура потоковой системы приведена на диаграмме:

Рассмотрим эту схему на примере социальной сети Twitter:

  1. Сбор данных: отправленные сообщения собираются службами Twitter
  2. Очередь сообщений: Twitter задействует географически распределённые центры обработки данных, сбор сообщений, скорее всего, производится не там же, где анализ
  3. Анализ: сообщения подвергаются самым разным способам обработки. Как минимум, система должна определить, каким пользователям следует отправить это сообщение
  4. Долговременное хранилище: Мы не будем подробно останавливаться на этом моменте, но часто бывает необходимо сохранить результаты обработки данных, чтобы иметь возможность к ним вернуться при необходимости
  5. Хранилище в памяти: Недавние сообщения в Twitter почти наверняка хранятся в оперативной памяти узлов для ускорения доступа (и пока не будут записаны в долговременное хранилище)
  6. Доступ к данным: любой клиент (мобильный, браузер, etc) подключается к службам Twitter для получения сообщений

На этой диаграмме мы опустили аспекты безопасности данных. О них не стоит забывать, но чтобы не усложнять изложение, мы будем подразумевать, что все узлы распределённой системы, клиенты и источники данных аутентифицированы, авторизованы, и данные передаются по защищённым каналам связи.

Масштабирование

Мы уже вскользь упоминали о масштабировании, но обсудим немного более подробно.

Масштабирование – адаптация системы к изменяющимся требованиям.

На самом общем уровне есть два подхода:

  1. Вертикальное масштабирование
  2. Горизонтальное масштабирование

Вертикальное масштабирование – это наращивание мощности существующего оборудования путём добавления ресурсов. Оснащение серверов дополнительной памятью, процессорами или жёсткими дисками, или вообще замена серверов на более мощные – это вертикальное масштабирование. У вертикального масштабирования есть естественный предел, ограниченный максимально возможными количеством ресурсов для данной системы или на данном технологическом уровне.

Горизонтальное масштабирование – это наращивание мощности путём увеличения числа серверов. Горизонтальное масштабирование по сути не ограничено, но его эффективность для большинства задач падает с ростом числа узлов (по закону Амдала). Одно из преимуществ горизонтального масштабирования – его проще автоматизировать в соответствии с текущими требованиями. Облачные провайдеры предоставляют возможность автомасштабирования – когда ресурсы выделяются при необходимости, а при исчезновении необходимости – отключаются.

Современные системы больше ориентированы на горизонтальное масштабирование, в основном поскольку оно часто дешевле. Вертикальное масштабирование, само собой, тоже используется, но менее активно.

Сбор данных

Этап сбора данных – этап, на котором данные поступают в систему. Мы рассмотрим типичные подходы к сбору данных, вопросы масштабирования этого этапа и вопросы отказоустойчивости.

Типичные подходы к сбору данных

Независимо от протокола, используемого для отправки данных, в настоящее время используется не так много паттернов взаимодействия. Из можно разделить на следующие категории:

  • Запрос-ответ
  • Издатель-подписчик
  • Одностороннее взаимодействие
  • Запрос-подтверждение
  • Поток

Запрос-ответ

Возможно, самый простой паттерн. Клиент посылает запрос службе на выполнение какого-либо действие или предоставление каких-либо данных. Служба посылает ответ клиенту.

Этот паттерн является синхронным, и хорошо подходит в случаях, когда система должна предоставить ответ “немедленно”. Платой за простоту является то, что клиент вынужден дождаться ответа. В случаях, когда сервер не может ответить достаточно быстро, это может приводить к существенным задержкам.

Для преодоления этой трудности могут использоваться два подхода, на стороне клиента или на стороне сервера.

На стороне клиента используется подход асинхронных запросов-ответов:

В этом случае клиент отправляет запрос к серверу и продолжает заниматься другими делами, пока сервер обрабатывает запрос. Этот паттерн используется в современных браузерах: браузер отправляет асинхронные запросы на получение ресурсов и отображает их по мере поступления ответов. Такой подход иногда называют полуасинхронным, поскольку асинхронная обработка используется только на стороне клиента.

Этот же подход можно применить и на стороне сервера: сервер делегирует запрос исполнителю, по завершению отправляет клиенту ответ. Такая архитектура упрощает горизонтальное масштабирование:

Этот паттерн иногда называют полноасинхронным.

Запрос-подтверждение

В случаях, когда ответ службы нас в конечном итоге не интересует, и нужно лишь подтверждение того, что запрос получен, используется паттрен запрос-подтверждение.

В составе подтверждения могут возвращаться данные, используемые для последующих запросов, например – уникальный идентификатор запроса.

Бытовым примером паттерна запрос-подтверждение может быть заказ в интернет-магазине: при первичном создании заказа, клиенту выдаётся номер заказа. Выдача номера заказа означает только то, что заказ получен системой. При последующих запросах, у клиента могут спросить номер заказа. В автоматических информационных системах используется тот же принцип.

Издатель-подписчик

Издатель или производитель публикует сообщение, адресованное брокеру. Сообщения часто направляются в тему, которую можно считать логической группой сообщений. Затем сообщение направляется всем подписчикам или потребителям, подписанным на соответствующую тему.

В некоторых реализациях поток данных именно такой, как показано на схеме, когда брокер инициирует посылку подписчикам (модель push). Но есть и вариант, в котором подписчик инициирует этот процесс (модель pull).

Этот паттерн позволяет “разорвать” связь между отправителем и получателем информации (за счёт введения брокера), что для некоторых применений значительно упрощает потоки данных.

Одностороннее взаимодействие

Этот паттерн чаще всего встречается в ситуациях, когда отправитель запроса не нуждается в ответе. Этот паттерн аналогичен паттерну запрос-подтверждение, с той разницей, что клиент не получает подтверждения. И вообще никакого ответа.

Возникает вопрос при каких обстоятельствах может быть полезен такой паттерн. Ответ достаточно простой: если у клиента нет ресурсов и/или необходимости обрабатывать ответ. Такой подход неплохо работает для устройств “интернета вещей”, задачей которых является порождение данных с максимальной скоростью, но при этом не требуется гарантия доставки. Например, датчик температуры, посылающий текущее значение температуры раз в секунду. Поскольку температура почти никогда не меняется значительно за интервал времени в секунду, может быть не слишком важно если одно или несколько значений в этом потоке данных “потеряются”.

Поток

В этом случае клиент и сервер “как бы” меняются местами.

Принципиально паттерн “поток” похож на паттерн “запрос-ответ”, но есть следующие отличия:

  • Ответом на запрос является непрерывный поток данных
  • Инициатором подключения является служба сбора данных, а не клиент

Примером этого паттерна может служить, например, система предсказания акций компаний на основе сообщений в Twitter. Такая система сама должна подключаться к Twitter и “вытягивать” у него поток новых сообщений.

Этот паттерн часто используется для потребления общедоступных данных и создания новых потоков данных по результатам анализа.

Масштабирование паттернов взаимодействия

Масштабирование паттернов запрос-ответ

В случаях, когда паттерн не включает неявного состояния клиента (которое должно так или иначе храниться на сервере), паттерн запрос-ответ хорошо горизонтально масштабируется.

В отсутствие неявного состояния, любой клиент может подключиться к любому экземпляру службы и послать запрос. Как следствие, при необходимости легко можно добавить новые экземпляры службы, не меняя существующих.

Обычно между клиентами и экземплярами службы располагается балансировщик нагрузки, по какому-то алгоритму распределяющий запросы между активными экземплярами службы (число которых может автоматически изменяться по запросу от балансировщика)

Масштабирование паттерна “поток”

В рамках паттерна “поток” присутствует прямое соединение между клиентом и сервером, как следствие, масштабирование оказывается несколько сложнее.

В схеме выше только один узел сбора данных является активным, остальные три бездействуют.

В некоторых случаях возможно увеличить параллельность системы, добавив слой буферизации

Чтобы можно было поместить слой буферизации, необходимо, чтобы сообщения из потока не подвергались логической обработке, а сразу передавались слою буферизации. Для содержательной обработки может использоваться дополнительный слой сбора данных, который уже может горизонтально масштабироваться.

Отказоустойчивость

Какой бы паттерн взаимодействия ни использовался, один или несколько узлов сбора данных могут выйти из строя, ввиду программной ошибки или аппаратного сбоя. Независимо от причины, мы хотели бы компенсировать отказы и обеспечить надёжность узла сбора данных.

Сообщение от клиента иногда может быть невосстановимо, и не всегда есть возможность запросить повторную отправку данных. Иногда с потерей данных можно смириться, но не всегда.

Два основных подхода к отказоустойчивости это использование контрольных точек и протоколирование.

Рассмотрим сперва контрольные точки. Все протоколы контрольных точек обладают двумя характеристиками:

  • Глобальный снимок – требуется, чтобы где-то во внешней к узлам памяти регулярно сохранятся глобальный снимок состояния всей системы.
  • Возможность потери данных – гарантируется только восстановление системы в последнем сохранённом глобальном состоянии; все сообщения после этого состояния теряются

Бытовым примером такой системы может быть автосохранение в системах редактирования документов (Microsoft Office, Google Docs).

Использование контрольных точек для потоковой системы дополнительно осложняется тем, что система состоит из множества слоёв, и вообще говоря, создать непротиворечивый глобальный снимок состояния – нетривиальная задача.

Обратимся к протоколированию. Основная идея в следующем: если любое сообщение можно воспроизвести, то система может достичь глобально непротиворечивого состояния без создания глобального снимка.

Каждое звено системы независимо записывает все полученные им сообщения и воспроизводит их после аварии. Это освобождает нас от необходимости поддерживать глобальное состояние. Рассмотрим три подхода:

  1. Протоколирование на стороне получателя
  2. Протоколирование на стороне отправителя
  3. Гибридный подход

Протоколирование на стороне получателя

Безаварийный путь:

  1. Клиент отправляет сообщение
  2. Регистратор RBML (receiver-based message logging) получает сообщение от производителя данных и отправляет его в хранилище
  3. Сообщение записывается в надёжное хранилище
  4. Сообщение обрабатывается в узле
  5. Сообщение отправляется в очередь сообщений

Важно, что сообщение сохраняется до того, как с ним происходит что-то ещё.

После аварии поступление входных сообщений на узел прекращается, этот узел выводится из ротации балансировки нагрузки. Регистратор RBML читает не обработанные сообщения из хранилища и передаёт их обработчику, как будто ничего не произошло. После того, как все сохранённые сообщения будут обработаны, узел считается восстановленным и возвращается в ротацию.

Протоколирование на стороне отправителя

Протоколирование на стороне получателя защищает от собственных отказов узла. Протоколирование на стороне отправителя защищает от отказов нижестоящей стадии и отказов сети.

Сообщение сохраняется непосредственно перед отправкой.

Возникает один нюанс: как узнать, что сообщение обработано следующим звеном? Есть несколько способов. Следующее звено может возвращать подтверждение того, что сообщение получено. Если протокол взаимодействия не предполагает отправки, то факт отсутствия ошибки можно считать таким подтверждением.

Поток данных в случае ошибки показан на рис:

Гибридное протоколирование

Совместное использование SBML и RBML даёт предотвращение потери данных и повышения надёжности, однако ценой повышенных накладных расходов. Для уменьшения накладных расходов может использоваться гибридный метод:

Во-первых, экземпляры надёжного хранилища объединены. Это очевидная оптимизация. Во-вторых, запись в хранилище на стороне регистратора RBML – асинхронная, что позволяет не ожидая записи в хранилище начать обработку. Это, однако, усложняет обработку ошибок.