Для преобразования данных

Преобразователи — это промежуточные шаги графа. Преобразователи получают данные через подключенные входные порты, обрабатывают их указанным пользователем способом и отправляют через подключенные выходные порты в следующие шаги.

Список шагов для преобразования данных:

EXT_SORT - сортирует записи

EXT_FILTER - фильтрует записи

SIMPLE_COPY - копирует записи

MAP - пользовательский алгоритм обработки

ROLLUP - создает записи с помощью преобразования

EXT_SORT

EXT_SORT сортирует полученные записи в соответствии с указанным ключом сортировки и копирует каждую из них на все подключенные выходные порты. Позволяет использовать несколько параллельных потоков для сортировки больших данных.

Порты EXT_SORT:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейОдинаковые метаданные на входных и выходных портах
Output0даДля отсортированных записей
1-nнетДля отсортированных записей

Атрибуты EXT_SORT:

АтрибутОбязательныйОписаниеВозможные значения
sortKeyдаСписок полей метаданных, по которым производится сортировка и порядок сортировки. Наивысший приоритет сортировки имеет первое поле в последовательности. Порядок сортировки выражается отдельно для каждого ключевого поля (по возрастанию или по убыванию). Порядок сортировки по умолчанию — по возрастанию.sortKey="x_coord(a); y_coord(d)"
sortInMemoryнетПри sortInMemory="true" выполняется внутренняя сортировка. По умолчанию false.sortInMemory="true"
runSizeнетКоличество записей, сортируемых одновременно в памяти; размер одного буфера чтения. По умолчанию 8192.runSize="15456"

Пример. Сортировка данных.

Входные записи содержат имена файлов и их размер. Нужно отсортировать файлы по размеру, начиная с самого большого (descending – по убыванию). Метаданные содержат поля «FileName», «FileSize».

Входящие записи:

FileNameFileSize
file.txt2048
file.docx1048576
file.xml65536

Решение:

Ключ сортировки: FileSortKey="FileSize(d)"

Исходящие записи:

FileNameFileSize
file.docx1048576
file.xml65536
file.txt2048

EXT_FILTER

EXT_FILTER фильтрует входные данные в соответствии с логическим выражением. Отправляет все записи, соответствующие выражению фильтра, в первый выходной порт и все отклоненные записи во второй выходной порт.

Порты EXT_FILTER:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейОдинаковые метаданные на входных и выходных портах
Output0даДля отфильтрованных записей
1нетДля отклонённых записей

Атрибуты EXT_FILTER:

АтрибутОбязательныйОписаниеВозможные значения
filterExpressionдаВыражение, по которому фильтруются записи. Для записи преобразования используется JavaScript. Возвращает логическое значение.
<attr name="filterExpression">
    <![CDATA[ 
        $in[0].count != 177 && $in[0].product == «карандаш» 
    ]]>
</attr>

Пример. Фильтрация данных.

Входные данные содержат данные о продуктах, проданных в прошлом году. Нужно узнать данные только по карандашам. Метаданные содержат поля Product, Count и Location.

Входящие записи:

ProductCountLocation
карандаш1553екатеринбург
бумага6475новгород
ручка598владикавказ
карандаш177омск
карандаш239волгоград
бумага19казань
ластик53ростов

Решение:

Выражение для фильтрации: $in[0].product == «карандаш»

Исходящие записи:

ProductCountLocation
карандаш1553екатеринбург
карандаш177омск
карандаш239волгоград

SIMPLE_COPY

SIMPLE_COPY получает записи через один входной порт и копирует каждую из них на все подключенные выходные порты. Шаг не имеет атрибутов.

Порты SIMPLE_COPY:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейЛюбые
Output0даДля скопированных записейКак у Input 0
1-nнетКак у Output 0

Пример. Копирование данных.

Нужно скопировать записи с метаданными «carID» и «mark» в три потока.

Входящие записи:

порт 0:

carIDmark
145mercedes
856toyota
245chevrolet

Решение:

Для копирования в несколько потоков нужно подключить SIMPLE_COPY несколько выходных портов. Записи на всех выходных портах будут идти в одинаковом порядке.

Исходящие записи:

порт 0:

carIDmark
145mercedes
856toyota
245chevrolet

порт 1:

carIDmark
145mercedes
856toyota
245chevrolet

порт 2:

carIDmark
145mercedes
856toyota
245chevrolet

MAP

