Пример 1. Работа с простыми файлами, преобразование, запись данных в базу. Автоматизация запуска графа.

Описание задачи

Разработать граф для загрузки данных из CSV файла в таблицу. Добавить поля со временем создания и обновления записей. Изменить в поле price символ точки, на запятую.

Настроить расписание и обработчик событий, вызвать созданный граф из другого графа.

Создание графа для преобразования данных и записи в базу

  1. Чтобы создать новый граф, найдите в структуре своего проекта на панели Project structure папку grapf, в которую поместите граф. Кликните правой кнопкой на папку graph, выберите в меню пункт New grf file. Задайте имя графа, например load_ecommerce_behavior.grf и нажмите Создать.
  2. Привяжите к графу внешний файл параметров: панель Outline -> Parameters -> Link parameters -> вставьте путь к файлу параметров ./workspace.prm в поле fileURL.
  3. Привяжите также файл с описанием соединения с базой: панель Outline -> Connections -> Link connection -> вставьте путь к файлу соединения ./conn/demo2.con в поле dbConfig.
  4. Перетащите на рабочую область с панели компонентов узлы CsvReader, Map, PostgresSqlDataWriter.
  5. Соедините порты узлов рёбрами. Вытягивайте ребро из выходного порта (с правой стороны) узла и отпускайте, когда довели курсор до входного порта (с левой стороны) другого узла.
  6. Создайте 2 схемы данных через редактор схем.
    • на панели Outline откройте контекстное меню раздела Metadata -> выберите пункт New metadata. Задайте имя схемы - input. Создайте 8 полей типа string: event_time, event_type, product_id, category_id, category_code, brand, price, user_id, user_session/ Чтобы добавить поля в схему используйте кнопку +. Чтобы удалить ненужное поле - кнопку -
    • на панели Outline откройте контекстное меню схемы output -> выберите пункт Copy metadata. Скопируйте схему input и вставьте ее через контекстное меню раздела Metadata -> Paste metadata. Откройте редактор и смените название схемы на output. Добавьте еще два поля типа date: created_at, updated_at
  7. Перетащите схему input с панели Outline на ребро между узлами CsvReader и Map, а схему output на ребро между Map и PostgresSqlDataWriter.
  8. Заполните значения атрибутов узлов. Чтобы открыть редактор узла, дважды кликните по его поверхности.
    • в CsvReader нужно вставить имя файла-источника данных в поле fileURL=${DATAIN_DIR}/2019-Nov.csv
    • в Map заполнить поле transform кодом трансформации на внутреннем языке Onebridge. Код приведён ниже в таблице "Атрибуты MAP".
    • в PostgresSqlDataWriter выберите имя нужного dbConnection из списка, вставьте имя таблицы для записи в поле table, и укажите параметры для записи в parameters

Полный список значений атрибутов для каждого узла приведён в следующем разделе Атрибуты используемых узлов.

На рисунке ниже представлен созданный граф для загрузки данных из CSV файла в базу:

Отображение графа load_ecommerce_behavior.grf в инспекторе

Атрибуты используемых узлов

Атрибуты CSV_READER:

АтрибутЗначениеОписание
phase0Фаза узла
node nameCsvReaderИмя узла, отображаемое в рабочей области
enabledtrueРаботоспособность узла
fileURL${DATAIN_DIR}/2019-Nov.csvПуть к источнику данных
charsetUTF_8Кодировка файла-источника
dataPolicystrictПолитика обработки некорректных данных при чтении
trimdefaultФлаг удаления начальных и конечных пробелов в момент прохождения через данный узел
headertrueФлаг удаления заголовка файла
quotedStringsfalseФлаг восприятия спец. символов
quoteChar bothСпец. символ для атрибута quotedStrings
fieldDelimiter,Разделитель полей
recordDelimiter\nРазделитель записей

Атрибуты MAP:

АтрибутЗначениеОписание
phase0Фаза узла
node nameCsvReaderИмя узла, отображаемое в рабочей области
enabledtrueРаботоспособность узла
transform
#[derive(Default)]
pub struct Local {
    pub start_date: DateTime,
}

