fix(elasticsearch): v8 support
ebuildy committed Oct 31, 2022
1 parent d794591 commit c653633
Showing 7 changed files with 43 additions and 66 deletions.
4 changes: 2 additions & 2 deletions dev/workflows/es2fs/pipeline.yml
@@ -34,10 +34,10 @@ jobs:
     kind: "org.spark.mutate"
     spec:
       adds:
-        day: "{{day}}\\/{{month}}\\/{{year}}"
+        day: "{{ day }}/{{month}}/{{year}}"
   - name: output
     kind: org.elasticsearch.output
     spec:
       host: "{{env.elasticsearch}}"
       index: "out"
-      clean_query: "day:{{day}}\\/{{month}}\\/{{year}}"
+      clean_query: day.keyword:{{day}}\/{{month}}\/{{year}}
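Context for the day.keyword change: with Elasticsearch's default dynamic mapping the templated day value is indexed as analyzed text plus a .keyword sub-field, and the sub-field holds the exact, unanalyzed value, which is presumably what the clean query needs to match reliably in ES 8. A minimal sketch of the rendered values for 31 Oct 2022 (date and host are illustrative, not taken from the commit):

    # day renders to 31/10/2022; clean_query renders to day.keyword:31\/10\/2022
    # (the \/ escapes "/" in Lucene query-string syntax). URL-encoded count check:
    curl -s "http://elasticsearch:9200/out/_count?q=day.keyword%3A31%5C%2F10%5C%2F2022"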
8 changes: 4 additions & 4 deletions djobi-core/docker-compose.yml
@@ -55,7 +55,7 @@ services:
       - GUI_CONFIG_FILE=/home/node/config.yaml
 
   elasticsearch:
-    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.1
+    image: docker.elastic.co/elasticsearch/elasticsearch:8.4.3
     labels:
       - "traefik.enable=true"
       - "traefik.http.routers.elasticsearch.rule=Host(`elasticsearch-${DJOBI_HOST}`)"
@@ -64,15 +64,15 @@
       - node.roles=master,data
       - xpack.security.enabled=false
       - xpack.monitoring.collection.enabled=false
-      - xpack.monitoring.enabled=false
+      #- xpack.monitoring.enabled=false
       - xpack.ml.enabled=false
      - ingest.geoip.downloader.enabled=false
       - bootstrap.memory_lock=true
       - discovery.type=single-node
       - http.cors.enabled=true
-      - http.cors.allow-origin=*
+      - http.cors.allow-origin="*"
       - ES_JAVA_OPTS=-Xms512m -Xmx512m -Dmapper.allow_dots_in_name=true
 
 volumes:
   minio_data:
-    driver: local
+    driver: local
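A quick smoke test for the upgraded container (a sketch: the host name assumes you are on the compose network and the default port 9200). Since xpack.security.enabled=false, ES 8 answers without credentials here, unlike a stock 8.x install where security is auto-enabled:

    curl -s http://elasticsearch:9200 | grep '"number"'      # expect "8.4.3"
    curl -s "http://elasticsearch:9200/_cluster/health?pretty"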
4 changes: 2 additions & 2 deletions djobi-elasticsearch/build.gradle
@@ -14,8 +14,8 @@ configurations {
     es7
     es8
 
-    testRuntimeOnly.extendsFrom es7
-    compileOnly.extendsFrom es7
+    testRuntimeOnly.extendsFrom es8
+    compileOnly.extendsFrom es8
 }
 
 dependencies {
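With compileOnly and testRuntimeOnly now extending the es8 configuration, the module should compile and test against the ES 8 dependencies without any extra flag. A hedged example of how that might be exercised (the :djobi-elasticsearch project path is inferred from the directory name above, not confirmed by the commit):

    ./gradlew :djobi-elasticsearch:test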
@@ -12,6 +12,7 @@
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 
 @Singleton
@@ -47,7 +48,7 @@ private int getCountValue(HashMap<String, Object> response) {
      * @return bool
      * @throws Exception
      */
-    public boolean execute(final ESOutputConfig config, final Job Job) throws Exception {
+    public boolean execute(final ESOutputConfig config, final Job Job) {
         logger.info(String.format("[output:elasticsearch] clean: %s/%s?q=%s", config.url, config.index, config.clean_query));
 
         int deleteTries = 1000;
@@ -62,7 +63,7 @@ public boolean execute(final ESOutputConfig config, final Job Job) throws Except
                //http.post(String.format("%s/%s/_flush", config.url, realEsIndex)).execute().close();
 
                final HashMap<String, Object> response = http.get(
-                    String.format("%s/%s/_count?q=%s", config.url, realEsIndex, URLEncoder.encode(config.clean_query, "UTF-8"))
+                    String.format("%s/%s/_count?q=%s", config.url, realEsIndex, URLEncoder.encode(config.clean_query, StandardCharsets.UTF_8))
                ).executeAsDict();
 
                final int itemsCount = getCountValue(response);
@@ -94,27 +95,41 @@ public boolean execute(final ESOutputConfig config, final Job Job) throws Except
                    {
                        cleanRequest = http.delete(String.format("%s/%s/_query?size=10000&timeout=10s&q=%s", config.url, config.index, URLEncoder.encode(config.clean_query, "UTF-8")));
                    }
-                    else
+                    else if (esVersion.startsWith("7"))
                    {
                        cleanRequest = http.post(String.format("%s/%s/_delete_by_query?size=10000&timeout=10s&q=%s", config.url, realEsIndex, URLEncoder.encode(config.clean_query, "UTF-8")));
                    }
+                    else
+                    {
+                        cleanRequest = http.post(String.format("%s/%s/_delete_by_query?max_docs=10000&timeout=10s&q=%s", config.url, realEsIndex, URLEncoder.encode(config.clean_query, "UTF-8")));
+                    }
 
                    HttpResponse r = cleanRequest.execute();
 
-                    if (r.statusCode() == 404) {
-                        logger.error("delete query gives a 404 error, maybe a plugin is missing?");
-                        return false;
+                    switch (r.statusCode()) {
+                        case 404 -> {
+                            logger.error("delete query gives a 404 error, maybe a plugin is missing?");
+                            return false;
+                        }
+                        case 400 -> {
+                            logger.error(String.format("[400] %s", r.raw()));
+                            return false;
+                        }
                    }
 
-                    logger.info(r.raw());
+                    logger.debug(r.raw());
 
-                    Thread.sleep(2000);
+                    try {
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
                }
            }
        } catch(IOException e) {
            logger.error("[output:elasticsearch] clean: exception", e);
 
-            throw e;
+            return false;
        }
    }
}
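The version branching above comes down to one URL parameter: ES 7 still accepts the deprecated size= on _delete_by_query, while ES 8 only takes max_docs=, which appears to be the motivation for the extra branch. A hedged illustration with placeholder host, index and query values (not taken from the commit):

    curl -X POST "http://localhost:9200/out/_delete_by_query?size=10000&timeout=10s&q=day.keyword%3A31%5C%2F10%5C%2F2022"      # ES 7.x
    curl -X POST "http://localhost:9200/out/_delete_by_query?max_docs=10000&timeout=10s&q=day.keyword%3A31%5C%2F10%5C%2F2022"   # ES 8.x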
@@ -15,8 +15,6 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.inject.Inject;
 import java.io.IOException;
@@ -101,7 +99,7 @@ void testCleanExistingData() throws Exception {
        Assertions.assertEquals(15, elasticsearchUtils.searchCount(esURL, "test", null));
    }
 
-    private ActionRunResult run(String cleanQuery) throws Exception {
+    private void run(String cleanQuery) throws Exception {
        final String esURL = configuration.getString("elasticsearch");
 
        SparkExecutor executor = getSparkExecutor();
@@ -118,13 +116,12 @@ private ActionRunResult run(String cleanQuery) throws Exception {
            .configure(new Bag(
                "host", esURL,
                "clean_query", cleanQuery,
-                "index", "test/doc"
+                "index", "test"
            ))
            .run(dfData);
 
        elasticsearchUtils.refresh(esURL, "test");
 
-        return runResult;
    }
 
 }
14 changes: 9 additions & 5 deletions docker-compose.yml
@@ -49,6 +49,7 @@ services:
   # Build and run djobi
   ##
   djobi:
+    tty: true
     build:
       context: .
       dockerfile: ./packages/docker/Dockerfile
@@ -59,10 +60,13 @@
     # https_proxy:
     command: |+
       djobi
-        --djobi-conf djobi.plugins.logger.enabled false
-        --djobi-conf djobi.plugins.apm.enabled false
-        --support-elasticsearch 7
-        /opt/dev/pipelines/es2fs -adate=yesterday
+        --djobi-conf djobi.plugins.logger.enabled true
+        --djobi-conf djobi.plugins.apm.enabled false
+        --djobi-conf elasticsearch "http://elasticsearch:9200"
+        --log-level info
+        --verbosity quiet
+        --support-elasticsearch 8
+        /opt/dev/workflows/es2fs -adate=yesterday
     environment:
       DJOBI_CONF: /opt/dev/default.conf
       DJOBI_SUPPORT_UA_PARSER: "true"
@@ -77,4 +81,4 @@
       - ./dev:/opt/dev
       - /Users/t.decaux/dev/qwant/qwant.jks:/usr/local/openjdk-11/lib/security/cacerts:ro
       #- ./packages/docker/entrypoint.sh:/opt/djobi/docker/entrypoint.sh:ro
-      #- ./djobi-submit/djobi_submit:/opt/djobi/.local/lib/python3.10/site-packages/djobi_submit:ro
+      #- ./djobi-submit/djobi_submit:/opt/djobi/.local/lib/python3.10/site-packages/djobi_submit:ro
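One hedged way to exercise the updated service locally, assuming an elasticsearch service is reachable on the same compose network (as defined in djobi-core/docker-compose.yml) and the docker compose v2 CLI is in use:

    docker compose build djobi
    docker compose run --rm djobi    # with no override, run uses the command: block above against ES 8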
39 changes: 0 additions & 39 deletions packages/docker/entrypoint.sh
@@ -1,42 +1,5 @@
 #!/bin/sh
 
-# shellcheck disable=SC1091
-
-#set -o errexit
-#set -o nounset
-#set -o pipefail
-#set -o xtrace
-
-if [ $EUID -eq 0 ]
-then
-  info "<!> dont run as root <!>"
-fi
-
-set -e
-
-# Check whether there is a passwd entry for the container UID
-myuid=$(id -u)
-mygid=$(id -g)
-# turn off -e for getent because it will return error code in anonymous uid case
-set +e
-uidentry=$(getent passwd $myuid)
-set -e
-
-# If there is no passwd entry for the container UID, attempt to create one
-if [ -z "$uidentry" ]
-then
-  if [ -w /etc/passwd ]
-  then
-    echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd
-  else
-    echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
-  fi
-fi
-
-DJOBI_VERSION=$(cat "${DJOBI_HOME}/VERSION")
-
-export DJOBI_VERSION
-
 case "$1" in
   "")
     set -- python3 submit/djobi_submit/ run --help
@@ -56,7 +19,5 @@ case "$1" in
     ;;
 esac
 
-set -ex
-
 # Execute the container CMD under tini for better hygiene
 exec tini -s -- "$@"