MAP позволяет написать пользовательский алгоритм обработки данных, используя внутренний язык системы. Можно по своему усмотрению трансформировать данные между входным и выходными портами, если предложенных шагов не хватает для выполнения необходимых преобразований данных.

Имеет единственный входной порт и как минимум один выходной. Может отправлять разные записи в разные выходные порты или даже отправлять одну и ту же запись на несколько выходных портов. Работает только с одним элементом, сохраняет порядок записей.

С помощью MAP можно:

  • удалить ненужные значения полей
  • проверить записи с помощью функций или регулярных выражений
  • создать новые или изменить существующие поля
  • преобразовать типы данных

Порты MAP:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейЛюбые
Output0даДля преобразованных записей
1-nнет

Атрибуты MAP:

АтрибутОбязательныйОписаниеВозможные значения
transform

1

Алгоритм преобразования данных
transformURL 1Имя внешнего файла, в котором описано преобразввание
charsetнетКодировка внешнего файла, определяющего преобразование

<attr name="transform">
    <![CDATA[
        function transform() {
            $out[0].person = $in[0].name.toString() + "_" + $in[0].surname.toString();
            $out[1].person = $in[0].name.toString().toUpperCase() + " " + $in[0].surname.toString().toUpperCase();
            return ALL;
        }
    ]]>
</attr>
1

Один из атрибутов должен быть указан.

Пример. Обработка данных с помощью MAP.

Нужно получить произведение и сумму полученных на вход данных и отправить результаты на разные выходные порты. Входные метаданные содержат поля a, b. Нужно отправить результат перемножения a*b на первый порт, а результат сложения a+b на второй порт.

Входящие записи:

ab
56
24
12

Решение:

Преобразование:


<Attr name="transform"><![CDATA[
    pub fn transform() -> OutPort {
        let res_mul = $in[0].a * $in[0].b;
        let res_add = $in[0].a + $in[0].b;

        $out[0].res_mul = res_mul;
        $out[1].res_add = res_add;

        return ALL;
    }

    ]]>
</Attr>

Исходящие записи:

порт 0:

multiplied
30
6
2

порт 1:

added
11
5
3

ROLLUP

ROLLUP создает одну или несколько выходных записей из одной или нескольких входных записей. Может отправлять разные записи на разные выходные порты, указанные пользователем. Записи должны быть отсортированы перед подачей в этот шаг.

Метаданные на разных выходных портах могут различаться.

Порты ROLLUP:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входных записейлюбые
Output0даДля выходных записей
1-Nнет

Атрибуты ROLLUP:

АтрибутОбязательныйОписаниеВозможные значения
groupKeyFields (groupKey)даКлюч, по которому записи считаются включенными в одну группу. Выражается в виде последовательности имен отдельных входных полей, разделенных друг от друга символом «#».name; salary
groupAccumulatorMetadataId (groupAccMd, groupMd)дaИдентификатор метаданных, которые служат для создания групповых аккумуляторов.metadataName
transformдаАлгоритм обработки данных. Функции для преобразования на шаге ROLLUP описаны в таблице ниже.
<Attr name="transform">
    <![CDATA[//
    Пользовательский алгоритм обработки.
    ]]>
</Attr>

Функции шага ROLLUP:

Когда приходит первая запись, срабатывает initGroup(groupAccumulator). Он инициализирует группу записей, объединенных групповым акумулятором groupAccumulator.

ПараметрЗначение
ОбязательныйДа
Входные параметры
<metadata name> groupAccMd

Метаданные, указанные пользователем. Если groupAccMd не определен, выполнение графа завершится с ошибкой.

Возвращаетvoid
ВызовВызывается по одному разу для первой входной записи каждой группы. Вызывается перед updateGroup(groupAccMd).
ОписаниеИнициализирует информацию для конкретной группы.
Пример

pub fn init_group() { }

Далее для каждой записи, которая соответствует этой группе, вызывается updateGroup(groupAccumulator).

ПараметрЗначение
ОбязательныйДа
Входные параметры
<metadata name> groupAccumulatorMetadataId

Метаданные, указанные пользователем. Если groupAccMd не определен, выполнение графа завершится с ошибкой.

Возвращает

если true, то вызывается updateTransform(counter, groupAccMd)

если false, то вызывается updateTransformOnError(counter, groupAccMd)

Вызов

Вызывается многократно (по одному разу для каждой входной записи группы, включая первую и последнюю запись). Вызывается после того, как функция initGroup(groupAccumulator) уже была вызвана для всей группы.

ОписаниеИнициализирует информацию для конкретной группы.
Пример

