В предыдущей лекции были рассмотрены основные характеристики потоков данных, семантика доставки и отказоустойчивость. Теперь мы сосредоточимся на вопросах получения информации из потоков данных, а именно потоковой аналитикой или data mining.
В потоковых системах встречаются два типа запросов:
- Ситуативные запросы. Это однократные вопросы о потоке – например, какое максимальное значение, встретившееся в потоке. Такие запросы аналогичны запросам в РСУБД.
- Непрерывные запросы. О них мы говорили в предыдущей лекции. Это запросы, выполняемые, пока поток существует. Например, может “непрерывно” (с небольшим интервалом или при поступлении новой информации) вычисляться максимальное значение в потоке и генерироваться оповещение, если значение превышает пороговое значение.
В различных системах потоковой обработки данных существуют свои средства получения информации о данных в потоке. Некоторые системы предоставляют SQL-подобный язык запросов. Другие требуют написания программных обработчиков.
На момент написания, системы Storm, Samza, Flink и Spark Streaming все поддерживают в той или иной мере запросы SQL, однако полнота реализации и уровень поддержки может отличаться. Все проекты бурно развиваются, и информация может устаревать очень быстро, но на момент написания ситуация следующая:
- Apache Storm конвертирует SQL-запросы в топологии и непосредственно выполняет эти топологии на кластере. Эта возможность считается экспериментальной, и детали могут быть изменены без предварительного предупреждения.
- Apache Samza поддерживает только простые SQL-запросы, использующие выборку и проекцию. Агрегация, соединения и срезы по времени на момент написания находятся в разработке.
- Flink поддерживает SQL и табличный программный интерфейс. Обе возможности помечены как “в активной разработке” и “не в полной мере реализованные”.
- Spark Streaming поддерживает SQL-подобный язык запросов Spark SQL, основанный на механизме DataFrames. Поскольку Spark по сути реализует потоковую обработку поверх пакетной, запросы выполняются над отдельными “пакетами”, что приходится учитывать.
Рассмотрим общие соображения касательно алгоритмов обработки данных в различных системах. Сначала обсудим ограничения, которые на нас накладывает потоковая модель обработки
Ограничения потоковой обработки
Одной из характерных систем потоковой обработки, как обсуждалось ранее, является тот факт, что часто невозможно сохранить “весь” поток – во-первых, он не ограничен, а во-вторых, даже за сравнительно небольшие промежутки времени (напр. несколько суток), объём данных может быть очень высок. Как следствие, при обработке потоковых данных, необходимо выдавать результаты запросов по мере поступления данных в режиме “он-лайн”. При проектировании алгоритмов потоковой обработки данных следует иметь ввиду следующие ограничения:
- Однопроходность. В общем следует исходить из того, что данные не архивируются, как следствие, может быть только один шанс их обработать. Это оказывает существенное влияние на конструкцию алгоритмов. Многие “традиционные” алгоритмы data mining подразумевают несколько проходов по данным. Для потоковой обработки такие алгоритмы непригодны и требуют модификации.
- Изменение концепции. По мере поступления новых данных, могут изменяться их статистические свойства. Часто это необходимо учитывать и подстраивать прогностическую модель “на лету”.
- Ограниченность ресурсов. Для многих потоков, сложно или даже невозможно контролировать темп поступления данных. Иногда из-за кратковременных пиков в скорости потока, ресурсов обработчиков оказывается недостаточно и алгоритм вынужден отбрасывать часть данных, которую не успевает обработать. Это называется сбрасыванием нагрузки. Таким образом, обрабатываемые данные не обязательно полны, и это необходимо учитывать.
- Ограничения предметной области. В зависимости от решаемой задачи, характерные объёмы данных могут далеко выходить за рамки технически возможного. Можно сказать, что это случай ограниченности ресурсов, но в данном случае это характеристика самой решаемой задачи.
Из-за этих ограничений, практически всегда используется та или иная форма конспектирования данных.
О времени событий
В потоковой системе, данные могут приходить с задержкой или приходить в порядке, отличном от хронологического. Существует несколько подходов к учёту времени при анализе потока данных.
Время потока и время события
Разделяют два различных аспекта времени в потоковых данных:
- Время потока характеризует момент времени, когда событие (данные) попадает в систему потоковой обработки.
- Время события характеризует время, когда событие непосредственно произошло.
Понятно, что время события всегда меньше (раньше) времени потока. Разница между ними называется временным сдвигом.
Наличие временного сдвига может оказывать существенное влияние на алгоритм вычислений. При обсуждении следующей темы помните о временном сдвиге.
Оконные методы
Оконные методы в общем ограничиваются рассмотрением некоторого временного среза данных, и на основе этого окна строят какие-то локальные результаты.
“Окно” представляет собой некоторую небольшую часть потока данных, ограниченную по какому-то критерию. В потоковых системах этот критерий определяется двумя политиками:
- Политикой срабатывания. Политика срабатывания определяет правила, по которым начинается обработка данных, находящихся в окне. Обычно это реализуется уведомлением пользовательского кода о том, что данные в окне необходимо обработать.
- Политикой вытеснения. Политика вытеснения определяет, когда очередной находящийся в окне элемент пора оттуда убрать.
В основе и той и другой лежит либо время, либо объём данных в окне. В случае объёма данных, всё достаточно просто. В случае времени, ситуация становится несколько сложнее из-за наличия временного сдвига.
Скользящее окно
В методе скользящего окна, политики вытеснения и срабатывания основаны на времени. Основные определяющие факторы – это протяжённость окна и интервал скольжения.
Протяжённость окна определяет политику вытеснения: это время, в течение которого данные хранятся в окне и доступны для обработки.
Интервал скольжения определяет политику срабатывания: собственно, интервал скольжения по сути есть интервал срабатывания.
Поддержка в системах потоковой обработки разнится:
- Storm поддерживает скользящие окна с временем потока или временем события. Политики определяются протяжённостью окна и интервалом скольжения.
- Flink поддерживает скользящие окна с временем потока или временем события. Поддерживаются пользовательские политики срабатывания и вытеснения. Возможна интеграция с Apache Beam.
- Samza поддерживает оконные методы с временем события на основе Apache Beam, и с временем потока на низком уровне.
- Spark Streaming поддерживает оконные методы с временем потока. Возможна интеграция с Apache Beam.
Отдельно имеет смысл упомянуть про фреймворк Apache Beam, позволяющий определять алгоритмы (“пайплайны”) обработки данных на различных системах используя однородный API.
Даже если фреймворк не поддерживает скользящие окна, их можно реализовать самостоятельно. Конечно, это может подразумевать довольно большой объём работы.
Прыгающее окно
Прыгающие окна могут работать по времени или по счётчику. В любом случае, основное отличие от скользящего окна в том, что каждое событие принадлежит только одному прыгающему окну.
Flink и Storm поддерживают прыгающие окна по счётчику и по времени. Samza только по времени. Spark Streaming поддерживает прыгающие окна по времени при работе с API Structured Streaming.
Методы обобщения
Поскольку потоковых данных может быть очень много, часто нужно перед ресурсозатратной обработкой обобщить данные, т.е. уменьшить их объём при этом (в идеале) сохранив основные закономерности.
Поскольку мы не знаем, закончится ли поток, и не можем рассчитывать, что сохраним его целиком в памяти. Поэтому получить абсолютно точные ответы на вопросы о данных в потоке крайне сложно. Обычно достаточно приблизительного ответа (с высокой степенью достоверности). Само собой, иногда необходим точный ответ, но для его получения скорее всего придётся пожертвовать скоростью обработки.
Случайная выборка
Случайную выборку из потока может быть нужно произвести для статистического анализа в реальном времени. Идея достаточно проста: если мы пытаемся описать некоторый стохастический процесс, то достаточно большая равномерно-распределённая случайная выборка событий из этого процесса будет иметь такие же статистические свойства, как и исходный процесс.
Некоторая трудность возникает при обсуждении потоковых систем, поскольку сразу может быть не очевидно, как произвести случайную выборку из набора данных, которых нет ни в памяти, ни на диске, и как удостовериться, насколько эта выборка случайна?
Типичный ответ на этот вопрос – резервуарная выборка. Идея в том, чтобы хранить заранее определённое (возможно достаточно большое) число значений из потока (это хранилище и есть “резервуар”).
Если размер резервуара \(k\), то первые \(k\) значений из потока помещаются в резервуар. Для каждого следующего после \(k\) значения, оно помещается на случайное место в резервуар с вероятностью \(k/i\), где \(i\) – номер значения (начиная с 1). После завершения работы алгоритма, в резервуаре будет находиться равномерная случайная выборка из исходного распределения.
Существует две основных реализации этого алгоритма: простая и эффективная.
Простая реализация состоит в следующем:
function reservoirSample(S, R) {
for i := 1 to k{
:= S[i]
R[i]
}for i := k+1 to n {
j := randomInteger(1, i)
// равномерное случайное целое
// в диапазоне [1, i]
if j <= k {
:= S[i]
R[j]
}
} }
Для каждого элемента источника после \(k\), генерируется случайное равномерно распределённое целое значение на отрезке \([1, i]\), и если это значение является индексом резервуара, соответствующее значение в резервуаре заменяется на новое, в противном случае новое значение отбрасывается.
В контексте потоковой обработки данных, алгоритм можно немного расширить, и возвращать значение в резервуаре, которое должно быть заменено, в качестве “элемента случайной выборки”. После того, как алгоритм поработает достаточное время, выборка в резервуаре “достаточно случайная”, чтобы возвращаемое таким образом значение можно было считать элементом случайной выборки.
Более эффективный алгоритм, вместо того, чтобы вычислять случайное число для каждого элемента, вместо этого заранее вычисляет, сколько входящих элементов будет отброшено:
function ReservoirSample(S, R) {
for i = 1 to k
:= S[i]
R[i]
// random() -- случайное число на интервале (0,1)
W := exp(log(random())/k)
while i <= n {
i := i + floor(log(random())/log(1-W)) + 1
if i <= n {
randomInteger(1,k)] := S[i]
R[W := W * exp(log(random())/k)
}
} }
Подсчёт уникальных элементов
Подсчёт числа уникальных элементов в случае потоковой обработки в конечном итоге всегда основан на вероятностных алгоритмах. Их можно отнести к двум категориям:
- На основе битовых комбинаций. Такие алгоритмы основаны на рассмотрении комбинаций битов, встречающихся в начале двоичного представления хэша элемента потока.
- На основе порядковых статистик. Алгоритмы этого класса основаны на порядковых статистиках, например на основе минимального значения, встретившегося в потоке. Например, к этому классу относятся алгоритмы MinCount и Бар-Йоссефа.
На практике, обычно используют алгоритмы на основе битовых комбинаций. Наиболее популярными представителями этого класса являются алгоритмы HyperLogLog и HyperLogLog++. Концептуально они одинаковы.
В общих чертах алгоритм можно описать следующим образом:
- Входящее значение передаётся “хорошей” хэш-функции.
- Полученный хэш-код преобразуется в двоичную строку.
- \(m\) младших бит используются для выбора ячейки хэш-таблицы, которая будет обновлена
- В ячейку хэш-таблицы записывается максимум текущего значения и длины префикса битовой строки вида
0...01
(1 если нет ведущих нулей) - Для определения приближённой кардинальности потока (числа уникальных элементов) вычисляется среднее гармоническое по всем значениям в хэш-таблице. \[H(x_1,\ldots,x_n) = \frac{n}{\sum_{i=1}^n x_i^{-1}}\]
Число \(m\) называется точностью и определяет погрешность в смысле стандартной ошибки (которая составляет около \(\frac{1.3}{\sqrt m}\))
Говоря о HyperLogLog, следует также упомянуть, что он поддерживает распределённые вычисления, так как достаточно легко вычислить объединение двух хэш-таблиц.
Частота
Аналогичный статистический вопрос, сколько раз встречается элемент X?
Самый популярный алгоритм для ответа на такого рода вопросы – Count-Min sketch.
В общих чертах алгоритм выглядит следующим образом:
В CM sketch используется набор числовых массивов, называемых счётчиками. Число таких массивов определяется шириной w, а размер каждого – длиной m. Массивы индексируются с нуля. За каждым счётчиком закрепляется своя функция хеширования, функции хеширования должны быть попарно независимы.
К каждому значению в потоке применяется каждая функция хеширования, затем в каждой строке содержимое соответствующей результату хеширования ячейки увеличивается на 1.
Поскольку в результате коллизий хешей частота может быть только переоценена, но не недооценена, конечная оценка представляет собой минимальное значение для данного идентификатора во всех строках.
Оценки ошибки следующие: \[\frac{\tilde x-x}{N} \le \frac{e}{m}\] с вероятностью \[1-δ = 1-e^{-w},\] где \(N\) – общее число элементов в потоке.
Вхождение в множество
Вопрос звучит так: “Встречался ли данный элемент в потоке ранее?”
Не имея возможности сохранять весь поток, можно использовать такой же подход, как и в случае с определением частот. Этот подход называется фильтр Блума (не путать с визуальным эффектом засветки “Bloom”).
Аналогично предыдущему разделу, фильтр Блума может давать ложноположительный ответ, но не может давать ложноотрицательный.
В фильтре Блума так же используется w хеш-функций, каждое значение из потока хэшируется каждой функцией и индексы, которые должны быть обновлены, определяются по результатам хеширования.
Отличие от CM sketch в том, что вместо массивов чисел используется один битовый вектор, в качестве обновления – битовая дизъюнкция, а конечная оценка – это результат конъюнкции соответствующих бит.