DEDUP

DEDUP удаляет повторяющиеся записи по ключу. Требует сортировки входных записей.

Порты DEDUP:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входных записейЛюбые
Output0даДля дедуплицированных записей.Как у Input 0
Output1нетДля дубликатов записей.Как у Input 0

Атрибуты DEDUP:

АтрибутОбязательныйОписаниеВозможные значения
dedupKeyнетКлюч, по которому производится дедупликация (удаление дубликатов) записей. Если ключ не установлен, весь входной поток рассматривается как одна группа и удаляются только полные дубликаты (по всем полям записи).dedupKey="x_coord"
keepнетОпределяет, какие записи будут сохранены. В случае значения Unique все записи из входного потока, где по ключу находится больше 1 записи, отбрасываются на порт output[1]. В случае значения First и Last туда идут только те записи, которые не прошли дедупликацию.

"first" (деф.) - Сохраняются записи из начала потока.
"last" - Сохраняются записи из конца потока.
"unique" - Выбираются только записи без дубликатов, в этом случае numberOfDuplicates игнорируется.

keep="unique"
numberOfDuplicatesнетМаксимальное количество повторяющихся записей, которые необходимо выбрать из каждой группы записей с одинаковым значением ключа или, если ключ не задан, максимальное количество записей с начала или конца всех записей. Игнорируется, если установлен keep="Unique".numberOfDuplicates="2"

Пример. Дедупликация несортированных записей.

Записи содержат время входов на некоторый ресурс с различных ip адресов. Нужно найти время первого входа для каждого ip адреса. Метаданные содержат поля «ip» и «time».

Входящие записи:

iptime
67.249.105.11811:46:12
208.25.71.8805:14:15
161.100.209.23523:12:32
161.100.209.23523:19:34
67.249.105.11815:34:09
223.78.208.18415:35:43
52.151.181.421:51:17
223.78.208.18415:38:49
161.100.209.23523:28:16

Решение:

Перед передачей в DEDUP данные надо прочитать и отсортировать. Для сортировки укажем ключ sortKey="ip(a);time(a)". При дедупликации укажем ключ: dedupKey = «ip». Текст графа будет выглядеть следующим образом:


<Graph>
  <Global>
    <Metadata id="meta0">
      <Record name="meta" type="delimited" recordDelimiter="\n" fieldDelimiter="|">
        <Field name="ip" type="string"/>
        <Field name="time" type="string"/>
      </Record>
    </Metadata>
  </Global>
  <Phase number="0">
    <Node id="FlatFileReader0" guiX="100" guiY="83" guiName="FlatFileReader" type="FLAT_FILE_READER" fileURL="dedup_in.txt">
    </Node>
    <Node id="Sort0" guiX="300" guiY="83" guiName="Sort" type="EXT_SORT" sortKey="ip(a);time(a)" sortInMemory="false">
    </Node>
    <Node id="Trash0" guiX="633" guiY="83" guiName="Trash" type="TRASH" debugOutput="true">
    </Node>
    <Node id="Dedup0" guiX="467" guiY="83" guiName="Dedup" type="DEDUP" dedupKey="ip" keep="first" numberOfDuplicates="1">
    </Node>
    <Edge id="Edge0" fromNode="FlatFileReader0:0" toNode="Sort0:0" metadata="meta0"/>
    <Edge id="Edge1" fromNode="Sort0:0" toNode="Dedup0:0" metadata="meta0"/>
    <Edge id="Edge2" fromNode="Dedup0:0" toNode="Trash0:0" metadata="meta0"/>
  </Phase>
</Graph>

Исходящие записи:

iptime
67.249.105.11811:46:12
208.25.71.8805:14:15
161.100.209.23523:12:32
223.78.208.18415:35:43
52.151.181.421:51:17