forked from HariSekhon/DevOps-Python-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pig-text-to-elasticsearch.pig
84 lines (67 loc) · 3.35 KB
/
pig-text-to-elasticsearch.pig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
--
-- Author: Hari Sekhon
-- Date: 2015-03-19 22:36:22 +0000 (Thu, 19 Mar 2015)
--
-- vim:ts=4:sts=4:sw=4:et
--
-- Pig script to index [bz2 compressed] text files or logs for fast source file lookups in Elasticsearch
--
-- This was a simple use case where I didn't need to parse the logs, as it's oriented around finding source data files via full-text search rather than structured log analysis.
-- Tested on Pig 0.14 (Tez/MapReduce) on Hortonworks HDP 2.2
-- http://www.elastic.co/guide/en/elasticsearch/hadoop/current/pig.html
-- USAGE:
--
-- must download Elasticsearch connector for Hadoop from here:
--
-- https://www.elastic.co/downloads/hadoop
--
-- hadoop fs -put elasticsearch-hadoop.jar
--
-- pig -p path=/data/logs -p index=logs -p type=myType pig-text-to-elasticsearch.pig
-- Register the Elasticsearch-Hadoop connector jar (must be downloaded and
-- 'hadoop fs -put' first - see the usage header)
REGISTER 'elasticsearch-hadoop.jar';

-- path / index / type are deliberately left without defaults so the job fails fast
-- unless the caller supplies them explicitly via -p path= -p index= -p type=
--%default path '/data';
--%default index 'myIndex';
--%default type 'myType';

-- Elasticsearch configuration
--
-- http://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
--
%default es_nodes 'localhost:9200';
%default es_port '9200'; -- only used for entries in es.nodes that do not specify a port
%default queue 'default';

-- -Djob.name= doesn't work on Tez, but this does work to override on the command line via -p job_name=
%default job_name 'pig-text-to-elasticsearch.pig';
set job.name '$job_name';

DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage(
    'es.http.timeout = 5m',
    'es.index.auto.create = true', -- should pre-create the index with tuned settings, but auto-create is convenient for testing
    'es.nodes = $es_nodes',
    'es.port = $es_port');

set default_parallel 5;
-- don't combine small files into one split - presumably to keep one map task per input file; confirm against cluster defaults
set pig.noSplitCombination true;

-- Use a dedicated queue to limit containers and resource consumption so as not to overwhelm
-- Elasticsearch - otherwise it throttles merges and loses a handful out of millions of doc
-- inserts via task failures, which would then either retry (creating duplicates, since IDs
-- are autogenerated) or fail the entire job
--
-- doesn't work with Tez even via -D
--set tez.queue.name 'es';
-- but this works even for Tez - overrides -Dmapreduce.job.queuename=
set mapreduce.job.queuename '$queue';

-- Don't retry failed tasks - with autogenerated IDs retries would insert duplicates;
-- instead fail the job => tune => re-run
set mapreduce.map.maxattempts 1;
set mapreduce.reduce.maxattempts 1;
-- old (pre-MRv2) names for the same settings
set mapred.map.max.attempts 1;
set mapred.reduce.max.attempts 1;
-- none of the above work on Tez but this does
set tez.am.task.max.failed.attempts 0;

-- Disable speculative execution - duplicate task attempts would mean duplicate doc inserts
set mapreduce.map.speculative false;
set mapreduce.reduce.speculative false;
-- old (pre-MRv2) names for the same settings
set mapred.map.tasks.speculative.execution false;
set mapred.reduce.tasks.speculative.execution false;

-- '\n' delimiter loads each whole line as a single field; '-tagPath' prepends the
-- source file path to every record so it can be indexed alongside the line
lines = LOAD '$path' USING PigStorage('\n', '-tagPath') AS (path:chararray, line:chararray);

-- preserve whitespace within lines, but drop lines that are null or whitespace-only
lines2 = FILTER lines BY line IS NOT NULL AND TRIM(line) != '';

-- Strip the redundant scheme + authority prefix (e.g. hdfs://nameservice1 or
-- hdfs://namenode.example.com:8020) to avoid storing the same bytes over and over without value.
-- FIX: use '[^/]+' instead of '\\w+(?::\\d+)?' - hostnames may contain dots and hyphens,
-- which '\\w' does not match, so fully-qualified authorities were previously left in place.
--lines_final = FOREACH lines2 GENERATE REPLACE(path, '^file:', '') AS path, line;
lines_final = FOREACH lines2 GENERATE REPLACE(path, '^hdfs://[^/]+', '') AS path, line;

STORE lines_final INTO '$index/$type' USING EsStorage;