diff --git a/.ci/examples/node-examples/exclamation.js b/.ci/examples/node-examples/exclamation.js new file mode 100644 index 000000000..fb753464b --- /dev/null +++ b/.ci/examples/node-examples/exclamation.js @@ -0,0 +1,3 @@ +async function process(message, context) { + return message.concat("!"); +} \ No newline at end of file diff --git a/.ci/helm.sh b/.ci/helm.sh index 042eaba47..116e0a47e 100644 --- a/.ci/helm.sh +++ b/.ci/helm.sh @@ -139,7 +139,7 @@ function ci::verify_function_mesh() { sleep 5 kubectl get pods -l compute.functionmesh.io/name="${FUNCTION_NAME}" kubectl logs -l compute.functionmesh.io/name="${FUNCTION_NAME}" --tail=50 || true - num=$(kubectl logs -l compute.functionmesh.io/name="${FUNCTION_NAME}" --tail=-1 | grep "Created producer\|Created consumer\|Subscribed to topic" | wc -l) + num=$(kubectl logs -l compute.functionmesh.io/name="${FUNCTION_NAME}" --tail=-1 | grep "Created producer\|Created consumer\|Subscribed to topic\|to broker pulsar" | wc -l) done } @@ -573,7 +573,7 @@ function ci::verify_log_topic() { timesleep=$3 sleep "$timesleep" - MESSAGE=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client consume -n 1 -s "sub" --subscription-position Earliest "${logTopic}") + MESSAGE=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client consume -n 2 -s "sub" --subscription-position Earliest "${logTopic}") echo "$MESSAGE" if [[ "$MESSAGE" == *"$message"* ]]; then return 0 @@ -588,7 +588,7 @@ function ci::verify_log_topic_with_auth() { timesleep=$3 sleep "$timesleep" - consumeCommand="kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- sh -c 'bin/pulsar-client --auth-plugin \$brokerClientAuthenticationPlugin --auth-params \$brokerClientAuthenticationParameters consume -n 1 -s "sub" --subscription-position Earliest \"${logTopic}\"'" + consumeCommand="kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- sh -c 'bin/pulsar-client --auth-plugin \$brokerClientAuthenticationPlugin --auth-params \$brokerClientAuthenticationParameters consume -n 2 -s "sub" --subscription-position Earliest \"${logTopic}\"'" MESSAGE=$(sh -c "$consumeCommand") echo "$MESSAGE" if [[ "$MESSAGE" == *"$message"* ]]; then diff --git a/.ci/tests/integration-oauth2/cases/java-download-function-generic-auth/verify.sh b/.ci/tests/integration-oauth2/cases/java-download-function-generic-auth/verify.sh index f16217fa9..4202b8754 100644 --- a/.ci/tests/integration-oauth2/cases/java-download-function-generic-auth/verify.sh +++ b/.ci/tests/integration-oauth2/cases/java-download-function-generic-auth/verify.sh @@ -41,7 +41,7 @@ if [ $? -ne 0 ]; then exit 1 fi -verify_log_topic=$(ci::verify_log_topic_with_auth persistent://public/default/logging-generic-auth-function-logs "org.apache.pulsar.functions.runtime.JavaInstanceStarter" 10 2>&1) +verify_log_topic=$(ci::verify_log_topic_with_auth persistent://public/default/logging-generic-auth-function-logs "it is not a NAR file" 10 2>&1) if [ $? -ne 0 ]; then echo "$verify_log_topic" kubectl delete -f "${BASE_DIR}"/.ci/tests/integration-oauth2/cases/java-download-function-generic-auth/manifests.yaml > /dev/null 2>&1 || true diff --git a/.ci/tests/integration/cases/nodejs-function/manifests.yaml b/.ci/tests/integration/cases/nodejs-function/manifests.yaml new file mode 100644 index 000000000..e30944caa --- /dev/null +++ b/.ci/tests/integration/cases/nodejs-function/manifests.yaml @@ -0,0 +1,45 @@ +apiVersion: compute.functionmesh.io/v1alpha1 +kind: Function +metadata: + name: node-function-excla-generic-sample + namespace: default +spec: + className: exclamation + forwardSourceMessageProperty: true + maxPendingAsyncRequests: 1000 + replicas: 1 + input: + topics: + - persistent://public/default/node-generic-excla-input + typeClassName: string + output: + topic: persistent://public/default/node-generic-excla-output + typeClassName: string + resources: + requests: + cpu: 500m + memory: 0.5G + limits: + cpu: 500m + memory: 0.5G + pulsar: + pulsarConfig: "test-node-pulsar" + tlsConfig: + enabled: false + allowInsecure: true + hostnameVerification: true + genericRuntime: + functionFile: exclamation.js + functionFileLocation: function://public/default/test-node-function + language: nodejs + logLevel: warn + clusterName: test + autoAck: true +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-node-pulsar +data: + webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 diff --git a/.ci/tests/integration/cases/nodejs-function/verify.sh b/.ci/tests/integration/cases/nodejs-function/verify.sh new file mode 100644 index 000000000..c7881b323 --- /dev/null +++ b/.ci/tests/integration/cases/nodejs-function/verify.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +E2E_DIR=$(dirname "$0") +BASE_DIR=$(cd "${E2E_DIR}"/../../../../..;pwd) +PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"} +PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"} +E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"} + +source "${BASE_DIR}"/.ci/helm.sh + +if [ ! "$KUBECONFIG" ]; then + export KUBECONFIG=${E2E_KUBECONFIG} +fi + +manifests_file="${BASE_DIR}"/.ci/tests/integration/cases/nodejs-function/manifests.yaml + +kubectl apply -f "${manifests_file}" > /dev/null 2>&1 + +verify_fm_result=$(ci::verify_function_mesh node-function-excla-generic-sample 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_fm_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_node_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_exclamation_function "persistent://public/default/node-generic-excla-input" "persistent://public/default/node-generic-excla-output" "test-message" "test-message!" 10 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_node_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +echo "e2e-test: ok" | yq eval - +kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true diff --git a/.ci/tests/integration/e2e.yaml b/.ci/tests/integration/e2e.yaml index bf11a8df8..86688b03e 100644 --- a/.ci/tests/integration/e2e.yaml +++ b/.ci/tests/integration/e2e.yaml @@ -88,6 +88,7 @@ setup: bash .ci/upload_function.sh pyzip bash .ci/upload_function.sh pypip bash .ci/upload_function.sh go + bash .ci/upload_function.sh node - name: install function-mesh operator command: | @@ -168,3 +169,5 @@ verify: expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration/cases/python-log-format-json/verify.sh expected: expected.data.yaml + - query: timeout 5m bash .ci/tests/integration/cases/nodejs-function/verify.sh + expected: expected.data.yaml diff --git a/.ci/upload_function.sh b/.ci/upload_function.sh index 5f4a469c6..1f1c39e85 100755 --- a/.ci/upload_function.sh +++ b/.ci/upload_function.sh @@ -44,4 +44,8 @@ case ${1} in kubectl cp "${PULSAR_HOME}/.ci/examples/go-examples" "${NAMESPACE}/${CLUSTER}-pulsar-broker-0:/pulsar/" kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin packages upload function://public/default/test-go-function --path /pulsar/go-examples/exclamationFunc --description "test golang function" ;; + node) + kubectl cp "${PULSAR_HOME}/.ci/examples/node-examples" "${NAMESPACE}/${CLUSTER}-pulsar-broker-0:/pulsar/" + kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin packages upload function://public/default/test-node-function --path /pulsar/node-examples/exclamation.js --description "test nodejs function" + ;; esac diff --git a/api/compute/v1alpha1/common.go b/api/compute/v1alpha1/common.go index 3a8d90769..8e22917d7 100644 --- a/api/compute/v1alpha1/common.go +++ b/api/compute/v1alpha1/common.go @@ -277,6 +277,7 @@ type GenericRuntime struct { // +kubebuilder:validation:Required Language string `json:"language"` FunctionFileLocation string `json:"functionFileLocation,omitempty"` + LogLevel string `json:"logLevel,omitempty"` } type SecretRef struct { diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 697df7a3f..4480b91b8 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -55,14 +55,14 @@ import ( const ( EnvShardID = "SHARD_ID" FunctionsInstanceClasspath = "pulsar.functions.instance.classpath" - DefaultRunnerTag = "2.10.0.0-rc10" - DefaultGenericRunnerTag = "0.1.0" + DefaultRunnerTag = "3.2.0.1" + DefaultGenericRunnerTag = "latest" DefaultRunnerPrefix = "streamnative/" DefaultRunnerImage = DefaultRunnerPrefix + "pulsar-all:" + DefaultRunnerTag - DefaultJavaRunnerImage = DefaultRunnerPrefix + "pulsar-functions-java-runner:" + DefaultRunnerTag - DefaultPythonRunnerImage = DefaultRunnerPrefix + "pulsar-functions-python-runner:" + DefaultRunnerTag - DefaultGoRunnerImage = DefaultRunnerPrefix + "pulsar-functions-go-runner:" + DefaultRunnerTag - DefaultGenericNodejsRunnerImage = DefaultRunnerPrefix + "pulsar-functions-generic-nodejs-runner:" + DefaultGenericRunnerTag + DefaultJavaRunnerImage = DefaultRunnerPrefix + "pulsar-functions-pulsarctl-java-runner:" + DefaultRunnerTag + DefaultPythonRunnerImage = DefaultRunnerPrefix + "pulsar-functions-pulsarctl-python-runner:" + DefaultRunnerTag + DefaultGoRunnerImage = DefaultRunnerPrefix + "pulsar-functions-pulsarctl-go-runner:" + DefaultRunnerTag + DefaultGenericNodejsRunnerImage = DefaultRunnerPrefix + "pulsar-functions-generic-node-runner:" + DefaultGenericRunnerTag DefaultGenericPythonRunnerImage = DefaultRunnerPrefix + "pulsar-functions-generic-python-runner:" + DefaultGenericRunnerTag DefaultGenericRunnerImage = DefaultRunnerPrefix + "pulsar-functions-generic-base-runner:" + DefaultGenericRunnerTag PulsarAdminExecutableFile = "/pulsar/bin/pulsar-admin" @@ -394,10 +394,10 @@ func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alph func MakeGenericFunctionCommand(downloadPath, functionFile, language, clusterName, details, uid string, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, - tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string { + tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, logLevel string) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + strings.Join(getProcessGenericRuntimeArgs(language, functionFile, clusterName, - details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig), " ") + details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig, logLevel), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided downloadCommand := strings.Join(getDownloadCommand(downloadPath, functionFile, true, true, @@ -1194,7 +1194,7 @@ func getProcessPythonRuntimeArgs(name, packageName, clusterName, details, uid st func getProcessGenericRuntimeArgs(language, functionFile, clusterName, details, uid string, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig, - authConfig *v1alpha1.AuthConfig) []string { + authConfig *v1alpha1.AuthConfig, logLevel string) []string { args := []string{ "exec", @@ -1210,6 +1210,12 @@ func getProcessGenericRuntimeArgs(language, functionFile, clusterName, details, secretProviderArgs := getGenericSecretProviderArgs(secretMaps, language) args = append(args, secretProviderArgs...) } + if logLevel != "" { + args = append(args, []string{ + "--log_level", + logLevel, + }...) + } if state != nil && state.Pulsar != nil && state.Pulsar.ServiceURL != "" { statefulArgs := []string{ "--state_storage_serviceurl", diff --git a/controllers/spec/function.go b/controllers/spec/function.go index 0392746b6..e4d7ab0a3 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -163,10 +163,11 @@ func makeFunctionContainer(function *v1alpha1.Function) *corev1.Container { mounts = append(mounts, generateDownloaderVolumeMountsForRuntime(function.Spec.Java, function.Spec.Python, function.Spec.Golang)...) } + function.Spec.Image = getFunctionRunnerImage(&function.Spec) return &corev1.Container{ // TODO new container to pull user code image and upload jars into bookkeeper Name: "pulsar-function", - Image: getFunctionRunnerImage(&function.Spec), + Image: function.Spec.Image, Command: makeFunctionCommand(function), Ports: []corev1.ContainerPort{GRPCPort, MetricsPort}, Env: generateContainerEnv(function), @@ -246,7 +247,8 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { spec.GenericRuntime.Language, spec.ClusterName, generateFunctionDetailsInJSON(function), string(function.UID), spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", function.Spec.SecretsMap, - function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig) + function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig, + function.Spec.GenericRuntime.LogLevel) } }