pub fn update_group() -> bool {
    group.count += 1;
    group.sum += input.salary;
return true}

Если updateGroup вернул true, то для каждой записи еще вызывается updateTransform(counter, groupAccumulator) столько раз сколько указан counter внутри, пока не вернётся SKIP.

ПараметрЗначение
ОбязательныйДа
Входные параметры

Целочисленный счетчик (начинается с 0, указывает количество созданных записей. Должен быть завершен, как показано в примере ниже. Вызовы функций заканчиваются, когда возвращается SKIP.)

<metadata name> groupAccMd (метаданные, указанные пользователем).

Если groupAccMd не определен, выполнение графа завершится с ошибкой.

Возвращаетцелочисленные значения
Вызов

Вызывается неоднократно, как указано пользователем. Вызывается после того, как updateGroup(groupAccumulator) возвращает значение true. Функция вызывается до тех пор, пока не будет возвращен SKIP.

Описание

Создает выходные записи на основе информации об отдельных записях. Если updateTransform() завершится ошибкой, а updateTransformOnError() не определено, весь граф завершится ошибкой. Если какая-либо часть функции transform() для какой-либо выходной записи вызывает сбой функции updateTransform(), и если определена другая (updateTransformOnError()), обработка продолжается в этом updateTransformOnError() в том месте, где произошел сбой updateTransform(). Функция updateTransformOnError() получает информацию, собранную функцией updateTransform(), полученную из ранее успешно обработанного кода. Сообщение об ошибке и трассировка стека также передаются в updateTransformOnError().

Пример

pub fn update_transform(counter: usize) -> OutPort {
    if counter > 1 { 
        return out_port![SKIP] }

    output.out_0 = input.clone();
    return out_port![0]

Когда группа закончилась, отрабатывает finishGroup(groupAccumulator).

ПараметрЗначение
ОбязательныйДа
Входные параметры
<metadata name> groupAccMd

Метаданные, указанные пользователем. Если groupAccMd не определен, выполнение графа завершится с ошибкой.

Возвращает

если true, то вызывается transform(counter,groupAccMd)

если false, то вызывается transformOnError(counter,groupAccMd)

Вызов

Вызывается повторно, один раз для последней входной записи каждой группы. Вызывается после того, как updateGroup(groupAccMd) уже был вызван для всех входных записей группы.

Описание

Если finishGroup() завершается с ошибкой, то весь граф завершится ошибкой.

Пример

pub fn finish_group() -> bool {
    if input.name.chars().count() < 5 { 
        return false }
        return true}

Затем выполняется transform(counter, groupAccumulator).

ПараметрЗначение
ОбязательныйДа
Входные параметры

целочисленный счетчик (начинается с 0, указывает количество созданных записей. должен быть завершен, как показано в примере ниже. Вызовы функций заканчиваются, когда возвращается SKIP.)

<metadata name> groupAccMd (метаданные, указанные пользователем).

Если groupAccMd не определен, выполнение графа завершится с ошибкой.

Возвращаетцелочисленные значения
Вызов

Вызывается неоднократно, как указано пользователем. Вызывается после того, как updateGroup(groupAccumulator) возвращает значение true. Функция вызывается до тех пор, пока не будет возвращен SKIP.

Описание

Создает выходные записи на основе всех записей всей группы. Если функция transform() завершается ошибкой, а функция transformOnError() не определена, весь граф завершится ошибкой. Если какая-либо часть функции transform() для какой-либо выходной записи вызывает сбой функции transform(), и если функция transformOnError() определена, обработка продолжается в transformOnError() в том месте, где произошла ошибка transform(). Функция transformOnError() получает информацию, собранную функцией transform(), которая была получена из ранее успешно обработанного кода. Также сообщение об ошибке и трассировка стека передаются в transformOnError().

Пример

pub fn transform(counter: usize) -> OutPort {
    if counter > 0 { 
        return out_port![SKIP] }
        
    output.out_0.name = input.name.clone() + "[AVG]";
    output.out_0.salary = group.sum / (group.count as f64);
    output.out_1 = output.out_0.clone();
    return out_port![ALL]

Входные записи или поля

Входные записи или поля доступны в функциях initGroup(), updateGroup(), finishGroup(). Они также доступны в функциях updateTransform(), transform().

Выходные записи или поля

Выходные записи или поля доступны в функциях updateTransform(), transform().

Групповой аккумулятор

Групповой аккумулятор доступен в функциях initGroup(), updateGroup(), finishGroup(). Он также доступен в функциях updateTransform(), transform().