Для преобразования данных
Преобразователи — это промежуточные шаги графа. Преобразователи получают данные через подключенные входные порты, обрабатывают их указанным пользователем способом и отправляют через подключенные выходные порты в следующие шаги.
Список шагов для преобразования данных:
MAP - пользовательский алгоритм обработки
ROLLUP - создает записи с помощью преобразования
EXT_SORT
EXT_SORT сортирует полученные записи в соответствии с указанным ключом сортировки и копирует каждую из них на все подключенные выходные порты. Позволяет использовать несколько параллельных потоков для сортировки больших данных.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Одинаковые метаданные на входных и выходных портах |
Output | 0 | да | Для отсортированных записей | |
1-n | нет | Для отсортированных записей |
Атрибуты EXT_SORT:
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
sortKey | да | Список полей метаданных, по которым производится сортировка и порядок сортировки. Наивысший приоритет сортировки имеет первое поле в последовательности. Порядок сортировки выражается отдельно для каждого ключевого поля (по возрастанию или по убыванию). Порядок сортировки по умолчанию — по возрастанию. | sortKey="x_coord(a); y_coord(d)" |
sortInMemory | нет | При sortInMemory="true" выполняется внутренняя сортировка. По умолчанию false . | sortInMemory="true" |
runSize | нет | Количество записей, сортируемых одновременно в памяти; размер одного буфера чтения. По умолчанию 8192 . | runSize="15456" |
Пример. Сортировка данных.
Входные записи содержат имена файлов и их размер. Нужно отсортировать файлы по размеру, начиная с самого большого (descending – по убыванию). Метаданные содержат поля «FileName», «FileSize».
Входящие записи:
FileName | FileSize |
---|---|
file.txt | 2048 |
file.docx | 1048576 |
file.xml | 65536 |
Решение:
Ключ сортировки: FileSortKey="FileSize(d)"
Исходящие записи:
FileName | FileSize |
---|---|
file.docx | 1048576 |
file.xml | 65536 |
file.txt | 2048 |
EXT_FILTER
EXT_FILTER фильтрует входные данные в соответствии с логическим выражением. Отправляет все записи, соответствующие выражению фильтра, в первый выходной порт и все отклоненные записи во второй выходной порт.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Одинаковые метаданные на входных и выходных портах |
Output | 0 | да | Для отфильтрованных записей | |
1 | нет | Для отклонённых записей |
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
filterExpression | да | Выражение, по которому фильтруются записи. Для записи преобразования используется JavaScript. Возвращает логическое значение. |
|
Пример. Фильтрация данных.
Входные данные содержат данные о продуктах, проданных в прошлом году. Нужно узнать данные только по карандашам. Метаданные содержат поля Product
, Count
и Location
.
Входящие записи:
Product | Count | Location |
---|---|---|
карандаш | 1553 | екатеринбург |
бумага | 6475 | новгород |
ручка | 598 | владикавказ |
карандаш | 177 | омск |
карандаш | 239 | волгоград |
бумага | 19 | казань |
ластик | 53 | ростов |
Решение:
Выражение для фильтрации: $in[0].product == «карандаш»
Исходящие записи:
Product | Count | Location |
---|---|---|
карандаш | 1553 | екатеринбург |
карандаш | 177 | омск |
карандаш | 239 | волгоград |
SIMPLE_COPY
SIMPLE_COPY получает записи через один входной порт и копирует каждую из них на все подключенные выходные порты. Шаг не имеет атрибутов.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Любые |
Output | 0 | да | Для скопированных записей | Как у Input 0 |
1-n | нет | Как у Output 0 |
Пример. Копирование данных.
Нужно скопировать записи с метаданными «carID» и «mark» в три потока.
Входящие записи:
порт 0:
carID | mark |
---|---|
145 | mercedes |
856 | toyota |
245 | chevrolet |
Решение:
Для копирования в несколько потоков нужно подключить SIMPLE_COPY несколько выходных портов. Записи на всех выходных портах будут идти в одинаковом порядке.
Исходящие записи:
порт 0:
carID | mark |
---|---|
145 | mercedes |
856 | toyota |
245 | chevrolet |
порт 1:
carID | mark |
---|---|
145 | mercedes |
856 | toyota |
245 | chevrolet |
порт 2:
carID | mark |
---|---|
145 | mercedes |
856 | toyota |
245 | chevrolet |
MAP
MAP позволяет написать пользовательский алгоритм обработки данных, используя внутренний язык системы. Можно по своему усмотрению трансформировать данные между входным и выходными портами, если предложенных шагов не хватает для выполнения необходимых преобразований данных.
Имеет единственный входной порт и как минимум один выходной. Может отправлять разные записи в разные выходные порты или даже отправлять одну и ту же запись на несколько выходных портов. Работает только с одним элементом, сохраняет порядок записей.
С помощью MAP можно:
- удалить ненужные значения полей
- проверить записи с помощью функций или регулярных выражений
- создать новые или изменить существующие поля
- преобразовать типы данных
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Любые |
Output | 0 | да | Для преобразованных записей | |
1-n | нет |
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
transform | Алгоритм преобразования данных | ||
transformURL | 1 | Имя внешнего файла, в котором описано преобразввание | |
charset | нет | Кодировка внешнего файла, определяющего преобразование |
|
Один из атрибутов должен быть указан.
Пример. Обработка данных с помощью MAP.
Нужно получить произведение и сумму полученных на вход данных и отправить результаты на разные выходные порты. Входные метаданные содержат поля a, b. Нужно отправить результат перемножения a*b на первый порт, а результат сложения a+b на второй порт.
Входящие записи:
a | b |
---|---|
5 | 6 |
2 | 4 |
1 | 2 |
Решение:
Преобразование:
<Attr name="transform"><![CDATA[
pub fn transform() -> OutPort {
let res_mul = $in[0].a * $in[0].b;
let res_add = $in[0].a + $in[0].b;
$out[0].res_mul = res_mul;
$out[1].res_add = res_add;
return ALL;
}
]]>
</Attr>
Исходящие записи:
порт 0:
multiplied |
---|
30 |
6 |
2 |
порт 1:
added |
---|
11 |
5 |
3 |
ROLLUP
ROLLUP создает одну или несколько выходных записей из одной или нескольких входных записей. Может отправлять разные записи на разные выходные порты, указанные пользователем. Записи должны быть отсортированы перед подачей в этот шаг.
Метаданные на разных выходных портах могут различаться.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входных записей | любые |
Output | 0 | да | Для выходных записей | |
1-N | нет |
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
groupKeyFields (groupKey) | да | Ключ, по которому записи считаются включенными в одну группу. Выражается в виде последовательности имен отдельных входных полей, разделенных друг от друга символом «#». | name; salary |
groupAccumulatorMetadataId (groupAccMd, groupMd) | дa | Идентификатор метаданных, которые служат для создания групповых аккумуляторов. | metadataName |
transform | да | Алгоритм обработки данных. Функции для преобразования на шаге ROLLUP описаны в таблице ниже. |
|
Функции шага ROLLUP:
Когда приходит первая запись, срабатывает initGroup(groupAccumulator). Он инициализирует группу записей, объединенных групповым акумулятором groupAccumulator.
Параметр | Значение |
---|---|
Обязательный | Да |
Входные параметры |
Метаданные, указанные пользователем. Если |
Возвращает | void |
Вызов | Вызывается по одному разу для первой входной записи каждой группы. Вызывается перед updateGroup(groupAccMd). |
Описание | Инициализирует информацию для конкретной группы. |
Пример |
|
Далее для каждой записи, которая соответствует этой группе, вызывается updateGroup(groupAccumulator).
Параметр | Значение |
---|---|
Обязательный | Да |
Входные параметры |
Метаданные, указанные пользователем. Если |
Возвращает |
если true, то вызывается если false, то вызывается |
Вызов |
Вызывается многократно (по одному разу для каждой входной записи группы, включая первую и последнюю запись). Вызывается после того, как функция |
Описание | Инициализирует информацию для конкретной группы. |
Пример |
|
Если updateGroup вернул true, то для каждой записи еще вызывается updateTransform(counter, groupAccumulator) столько раз сколько указан counter внутри, пока не вернётся SKIP.
Параметр | Значение |
---|---|
Обязательный | Да |
Входные параметры |
Целочисленный счетчик (начинается с 0, указывает количество созданных записей. Должен быть завершен, как показано в примере ниже. Вызовы функций заканчиваются, когда возвращается
Если |
Возвращает | целочисленные значения |
Вызов |
Вызывается неоднократно, как указано пользователем. Вызывается после того, как |
Описание |
Создает выходные записи на основе информации об отдельных записях. Если |
Пример |
|
Когда группа закончилась, отрабатывает finishGroup(groupAccumulator).
Параметр | Значение |
---|---|
Обязательный | Да |
Входные параметры |
Метаданные, указанные пользователем. Если |
Возвращает |
если true, то вызывается если false, то вызывается |
Вызов |
Вызывается повторно, один раз для последней входной записи каждой группы. Вызывается после того, как |
Описание |
Если |
Пример |
|
Затем выполняется transform(counter, groupAccumulator).
Параметр | Значение |
---|---|
Обязательный | Да |
Входные параметры |
целочисленный счетчик (начинается с 0, указывает количество созданных записей. должен быть завершен, как показано в примере ниже. Вызовы функций заканчиваются, когда возвращается SKIP.)
Если |
Возвращает | целочисленные значения |
Вызов |
Вызывается неоднократно, как указано пользователем. Вызывается после того, как |
Описание |
Создает выходные записи на основе всех записей всей группы. Если функция |
Пример |
|
Входные записи или поля
Входные записи или поля доступны в функциях initGroup()
, updateGroup()
, finishGroup()
. Они также доступны в функциях updateTransform()
, transform()
.
Выходные записи или поля
Выходные записи или поля доступны в функциях updateTransform()
, transform()
.
Групповой аккумулятор
Групповой аккумулятор доступен в функциях initGroup()
, updateGroup()
, finishGroup()
. Он также доступен в функциях updateTransform()
, transform()
.