At present, BitSail supports flink deployment on Yarn and native Kubernetes.
Below is a step-by-step guide to help you effectively deploy it on Yarn.
To support Yarn deployment, HADOOP_CLASSPATH has to be set in the system environment properties. There are two ways to set this environment property:
- Set HADOOP_CLASSPATH directly.
- Set HADOOP_HOME to point to the hadoop dir in the deploy environment. The bitsail scripts will use the following command to generate HADOOP_CLASSPATH:
if [ -n "$HADOOP_HOME" ]; then
export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
fi
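Before submitting to Yarn, it can help to confirm that one of the two settings above is actually in place. The following is a minimal sketch and is not part of BitSail; the function name resolve_hadoop_classpath is hypothetical. It follows the same precedence as the snippet above, where HADOOP_HOME wins when it is set.

```shell
# Hypothetical pre-flight helper (not shipped with BitSail).
# Prints the classpath that would be used, or fails if nothing usable is set.
resolve_hadoop_classpath() {
  if [ -n "${HADOOP_HOME:-}" ] && [ -x "$HADOOP_HOME/bin/hadoop" ]; then
    # HADOOP_HOME is set: derive the classpath the same way the bitsail scripts do.
    "$HADOOP_HOME/bin/hadoop" classpath
  elif [ -n "${HADOOP_CLASSPATH:-}" ]; then
    # Otherwise fall back to a directly exported HADOOP_CLASSPATH.
    printf '%s\n' "$HADOOP_CLASSPATH"
  else
    echo "Neither a usable HADOOP_HOME nor HADOOP_CLASSPATH is set" >&2
    return 1
  fi
}
```

Calling resolve_hadoop_classpath in the shell that will run bin/bitsail shows which classpath the job would pick up, and a non-zero exit status signals that neither variable is usable.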
After packaging, the build output contains the file conf/bitsail.conf. This file describes the system configuration of the deployment environment, including the flink path and some other default parameters.
Here are some frequently-used options in the configuration file:
Prefix | Parameter name | Description | Example |
---|---|---|---|
sys.flink. | flink_home | The root dir of flink. | ${BITSAIL_HOME}/embedded/flink |
sys.flink. | checkpoint_dir | The path storing the meta data file and data files of checkpoints. Reference: Flink Checkpoints | "hdfs://opensource/bitsail/flink-1.11/checkpoints/" |
sys.flink. | flink_default_properties | General flink runtime options configured by "-D". | { classloader.resolve-order: "child-first" akka.framesize: "838860800b" rest.client.max-content-length: 838860800 rest.server.max-content-len } |
You can use the startup script bin/bitsail to submit flink jobs to yarn.
The specific commands are as follows:
bash ./bin/bitsail run --engine flink --conf [job_conf_path] --execution-mode run --queue [queue_name] --deployment-mode yarn-per-job [--priority [yarn_priority] -p/--props [name=value]]
Parameter description
- Required parameters
  - queue_name: Target yarn queue
  - job_conf_path: Path of job configuration file
- Optional parameters
  - yarn_priority: Job priority on yarn
  - name=value: Flink properties, for example classloader.resolve-order=child-first
    - name: Property key. Configurable flink parameters that will be transparently transmitted to the flink task.
    - value: Property value.
Submit a test job that reads from a fake source and writes to a print sink on yarn:
bash ./bin/bitsail run --engine flink --conf ~/bitsail-archive-0.2.0-SNAPSHOT/examples/Fake_Print_Example.json --execution-mode run -p 1=1 --deployment-mode yarn-per-job --queue default
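When several flink properties are needed, the command line grows quickly. The sketch below assembles the submission command from its required parameters plus any number of name=value pairs and prints it for review instead of running it. The helper name print_yarn_submit is hypothetical, and passing one -p per property is an assumption based on how the Kubernetes examples in this guide repeat the flag.

```shell
# Hypothetical helper: print the Yarn submission command without executing it.
# Usage: print_yarn_submit <job_conf_path> <queue_name> [name=value ...]
print_yarn_submit() {
  if [ "$#" -lt 2 ]; then
    echo "usage: print_yarn_submit <job_conf_path> <queue_name> [name=value ...]" >&2
    return 1
  fi
  cmd="bash ./bin/bitsail run --engine flink --conf $1 --execution-mode run --queue $2 --deployment-mode yarn-per-job"
  shift 2
  for prop in "$@"; do
    case $prop in
      # Accept only arguments with a non-empty key before the '=' sign.
      ?*=*) cmd="$cmd -p $prop" ;;
      *) echo "not a name=value pair: $prop" >&2; return 1 ;;
    esac
  done
  printf '%s\n' "$cmd"
}
```

For example, print_yarn_submit examples/Fake_Print_Example.json default classloader.resolve-order=child-first prints a single command line that can be inspected and then pasted into the shell.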
Please check the ${FLINK_HOME}/log/ folder to read the log file of the BitSail client.
Please go to Yarn WebUI to check the logs of Flink JobManager and TaskManager.
Suppose that the BitSail install path is ${BITSAIL_HOME}.
After building BitSail, we can enter the following path and find runnable jars and example job configuration files:
cd ${BITSAIL_HOME}/bitsail-dist/target/bitsail-dist-0.2.0-SNAPSHOT-bin/bitsail-archive-0.2.0-SNAPSHOT/
Users can pass the option --deployment-mode remote to submit a BitSail job to a remote flink session.
Use examples/Fake_Print_Example.json as an example to start a BitSail job:
- <job-manager-address>: the address of the job manager, in host:port form, e.g. localhost:8081.
bash bin/bitsail run \
--engine flink \
--execution-mode run \
--deployment-mode remote \
--conf examples/Fake_Print_Example.json \
--jm-address <job-manager-address>
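A malformed job manager address is an easy mistake to make, so a quick format check before submission can save a round trip. The sketch below only validates the host:port shape described above; it does not contact the cluster, and the function name is_host_port is hypothetical.

```shell
# Hypothetical helper: succeed only if the argument looks like host:port.
is_host_port() {
  case ${1:-} in
    *:*) ;;
    *) return 1 ;;
  esac
  host=${1%:*}    # everything before the last colon
  port=${1##*:}   # everything after the last colon
  [ -n "$host" ] || return 1
  case $port in
    # Reject an empty port or one containing non-digit characters.
    ''|*[!0-9]*) return 1 ;;
  esac
  [ "$port" -ge 1 ] && [ "$port" -le 65535 ]
}
```

A typical use is is_host_port "$JM_ADDRESS" || echo "invalid --jm-address" >&2, where JM_ADDRESS is a variable of your own choosing.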
For example, we can use the script bitsail-archive-0.1.0-SNAPSHOT/embedded/flink/bin/start-cluster.sh to start a standalone session. Then we can run the example with the following commands:
bash bin/bitsail run \
--engine flink \
--execution-mode run \
--deployment-mode remote \
--conf examples/Fake_Print_Example.json \
--jm-address localhost:8081
Then you can visit Flink WebUI to see the running job. In task manager, we can see the output of the Fake_to_Print job in its stdout.
Users can pass the option --deployment-mode local to run a BitSail job locally.
Use examples/Fake_Print_Example.json as an example to start a BitSail job:
bash bin/bitsail run \
--engine flink \
--execution-mode run \
--deployment-mode local \
--conf examples/Fake_Print_Example.json
Take examples/Fake_Hive_Example.json as another example:
- Remember to complete the job configuration with the details of an available hive instance before running the command:
  - job.writer.db_name: the hive database to write.
  - job.writer.table_name: the hive table to write.
  - job.writer.metastore_properties: add the hive metastore address to it, like:
{ "job": { "writer": { "metastore_properties": "{\"hive.metastore.uris\":\"thrift://localhost:9083\"}" } } }
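Note that metastore_properties is a JSON object stored as a string, so its inner double quotes must be escaped. The sketch below generates that fragment for a given metastore URI. The function name metastore_fragment is hypothetical, and it assumes the URI itself contains no double quotes or backslashes.

```shell
# Hypothetical helper: print a job-config fragment for the given metastore URI.
metastore_fragment() {
  uri=$1
  # Inner JSON object that holds the hive metastore address.
  inner=$(printf '{"hive.metastore.uris":"%s"}' "$uri")
  # Escape the double quotes so the object can be stored as a JSON string value.
  escaped=$(printf '%s' "$inner" | sed 's/"/\\"/g')
  printf '{ "job": { "writer": { "metastore_properties": "%s" } } }\n' "$escaped"
}

metastore_fragment "thrift://localhost:9083"
```

With thrift://localhost:9083 as input this prints the same fragment as shown above; the values still have to be merged into the full job configuration file by hand or with a JSON tool of your choice.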
Then you can use a similar command to run the BitSail job locally:
bash bin/bitsail run \
--engine flink \
--execution-mode run \
--deployment-mode local \
--conf examples/Fake_Hive_Example.json
When any of the reader or writer data sources is related to hadoop, e.g., a hive_to_print job, the hadoop libs are needed.
There are two ways to offer hadoop libs for the local minicluster:
- If you already have a local hadoop environment, you can directly set $HADOOP_HOME to the folder of your hadoop libs. For example:
export HADOOP_HOME=/usr/local/hadoop-3.1.1
- If there is no hadoop environment, you can use flink-shaded-hadoop. Remember to move the uber jar to your flink lib dir. For example, suppose the flink root dir is /opt/flink:
# download flink-shaded-hadoop-uber jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
# move to flink libs
mv flink-shaded-hadoop-2-uber-2.7.5-10.0.jar /opt/flink/lib/flink-shaded-hadoop-uber.jar
At present, BitSail supports native Kubernetes via Flink 1.11 engine.
Below is a step-by-step guide to help you effectively deploy it on native Kubernetes. Currently, BitSail supports the Application deployment mode, which allows users to create a single image containing their Job and the Flink runtime, and which automatically creates and destroys cluster components as needed.
- Kubernetes >= 1.9
- KubeConfig, which has access to list, create, delete pods and services, configurable via ~/.kube/config. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods
- Kubernetes DNS enabled
- Have compiled BitSail ready (After building with ${BITSAIL_HOME}/build.sh, the artifacts will be located in ${BITSAIL_HOME}/output/)
If you have problems setting up a Kubernetes cluster, take a look at how to set up a Kubernetes cluster.
Role-based access control (RBAC) is a method of regulating access to compute or network resources based on the roles of individual users within an enterprise. Users can configure RBAC roles and service accounts used by JobManager to access the Kubernetes API server within the Kubernetes cluster.
Every namespace has a default service account. However, the default service account may not have the permission to create or delete pods within the Kubernetes cluster. Users can instead use the following command to create a new service account <self-defined-service-account> and set the role binding. Then use the config option kubernetes.service-account=<self-defined-service-account> to make the JobManager pod use the <self-defined-service-account> service account to create/delete TaskManager pods and leader ConfigMaps. Also this will allow the TaskManager to watch leader ConfigMaps to retrieve the address of JobManager and ResourceManager.
$ kubectl create serviceaccount <self-defined-service-account> # Please replace <self-defined-service-account> with a custom name
$ kubectl create clusterrolebinding <self-defined-cluster-role-binding> --clusterrole=edit --serviceaccount=default:<self-defined-service-account> # Please replace <self-defined-service-account> and <self-defined-cluster-role-binding> with custom names
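After creating the role binding, the new permissions can be checked from the client with kubectl auth can-i and user impersonation. The sketch below is one way to do that; the function names are hypothetical, and it assumes your own kubectl user is allowed to impersonate service accounts.

```shell
# Build the user name that Kubernetes assigns to a service account.
service_account_subject() {
  printf 'system:serviceaccount:%s:%s\n' "$1" "$2"
}

# Hypothetical helper: ask the API server whether the service account may
# list, create and delete pods. Requires kubectl and a reachable cluster.
# Usage: check_pod_permissions <namespace> <service-account>
check_pod_permissions() {
  namespace=$1
  account=$2
  subject=$(service_account_subject "$namespace" "$account")
  for verb in list create delete; do
    printf '%s pods: ' "$verb"
    # Prints "yes" or "no" for each verb.
    kubectl auth can-i "$verb" pods --namespace "$namespace" --as "$subject"
  done
}
```

Running check_pod_permissions default <self-defined-service-account> should print yes for each verb once the role binding is in effect.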
Application mode allows users to create a single image containing their Job and the Flink runtime, which will automatically create and destroy cluster components as needed. The Flink community provides base docker images that can be customized for any use case.
Build your <CustomImage> using the Dockerfile from ${BITSAIL_HOME}/output/Dockerfile.
Publish your <CustomImage> onto Dockerhub so that the Kubernetes cluster can download it (see: How to create and manage docker repositories):
docker build -t <your docker repository>:<tag> ${BITSAIL_HOME}/output/
docker push <your docker repository>:<tag>
bash ${BITSAIL_HOME}/bin/bitsail run \
--engine flink \
--target kubernetes-application \
--deployment-mode kubernetes-application \
--execution-mode run-application \
-p kubernetes.jobmanager.service-account=<self-defined-service-account> \
-p kubernetes.container.image=<CustomImage> \
-p kubernetes.jobmanager.cpu=0.25 \
-p kubernetes.taskmanager.cpu=0.5 \
--conf-in-base64 <base64 conf>
Users can specify more configurations by adding more -p key=value options to the bitsail command line.
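The <base64 conf> placeholder has to be produced from a job configuration before the command can run. The sketch below shows one portable way to do so, under the assumption that --conf-in-base64 expects the base64 encoding of the job configuration file content; the function name encode_conf is hypothetical.

```shell
# Hypothetical helper: encode a job configuration file as one base64 line.
# Usage: encode_conf <job_conf_path>
encode_conf() {
  # tr removes the line wraps that some base64 implementations insert.
  base64 < "$1" | tr -d '\n'
}
```

The result can then be passed inline, for example --conf-in-base64 "$(encode_conf examples/Fake_Print_Example.json)".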
Configurations:
Key | Required or Optional | Default | Type | Description |
---|---|---|---|---|
kubernetes.cluster-id | Optional | bitsail-<instance-id> | String | The cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. If not set, the client will automatically generate it with a random numeric ID with 'bitsail-' prefix. |
kubernetes.cluster.jar.path | Optional | "/opt/bitsail/bitsail-core.jar" | String | The BitSail jar path in kubernetes cluster. |
kubernetes.container.image | Required | The default value depends on the actually running version. In general it looks like "flink:<FLINK_VERSION>-scala_<SCALA_VERSION>" | String | Image to use for BitSail containers. The specified image must be based upon the same Apache Flink and Scala versions as used by the application. Visit https://hub.docker.com/_/flink?tab=tags for the images provided by the Flink project. |
kubernetes.container.image.pull-policy | Optional | IfNotPresent | Enum. Possible values: [IfNotPresent, Always, Never] | The Kubernetes container image pull policy (IfNotPresent or Always or Never). The default policy is IfNotPresent to avoid putting pressure to image repository. |
kubernetes.container.image.pull-secrets | Optional | (none) | List <String> | A semicolon-separated list of the Kubernetes secrets used to access private image registries. |
kubernetes.hadoop.conf.config-map.name | Optional | (none) | String | Specify the name of an existing ConfigMap that contains custom Hadoop configuration to be mounted on the JobManager(s) and TaskManagers. |
kubernetes.jobmanager.cpu | Optional | 1.0 | Double | The number of cpu used by job manager |
kubernetes.jobmanager.service-account | Required | "default" | String | Service account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server. |
kubernetes.namespace | Optional | "default" | String | The namespace that will be used for running the jobmanager and taskmanager pods. |
kubernetes.taskmanager.cpu | Optional | -1.0 | Double | The number of cpu used by task manager. By default, the cpu is set to the number of slots per TaskManager |
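Because kubernetes.cluster-id is limited to 45 characters, a custom value is worth checking before submission. The sketch below covers only the length rule stated in the table; the function name valid_cluster_id is hypothetical, and Kubernetes may impose further naming rules that are not checked here.

```shell
# Hypothetical helper: succeed if the cluster-id is non-empty and at most 45 characters.
valid_cluster_id() {
  id=${1:-}
  [ -n "$id" ] && [ "${#id}" -le 45 ]
}
```

For example, valid_cluster_id "bitsail-job" && echo ok prints ok, while an identifier of 46 characters or more makes the function return a non-zero status.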
Users can go to Flink WebUI to cancel running jobs.
Alternatively, users can run the following BitSail command to cancel a job.
Note that:
- <jobId> can be retrieved from the Flink JobManager, either from logs or WebUI.
- <cluster-id> can be retrieved from kubectl get deployment:
kubectl get deployment
# expected output
NAME READY UP-TO-DATE AVAILABLE AGE
<cluster-id> 1/1 1 1 22s
bash ${BITSAIL_HOME}/bin/bitsail stop \
--engine flink \
--target kubernetes-application \
--deployment-mode kubernetes-application \
--execution-mode cancel \
-p kubernetes.cluster-id=<cluster-id> \
--job-id <jobId>
If users want to delete the whole application, they can run the following kubectl command to delete the whole deployment and stop the application:
kubectl delete deployments bitsail-job
There are three types of logs:
- BitSail client log: ${FLINK_HOME}/log/flink-xxx.log on client end
- BitSail JobManager log: /opt/flink/log/jobmanager.log on Kubernetes JobManager pod
- BitSail TaskManager log: /opt/flink/log/taskmanager.log on Kubernetes TaskManager pod
If you want to use kubectl logs <PodName> to view the logs, you must perform the following:
- Add a new appender to the log4j.properties in the Flink client.
- Add the following 'appenderRef' to the rootLogger in log4j.properties: rootLogger.appenderRef.console.ref = ConsoleAppender.
- Stop and start your Application again. Now you can use kubectl logs to view your logs.
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Users can dump JobManager/TaskManager logs on the client end by running kubectl commands:
# During job running
kubectl get pods # Will return jobmanager pod and taskmanager pod
kubectl logs -f <jobmanagerPod> # Will dump jobManager log
kubectl logs -f <taskmanagerPod> # Will dump taskManager log
Flink has a history server that can be used to query the statistics of completed jobs after the corresponding Flink cluster has been shut down. Furthermore, it exposes a REST API that accepts HTTP requests and responds with JSON data. More information in https://nightlies.apache.org/flink/flink-docs-release-1.11/monitoring/historyserver.html
Start or stop the HistoryServer
${FLINK_HOME}/bin/historyserver.sh (start|start-foreground|stop)
Run BitSail command line to configure history server.
bash ${BITSAIL_HOME}/bin/bitsail run \
--engine flink \
--target kubernetes-application \
--deployment-mode kubernetes-application \
--execution-mode run-application \
-p kubernetes.cluster-id=<cluster-id> \
-p kubernetes.jobmanager.service-account=<self-defined-service-account> \
-p kubernetes.container.image=<CustomImage> \
-p kubernetes.jobmanager.cpu=0.25 \
-p kubernetes.taskmanager.cpu=0.5 \
-p jobmanager.archive.fs.dir=hdfs:///completed-jobs/ \
-p historyserver.web.address=0.0.0.0 \
-p historyserver.web.port=8082 \
-p historyserver.archive.fs.dir=hdfs:///completed-jobs/ \
-p historyserver.archive.fs.refresh-interval=10000 \
--conf-in-base64 <base64 conf>
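Once the history server is running, its REST API can be queried from the client. The sketch below builds the request URL and fetches the list of archived jobs; the function names are hypothetical, the host and port are assumptions that must match your own historyserver.web.address and historyserver.web.port settings, and curl must be installed for the second function.

```shell
# Build a history server URL from host, port and REST path.
history_url() {
  printf 'http://%s:%s%s\n' "$1" "$2" "$3"
}

# Hypothetical helper: fetch the overview of archived jobs as JSON.
# Usage: fetch_archived_jobs <host> <port>
fetch_archived_jobs() {
  curl -s "$(history_url "$1" "$2" /jobs/overview)"
}
```

For instance, fetch_archived_jobs localhost 8082 queries a history server that is reachable on the local machine at port 8082.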