Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pulsarctl runner images as default and add logLevel to genericRuntime #727

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .ci/examples/node-examples/exclamation.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
async function process(message, context) {
return message.concat("!");
}
6 changes: 3 additions & 3 deletions .ci/helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ function ci::verify_function_mesh() {
sleep 5
kubectl get pods -l compute.functionmesh.io/name="${FUNCTION_NAME}"
kubectl logs -l compute.functionmesh.io/name="${FUNCTION_NAME}" --tail=50 || true
num=$(kubectl logs -l compute.functionmesh.io/name="${FUNCTION_NAME}" --tail=-1 | grep "Created producer\|Created consumer\|Subscribed to topic" | wc -l)
num=$(kubectl logs -l compute.functionmesh.io/name="${FUNCTION_NAME}" --tail=-1 | grep "Created producer\|Created consumer\|Subscribed to topic\|to broker pulsar" | wc -l)
done
}

Expand Down Expand Up @@ -573,7 +573,7 @@ function ci::verify_log_topic() {
timesleep=$3

sleep "$timesleep"
MESSAGE=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client consume -n 1 -s "sub" --subscription-position Earliest "${logTopic}")
MESSAGE=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client consume -n 2 -s "sub" --subscription-position Earliest "${logTopic}")
echo "$MESSAGE"
if [[ "$MESSAGE" == *"$message"* ]]; then
return 0
Expand All @@ -588,7 +588,7 @@ function ci::verify_log_topic_with_auth() {
timesleep=$3

sleep "$timesleep"
consumeCommand="kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- sh -c 'bin/pulsar-client --auth-plugin \$brokerClientAuthenticationPlugin --auth-params \$brokerClientAuthenticationParameters consume -n 1 -s "sub" --subscription-position Earliest \"${logTopic}\"'"
consumeCommand="kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- sh -c 'bin/pulsar-client --auth-plugin \$brokerClientAuthenticationPlugin --auth-params \$brokerClientAuthenticationParameters consume -n 2 -s "sub" --subscription-position Earliest \"${logTopic}\"'"
MESSAGE=$(sh -c "$consumeCommand")
echo "$MESSAGE"
if [[ "$MESSAGE" == *"$message"* ]]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ if [ $? -ne 0 ]; then
exit 1
fi

verify_log_topic=$(ci::verify_log_topic_with_auth persistent://public/default/logging-generic-auth-function-logs "org.apache.pulsar.functions.runtime.JavaInstanceStarter" 10 2>&1)
verify_log_topic=$(ci::verify_log_topic_with_auth persistent://public/default/logging-generic-auth-function-logs "it is not a NAR file" 10 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_log_topic"
kubectl delete -f "${BASE_DIR}"/.ci/tests/integration-oauth2/cases/java-download-function-generic-auth/manifests.yaml > /dev/null 2>&1 || true
Expand Down
45 changes: 45 additions & 0 deletions .ci/tests/integration/cases/nodejs-function/manifests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
name: node-function-excla-generic-sample
namespace: default
spec:
className: exclamation
forwardSourceMessageProperty: true
maxPendingAsyncRequests: 1000
replicas: 1
input:
topics:
- persistent://public/default/node-generic-excla-input
typeClassName: string
output:
topic: persistent://public/default/node-generic-excla-output
typeClassName: string
resources:
requests:
cpu: 500m
memory: 0.5G
limits:
cpu: 500m
memory: 0.5G
pulsar:
pulsarConfig: "test-node-pulsar"
tlsConfig:
enabled: false
allowInsecure: true
hostnameVerification: true
genericRuntime:
functionFile: exclamation.js
functionFileLocation: function://public/default/test-node-function
language: nodejs
logLevel: warn
clusterName: test
autoAck: true
---
apiVersion: v1
kind: ConfigMap
metadata:
name: test-node-pulsar
data:
webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080
brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650
54 changes: 54 additions & 0 deletions .ci/tests/integration/cases/nodejs-function/verify.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

set -e

E2E_DIR=$(dirname "$0")
BASE_DIR=$(cd "${E2E_DIR}"/../../../../..;pwd)
PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"}
PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"}
E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"}

source "${BASE_DIR}"/.ci/helm.sh

if [ ! "$KUBECONFIG" ]; then
export KUBECONFIG=${E2E_KUBECONFIG}
fi

manifests_file="${BASE_DIR}"/.ci/tests/integration/cases/nodejs-function/manifests.yaml

kubectl apply -f "${manifests_file}" > /dev/null 2>&1

verify_fm_result=$(ci::verify_function_mesh node-function-excla-generic-sample 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_fm_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

verify_node_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_exclamation_function "persistent://public/default/node-generic-excla-input" "persistent://public/default/node-generic-excla-output" "test-message" "test-message!" 10 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_node_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

echo "e2e-test: ok" | yq eval -
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
3 changes: 3 additions & 0 deletions .ci/tests/integration/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ setup:
bash .ci/upload_function.sh pyzip
bash .ci/upload_function.sh pypip
bash .ci/upload_function.sh go
bash .ci/upload_function.sh node

- name: install function-mesh operator
command: |
Expand Down Expand Up @@ -168,3 +169,5 @@ verify:
expected: expected.data.yaml
- query: timeout 5m bash .ci/tests/integration/cases/python-log-format-json/verify.sh
expected: expected.data.yaml
- query: timeout 5m bash .ci/tests/integration/cases/nodejs-function/verify.sh
expected: expected.data.yaml
4 changes: 4 additions & 0 deletions .ci/upload_function.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ case ${1} in
kubectl cp "${PULSAR_HOME}/.ci/examples/go-examples" "${NAMESPACE}/${CLUSTER}-pulsar-broker-0:/pulsar/"
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin packages upload function://public/default/test-go-function --path /pulsar/go-examples/exclamationFunc --description "test golang function"
;;
node)
kubectl cp "${PULSAR_HOME}/.ci/examples/node-examples" "${NAMESPACE}/${CLUSTER}-pulsar-broker-0:/pulsar/"
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin packages upload function://public/default/test-node-function --path /pulsar/node-examples/exclamation.js --description "test nodejs function"
;;
esac
1 change: 1 addition & 0 deletions api/compute/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ type GenericRuntime struct {
// +kubebuilder:validation:Required
Language string `json:"language"`
FunctionFileLocation string `json:"functionFileLocation,omitempty"`
LogLevel string `json:"logLevel,omitempty"`
}

type SecretRef struct {
Expand Down
24 changes: 15 additions & 9 deletions controllers/spec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ import (
const (
EnvShardID = "SHARD_ID"
FunctionsInstanceClasspath = "pulsar.functions.instance.classpath"
DefaultRunnerTag = "2.10.0.0-rc10"
DefaultGenericRunnerTag = "0.1.0"
DefaultRunnerTag = "3.2.0.1"
DefaultGenericRunnerTag = "latest"
DefaultRunnerPrefix = "streamnative/"
DefaultRunnerImage = DefaultRunnerPrefix + "pulsar-all:" + DefaultRunnerTag
DefaultJavaRunnerImage = DefaultRunnerPrefix + "pulsar-functions-java-runner:" + DefaultRunnerTag
DefaultPythonRunnerImage = DefaultRunnerPrefix + "pulsar-functions-python-runner:" + DefaultRunnerTag
DefaultGoRunnerImage = DefaultRunnerPrefix + "pulsar-functions-go-runner:" + DefaultRunnerTag
DefaultGenericNodejsRunnerImage = DefaultRunnerPrefix + "pulsar-functions-generic-nodejs-runner:" + DefaultGenericRunnerTag
DefaultJavaRunnerImage = DefaultRunnerPrefix + "pulsar-functions-pulsarctl-java-runner:" + DefaultRunnerTag
DefaultPythonRunnerImage = DefaultRunnerPrefix + "pulsar-functions-pulsarctl-python-runner:" + DefaultRunnerTag
DefaultGoRunnerImage = DefaultRunnerPrefix + "pulsar-functions-pulsarctl-go-runner:" + DefaultRunnerTag
DefaultGenericNodejsRunnerImage = DefaultRunnerPrefix + "pulsar-functions-generic-node-runner:" + DefaultGenericRunnerTag
DefaultGenericPythonRunnerImage = DefaultRunnerPrefix + "pulsar-functions-generic-python-runner:" + DefaultGenericRunnerTag
DefaultGenericRunnerImage = DefaultRunnerPrefix + "pulsar-functions-generic-base-runner:" + DefaultGenericRunnerTag
PulsarAdminExecutableFile = "/pulsar/bin/pulsar-admin"
Expand Down Expand Up @@ -394,10 +394,10 @@ func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alph

func MakeGenericFunctionCommand(downloadPath, functionFile, language, clusterName, details, uid string, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef,
state *v1alpha1.Stateful,
tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string {
tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, logLevel string) []string {
processCommand := setShardIDEnvironmentVariableCommand() + " && " +
strings.Join(getProcessGenericRuntimeArgs(language, functionFile, clusterName,
details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig), " ")
details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig, logLevel), " ")
if downloadPath != "" && !utils.EnableInitContainers {
// prepend download command if the downPath is provided
downloadCommand := strings.Join(getDownloadCommand(downloadPath, functionFile, true, true,
Expand Down Expand Up @@ -1194,7 +1194,7 @@ func getProcessPythonRuntimeArgs(name, packageName, clusterName, details, uid st

func getProcessGenericRuntimeArgs(language, functionFile, clusterName, details, uid string, authProvided, tlsProvided bool,
secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig,
authConfig *v1alpha1.AuthConfig) []string {
authConfig *v1alpha1.AuthConfig, logLevel string) []string {

args := []string{
"exec",
Expand All @@ -1210,6 +1210,12 @@ func getProcessGenericRuntimeArgs(language, functionFile, clusterName, details,
secretProviderArgs := getGenericSecretProviderArgs(secretMaps, language)
args = append(args, secretProviderArgs...)
}
if logLevel != "" {
args = append(args, []string{
"--log_level",
logLevel,
}...)
}
if state != nil && state.Pulsar != nil && state.Pulsar.ServiceURL != "" {
statefulArgs := []string{
"--state_storage_serviceurl",
Expand Down
6 changes: 4 additions & 2 deletions controllers/spec/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ func makeFunctionContainer(function *v1alpha1.Function) *corev1.Container {
mounts = append(mounts,
generateDownloaderVolumeMountsForRuntime(function.Spec.Java, function.Spec.Python, function.Spec.Golang)...)
}
function.Spec.Image = getFunctionRunnerImage(&function.Spec)
return &corev1.Container{
// TODO new container to pull user code image and upload jars into bookkeeper
Name: "pulsar-function",
Image: getFunctionRunnerImage(&function.Spec),
Image: function.Spec.Image,
Command: makeFunctionCommand(function),
Ports: []corev1.ContainerPort{GRPCPort, MetricsPort},
Env: generateContainerEnv(function),
Expand Down Expand Up @@ -246,7 +247,8 @@ func makeFunctionCommand(function *v1alpha1.Function) []string {
spec.GenericRuntime.Language, spec.ClusterName,
generateFunctionDetailsInJSON(function), string(function.UID),
spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", function.Spec.SecretsMap,
function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig)
function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig,
function.Spec.GenericRuntime.LogLevel)
}
}

Expand Down
Loading