DEDUP
DEDUP удаляет повторяющиеся записи по ключу. Требует сортировки входных записей.
Порты DEDUP:
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входных записей | Любые |
Output | 0 | да | Для дедуплицированных записей. | Как у Input 0 |
Output | 1 | нет | Для дубликатов записей. | Как у Input 0 |
Атрибуты DEDUP:
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
dedupKey | нет | Ключ, по которому производится дедупликация (удаление дубликатов) записей. Если ключ не установлен, весь входной поток рассматривается как одна группа и удаляются только полные дубликаты (по всем полям записи). | dedupKey="x_coord" |
keep | нет | Определяет, какие записи будут сохранены. В случае значения Unique все записи из входного потока, где по ключу находится больше 1 записи, отбрасываются на порт output[1]. В случае значения First и Last туда идут только те записи, которые не прошли дедупликацию.
"first" (деф.) - Сохраняются записи из начала потока. | keep="unique" |
numberOfDuplicates | нет | Максимальное количество повторяющихся записей, которые необходимо выбрать из каждой группы записей с одинаковым значением ключа или, если ключ не задан, максимальное количество записей с начала или конца всех записей. Игнорируется, если установлен keep="Unique". | numberOfDuplicates="2" |
Пример. Дедупликация несортированных записей.
Записи содержат время входов на некоторый ресурс с различных ip адресов. Нужно найти время первого входа для каждого ip адреса. Метаданные содержат поля «ip» и «time».
Входящие записи:
ip | time |
---|---|
67.249.105.118 | 11:46:12 |
208.25.71.88 | 05:14:15 |
161.100.209.235 | 23:12:32 |
161.100.209.235 | 23:19:34 |
67.249.105.118 | 15:34:09 |
223.78.208.184 | 15:35:43 |
52.151.181.4 | 21:51:17 |
223.78.208.184 | 15:38:49 |
161.100.209.235 | 23:28:16 |
Решение:
Перед передачей в DEDUP данные надо прочитать и отсортировать. Для сортировки укажем ключ sortKey="ip(a);time(a)"
. При дедупликации укажем ключ: dedupKey = «ip»
. Текст графа будет выглядеть следующим образом:
<Graph>
<Global>
<Metadata id="meta0">
<Record name="meta" type="delimited" recordDelimiter="\n" fieldDelimiter="|">
<Field name="ip" type="string"/>
<Field name="time" type="string"/>
</Record>
</Metadata>
</Global>
<Phase number="0">
<Node id="FlatFileReader0" guiX="100" guiY="83" guiName="FlatFileReader" type="FLAT_FILE_READER" fileURL="dedup_in.txt">
</Node>
<Node id="Sort0" guiX="300" guiY="83" guiName="Sort" type="EXT_SORT" sortKey="ip(a);time(a)" sortInMemory="false">
</Node>
<Node id="Trash0" guiX="633" guiY="83" guiName="Trash" type="TRASH" debugOutput="true">
</Node>
<Node id="Dedup0" guiX="467" guiY="83" guiName="Dedup" type="DEDUP" dedupKey="ip" keep="first" numberOfDuplicates="1">
</Node>
<Edge id="Edge0" fromNode="FlatFileReader0:0" toNode="Sort0:0" metadata="meta0"/>
<Edge id="Edge1" fromNode="Sort0:0" toNode="Dedup0:0" metadata="meta0"/>
<Edge id="Edge2" fromNode="Dedup0:0" toNode="Trash0:0" metadata="meta0"/>
</Phase>
</Graph>
Исходящие записи:
ip | time |
---|---|
67.249.105.118 | 11:46:12 |
208.25.71.88 | 05:14:15 |
161.100.209.235 | 23:12:32 |
223.78.208.184 | 15:35:43 |
52.151.181.4 | 21:51:17 |