diff --git a/dev/workflows/es2fs/pipeline.yml b/dev/workflows/es2fs/pipeline.yml index 24f61c6..05a3939 100644 --- a/dev/workflows/es2fs/pipeline.yml +++ b/dev/workflows/es2fs/pipeline.yml @@ -34,10 +34,10 @@ jobs: kind: "org.spark.mutate" spec: adds: - day: "{{day}}\\/{{month}}\\/{{year}}" + day: "{{ day }}/{{month}}/{{year}}" - name: output kind: org.elasticsearch.output spec: host: "{{env.elasticsearch}}" index: "out" - clean_query: "day:{{day}}\\/{{month}}\\/{{year}}" + clean_query: day.keyword:{{day}}\/{{month}}\/{{year}} diff --git a/djobi-core/docker-compose.yml b/djobi-core/docker-compose.yml index 4a6fd8b..929faae 100644 --- a/djobi-core/docker-compose.yml +++ b/djobi-core/docker-compose.yml @@ -55,7 +55,7 @@ services: - GUI_CONFIG_FILE=/home/node/config.yaml elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:7.17.1 + image: docker.elastic.co/elasticsearch/elasticsearch:8.4.3 labels: - "traefik.enable=true" - "traefik.http.routers.elasticsearch.rule=Host(`elasticsearch-${DJOBI_HOST}`)" @@ -64,15 +64,15 @@ services: - node.roles=master,data - xpack.security.enabled=false - xpack.monitoring.collection.enabled=false - - xpack.monitoring.enabled=false + #- xpack.monitoring.enabled=false - xpack.ml.enabled=false - ingest.geoip.downloader.enabled=false - bootstrap.memory_lock=true - discovery.type=single-node - http.cors.enabled=true - - http.cors.allow-origin=* + - http.cors.allow-origin="*" - ES_JAVA_OPTS=-Xms512m -Xmx512m -Dmapper.allow_dots_in_name=true volumes: minio_data: - driver: local \ No newline at end of file + driver: local diff --git a/djobi-elasticsearch/build.gradle b/djobi-elasticsearch/build.gradle index 25c2135..a1fad92 100644 --- a/djobi-elasticsearch/build.gradle +++ b/djobi-elasticsearch/build.gradle @@ -14,8 +14,8 @@ configurations { es7 es8 - testRuntimeOnly.extendsFrom es7 - compileOnly.extendsFrom es7 + testRuntimeOnly.extendsFrom es8 + compileOnly.extendsFrom es8 } dependencies { diff --git a/djobi-elasticsearch/src/main/java/io/datatok/djobi/engine/stages/elasticsearch/output/ESOutputExistingRemover.java b/djobi-elasticsearch/src/main/java/io/datatok/djobi/engine/stages/elasticsearch/output/ESOutputExistingRemover.java index 56550aa..ec2c371 100644 --- a/djobi-elasticsearch/src/main/java/io/datatok/djobi/engine/stages/elasticsearch/output/ESOutputExistingRemover.java +++ b/djobi-elasticsearch/src/main/java/io/datatok/djobi/engine/stages/elasticsearch/output/ESOutputExistingRemover.java @@ -12,6 +12,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.HashMap; @Singleton @@ -47,7 +48,7 @@ private int getCountValue(HashMap response) { * @return bool * @throws Exception */ - public boolean execute(final ESOutputConfig config, final Job Job) throws Exception { + public boolean execute(final ESOutputConfig config, final Job Job) { logger.info(String.format("[output:elasticsearch] clean: %s/%s?q=%s", config.url, config.index, config.clean_query)); int deleteTries = 1000; @@ -62,7 +63,7 @@ public boolean execute(final ESOutputConfig config, final Job Job) throws Except //http.post(String.format("%s/%s/_flush", config.url, realEsIndex)).execute().close(); final HashMap response = http.get( - String.format("%s/%s/_count?q=%s", config.url, realEsIndex, URLEncoder.encode(config.clean_query, "UTF-8")) + String.format("%s/%s/_count?q=%s", config.url, realEsIndex, URLEncoder.encode(config.clean_query, StandardCharsets.UTF_8)) ).executeAsDict(); final int itemsCount = getCountValue(response); @@ -94,27 +95,41 @@ public boolean execute(final ESOutputConfig config, final Job Job) throws Except { cleanRequest = http.delete(String.format("%s/%s/_query?size=10000&timeout=10s&q=%s", config.url, config.index, URLEncoder.encode(config.clean_query, "UTF-8"))); } - else + else if (esVersion.startsWith("7")) { cleanRequest = http.post(String.format("%s/%s/_delete_by_query?size=10000&timeout=10s&q=%s", config.url, realEsIndex, URLEncoder.encode(config.clean_query, "UTF-8"))); } + else + { + cleanRequest = http.post(String.format("%s/%s/_delete_by_query?max_docs=10000&timeout=10s&q=%s", config.url, realEsIndex, URLEncoder.encode(config.clean_query, "UTF-8"))); + } HttpResponse r = cleanRequest.execute(); - if (r.statusCode() == 404) { - logger.error("delete query gives a 404 error, maybe a plugin is missing?"); - return false; + switch (r.statusCode()) { + case 404 -> { + logger.error("delete query gives a 404 error, maybe a plugin is missing?"); + return false; + } + case 400 -> { + logger.error(String.format("[400] %s", r.raw())); + return false; + } } - logger.info(r.raw()); + logger.debug(r.raw()); - Thread.sleep(2000); + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } } catch(IOException e) { logger.error("[output:elasticsearch] clean: exception", e); - throw e; + return false; } } } diff --git a/djobi-elasticsearch/src/test/java/io/datatok/djobi/engine/stages/elasticsearch/output/ESOutputRunnerTest.java b/djobi-elasticsearch/src/test/java/io/datatok/djobi/engine/stages/elasticsearch/output/ESOutputRunnerTest.java index 10f4930..ff9cf1c 100644 --- a/djobi-elasticsearch/src/test/java/io/datatok/djobi/engine/stages/elasticsearch/output/ESOutputRunnerTest.java +++ b/djobi-elasticsearch/src/test/java/io/datatok/djobi/engine/stages/elasticsearch/output/ESOutputRunnerTest.java @@ -15,8 +15,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import javax.inject.Inject; import java.io.IOException; @@ -101,7 +99,7 @@ void testCleanExistingData() throws Exception { Assertions.assertEquals(15, elasticsearchUtils.searchCount(esURL, "test", null)); } - private ActionRunResult run(String cleanQuery) throws Exception { + private void run(String cleanQuery) throws Exception { final String esURL = configuration.getString("elasticsearch"); SparkExecutor executor = getSparkExecutor(); @@ -118,13 +116,12 @@ private ActionRunResult run(String cleanQuery) throws Exception { .configure(new Bag( "host", esURL, "clean_query", cleanQuery, - "index", "test/doc" + "index", "test" )) .run(dfData); elasticsearchUtils.refresh(esURL, "test"); - return runResult; } } diff --git a/docker-compose.yml b/docker-compose.yml index 1851a8c..8b34e98 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,6 +49,7 @@ services: # Build and run djobi ## djobi: + tty: true build: context: . dockerfile: ./packages/docker/Dockerfile @@ -59,10 +60,13 @@ services: # https_proxy: command: |+ djobi - --djobi-conf djobi.plugins.logger.enabled false - --djobi-conf djobi.plugins.apm.enabled false - --support-elasticsearch 7 - /opt/dev/pipelines/es2fs -adate=yesterday + --djobi-conf djobi.plugins.logger.enabled true + --djobi-conf djobi.plugins.apm.enabled false + --djobi-conf elasticsearch "http://elasticsearch:9200" + --log-level info + --verbosity quiet + --support-elasticsearch 8 + /opt/dev/workflows/es2fs -adate=yesterday environment: DJOBI_CONF: /opt/dev/default.conf DJOBI_SUPPORT_UA_PARSER: "true" @@ -77,4 +81,4 @@ services: - ./dev:/opt/dev - /Users/t.decaux/dev/qwant/qwant.jks:/usr/local/openjdk-11/lib/security/cacerts:ro #- ./packages/docker/entrypoint.sh:/opt/djobi/docker/entrypoint.sh:ro - #- ./djobi-submit/djobi_submit:/opt/djobi/.local/lib/python3.10/site-packages/djobi_submit:ro \ No newline at end of file + #- ./djobi-submit/djobi_submit:/opt/djobi/.local/lib/python3.10/site-packages/djobi_submit:ro diff --git a/packages/docker/entrypoint.sh b/packages/docker/entrypoint.sh index 9be2a55..ee659f0 100755 --- a/packages/docker/entrypoint.sh +++ b/packages/docker/entrypoint.sh @@ -1,42 +1,5 @@ #!/bin/sh -# shellcheck disable=SC1091 - -#set -o errexit -#set -o nounset -#set -o pipefail -#set -o xtrace - -if [ $EUID -eq 0 ] -then - info " dont run as root " -fi - -set -e - -# Check whether there is a passwd entry for the container UID -myuid=$(id -u) -mygid=$(id -g) -# turn off -e for getent because it will return error code in anonymous uid case -set +e -uidentry=$(getent passwd $myuid) -set -e - -# If there is no passwd entry for the container UID, attempt to create one -if [ -z "$uidentry" ] -then - if [ -w /etc/passwd ] - then - echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd - else - echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID" - fi -fi - -DJOBI_VERSION=$(cat "${DJOBI_HOME}/VERSION") - -export DJOBI_VERSION - case "$1" in "") set -- python3 submit/djobi_submit/ run --help @@ -56,7 +19,5 @@ case "$1" in ;; esac -set -ex - # Execute the container CMD under tini for better hygiene exec tini -s -- "$@"