fn transform(
    input: In, mut output: Out, 
    local: &mut Local, glb: Global,
) -> Result {
    if local.start_date == DateTime::NULL {
        local.start_date = DateTime::now();
    }

    output.0.event_time = input.0.event_time.clone();
    output.0.event_type = input.0.event_type.clone();
    output.0.product_id = input.0.product_id.clone();
    output.0.category_id = input.0.category_id.clone();
    output.0.category_code = input.0.category_code.clone();
    output.0.brand = input.0.brand.clone();
    output.0.price = input.0.price.replace(".", ",");
    output.0.user_id = input.0.user_id.clone();
    output.0.user_session = input.0.user_session.clone();
    output.0.created_at = local.start_date.clone();
    output.0.updated_at = local.start_date.clone();

    Result::All
}
Код для преобразования данных и присвоения значений выходной схеме данных

Атрибуты POSTGRESQL_DATA_WRITER:

АтрибутЗначениеОписание
phase0Фаза узла
node nameCsvReaderИмя узла, отображаемое в рабочей области
enabledtrueРаботоспособность узла
dbConnection./conn/demo2.conИмя соединения с базой
tableecommerce_behaviorИмя таблицы для записи
parameters
NULL=NULL
Параметры для утилиты psql и оператора copy, используемых узлом POSTGRESQL_DATA_WRITER

Вызов графа из другого графа

Чтобы вызвать созданный граф из другого графа, используйте узел ExecuteGraph.

  1. Создайте новый граф, задайте имя, например startLoad
  2. На рабочую область нужно поместить единственный компонент - ExecuteGraph
  3. Откройте редактор атрибутов узла и задайте в поле jobURL имя графа, который нужно запустить: jobURL = ./graph/load_ecommerce_behavior.grf
  4. Сохраните изменения атрибутов узла, сохраните граф CTRL+S.

Так будет выглядеть граф для вызова другого графа:

Граф startLoad.grf для запуска load_ecommerce_behavior.grf

Теперь, если запустить startLoad.grf, он следом запустит первоначальный граф load_ecommerce_behavior.grf.

Результат выполнения можно будет увидеть в панели администратора на странице История выполнения.

Отображение запуска startLoad.grf в истории выполнения

Создание расписания

Чтобы запустить граф в определенное время в автоматическом режиме, можно использовать расписание.

  1. Для этого в панели администратора на странице Расписания диалог создания расписаний.
  2. Задайте имя - Расписание запуска startLoad.grf, периодичность - Один раз, время исполнения, выберите проект и граф. Сохраните расписание.

Граф будет запущен в назначенное время.

Просмотр дополнительной информации по расписанию "Расписание запуска startLoad.grf"

Результат выполнения графа отобразится в панели администратора на странице История выполнения. На вкладке Обзор в поле "Тип запуска" будет указано по расписанию, а в "Пользователь" - root.

Просмотр информации по запуску расписания "Расписание запуска startLoad.grf"

Создание обработчика событий

Если нужно, чтобы граф запускался по событию - нужно настроить обработчик для события. Триггерным событием для обработчика может быть окончание работы графа с определенным статусом либо создание/удаление файла в указанной директории.

  1. В панели администратора, на странице Обработчики событий, вызовите диалог создания обработчика кнопкой Новый обработчик.
  2. Задайте название обработчика, например, Поступил файл. Выберите запускающее событие - Файл и укажите необходимость проверить добавление файла в директорию /srv/projects/DEMO/data-in/*.csv. Задайте действие, которое надо выполнить - Запуск графа и его параметры: проект - DEMO, граф - /graph/startLoad.grf. Сохраните обработчик.

Просмотр дополнительной информации по обработчику событий "Обработчик для запуска startLoad.grf"

Когда триггерное событие случится, указанный в поле "Начало" граф будет запущен. Проверить выполнение графа можно на истории выполнения. На вкладке Обзор в поле "Пользователь" будет root, а в поле "Тип запуска" будет указано по событию.

Просмотр информации по запуску обработчика "Обработчик для запуска startLoad.grf"