Several ways to deploy Flink on k8s
Before starting, deploy a private Docker registry (see the tutorial "Building a Docker Image Registry").
Note: every node's daemon.json must include "insecure-registries": ["http://<host-ip>:8080"], and Docker must be restarted afterwards. Adjust the address and port to your own registry; the examples in this article push to 192.168.20.62:2333.
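A minimal sketch of that change, assuming the registry address used throughout this article; if daemon.json already has content, merge the key in rather than overwriting the file:

# on every node:
cat > /etc/docker/daemon.json <<'EOF'
{
  "insecure-registries": ["http://192.168.20.62:2333"]
}
EOF
systemctl daemon-reload && systemctl restart docker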
1. Session mode
Session mode starts a shared, long-running Flink cluster on Kubernetes (one JobManager plus a pool of TaskManagers) to which multiple Flink jobs are submitted. The cluster keeps running until it is stopped manually. This mode suits scenarios where many jobs start and stop frequently and high utilization of cluster resources matters.
A Flink Session cluster deployment on Kubernetes consists of at least three components:
- a Deployment that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
1.1 Native Kubernetes mode
Flink's native Kubernetes mode integrates Apache Flink directly with Kubernetes, so Flink jobs and applications run on the cluster without hand-written manifests. Its main advantage is that Flink can use Kubernetes' resource orchestration and management capabilities, which simplifies deploying and operating a Flink cluster.
In native Kubernetes mode the Flink client talks to the Kubernetes API server itself to create and manage the cluster, and every Flink component is managed as a Kubernetes resource (Pods, Services, and so on).
1.1.1 Build the image (Dockerfile)
1. Create the Dockerfile:
FROM flink:1.16.2
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
2. Build the image:
docker build -t 192.168.20.62:2333/bigdata/flink-session:1.16.2 .
3. Push the image:
docker push 192.168.20.62:2333/bigdata/flink-session:1.16.2
1.1.2 Create the namespace and serviceaccount
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
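A quick RBAC sanity check before starting the cluster (not part of the original steps, just a verification):

kubectl get serviceaccount flink-service-account -n flink
# should print "yes":
kubectl auth can-i create pods -n flink --as=system:serviceaccount:flink:flink-service-account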
1.1.3 Create the Flink cluster
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-session:1.16.2 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort
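Once the command returns, the JobManager pod and the REST service (native mode names it <cluster-id>-rest) should exist. A quick way to find the web UI NodePort:

kubectl get pods -n flink
kubectl get svc my-first-flink-cluster-rest -n flink
# the web UI is then reachable at http://<any-node-ip>:<nodePort>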
1.1.4 Submit a job
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.taskmanager.cpu=2000m \
-Dexternal-resource.limits.kubernetes.cpu=4000m \
-Dexternal-resource.limits.kubernetes.memory=10Gi \
-Dexternal-resource.requests.kubernetes.cpu=2000m \
-Dexternal-resource.requests.kubernetes.memory=8Gi \
./examples/streaming/TopSpeedWindowing.jar
Note: all -D options must come before the jar path; anything after the jar is passed to the program as arguments.
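To confirm the job landed on the session cluster, the same CLI can list running jobs (same target and cluster-id as the submission above):

./bin/flink list \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink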
1.1.5 Delete the Flink cluster
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
1.2 Standalone mode
Standalone mode runs a self-contained Flink cluster on Kubernetes, but nothing about it is Kubernetes-specific: you set up the JobManager and TaskManagers by hand instead of relying on the Kubernetes Operator or other Kubernetes-native scheduling and management mechanisms. In other words, the Flink components run in Kubernetes Pods, yet their lifecycle is managed the same way Flink is traditionally deployed in any other environment, not through Kubernetes-native integration.
1.2.1 Create the docker-entrypoint.sh script
#!/usr/bin/env bash
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
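Because prepare_configuration appends FLINK_PROPERTIES to flink-conf.yaml, per-container overrides need no image rebuild. A quick local sketch, assuming the image built later in section 1.2.3:

docker run --rm \
    -e FLINK_PROPERTIES=$'taskmanager.numberOfTaskSlots: 4\nparallelism.default: 4' \
    192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 jobmanager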
1.2.2 Write the Dockerfile
FROM centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache
ADD jdk-8u231-linux-x64.tar.gz /opt/apache/
ADD flink-1.16.2-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.16.2
ENV JAVA_HOME /opt/apache/jdk1.8.0_231
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/
#RUN mkdir home

COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Expose ports
EXPOSE 6123 8081

# Entrypoint script: not executed at build time, only when a container runs
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
1.2.3 Build the image
docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 . --no-cache
# Push the image
docker push 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
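To double-check the push, the registry's standard v2 HTTP API can be queried (assuming the registry from the setup note at the top of this article):

curl http://192.168.20.62:2333/v2/_catalog
curl http://192.168.20.62:2333/v2/bigdata/flink-centos-admin/tags/list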
1.2.4 Create the namespace and serviceaccount
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
1.2.5 Write the YAML manifests
1.2.5.1 flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
1.2.5.2 jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
1.2.5.3 jobmanager-rest-service.yaml
Exposes the JobManager REST port as a NodePort on the Kubernetes nodes.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
1.2.5.4 taskmanager-query-state-service.yaml
Exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes.
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
1.2.5.5 jobmanager-session-deployment-non-ha.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf/
        securityContext:
          runAsUser: 9999  # the admin user created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
1.2.5.6 taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf/
        securityContext:
          runAsUser: 9999  # the admin user created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
1.2.6 Create the Flink cluster
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
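Before submitting anything, make sure all pods are up; the web UI is served on the NodePort from jobmanager-rest-service.yaml:

kubectl get pods -n flink
kubectl get svc -n flink
# once the jobmanager and both taskmanagers are Running, open http://<any-node-ip>:30081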
1.2.7 Submit a job
./bin/flink run -m 192.168.20.62:30081 ./examples/streaming/TopSpeedWindowing.jar
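The -m address is the NodePort REST endpoint, which also serves Flink's HTTP API, so job status can be checked with plain curl:

curl http://192.168.20.62:30081/jobs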
1.2.8 Delete the cluster
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
2. Application mode (recommended)
A basic Flink Application cluster deployment on Kubernetes consists of three components:
- an application that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
2.1 Native Kubernetes mode (most common)
2.1.1 Build the image (Dockerfile)
FROM flink:1.16.2
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./flink-1.16.2/examples/streaming/TopSpeedWindowing.jar /opt/flink/usrlib/
Build the image:
docker build -t 192.168.20.62:2333/bigdata/flink-application:1.16.2 . --no-cache
docker push 192.168.20.62:2333/bigdata/flink-application:1.16.2
2.1.2 Create the namespace and serviceaccount
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
2.1.3 Create the Flink cluster and submit the job
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-application:1.16.2 \
-Dkubernetes.jobmanager.replicas=1 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dexternal-resource.limits.kubernetes.cpu=2000m \
-Dexternal-resource.limits.kubernetes.memory=2Gi \
-Dexternal-resource.requests.kubernetes.cpu=1000m \
-Dexternal-resource.requests.kubernetes.memory=1Gi \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/TopSpeedWindowing.jar
local:// is the only scheme supported in application mode. It refers to the local filesystem of the pod/container, not the machine running the flink client, which is why the jar is baked into the image above.
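In native application mode the Deployment is named after kubernetes.cluster-id, so the running job can be observed straight from kubectl:

kubectl logs -f deployment/my-first-application-cluster -n flink
kubectl get svc -n flink   # my-first-application-cluster-rest exposes the web UI NodePort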
2.1.4 Delete the Flink cluster
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
2.2 Standalone mode
This mode needs an NFS share set up in advance; the manifests below assume the shared directory /mnt/bigdata/flink/usrlib (see the tutorial "NFS 共享存储服务").
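A rough sketch of that NFS setup (the server address is a placeholder; adjust to your environment). Since the manifests below mount the directory as a hostPath, it must be mounted at the same path on every Kubernetes node:

# on the NFS server:
mkdir -p /mnt/bigdata/flink/usrlib
echo '/mnt/bigdata/flink/usrlib *(rw,sync,no_root_squash)' >> /etc/exports
exportfs -ra
# on every Kubernetes node:
mount -t nfs <nfs-server-ip>:/mnt/bigdata/flink/usrlib /mnt/bigdata/flink/usrlib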
2.2.1 Create the startup script docker-entrypoint.sh
This script is identical to the docker-entrypoint.sh from section 1.2.1; reuse it unchanged.
2.2.2 Write the Dockerfile
FROM centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache
ADD jdk-8u231-linux-x64.tar.gz /opt/apache/
ADD flink-1.16.2-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.16.2
ENV JAVA_HOME /opt/apache/jdk1.8.0_231
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars (filled from the NFS share at run time)
RUN mkdir $FLINK_HOME/usrlib/
#RUN mkdir home

COPY docker-entrypoint.sh /opt/apache/
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Working directory
WORKDIR $FLINK_HOME

# Expose ports
EXPOSE 6123 8081

# Entrypoint script: not executed at build time, only when a container runs
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]

Build and push the image:
docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2 . --no-cache
docker push 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
2.2.3 Create the namespace and serviceaccount
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
2.2.4 flink-configuration-configmap.yaml
This ConfigMap is identical to flink-configuration-configmap.yaml from section 1.2.5.1; reuse it unchanged.
2.2.5 jobmanager-service.yaml
Identical to jobmanager-service.yaml from section 1.2.5.2; reuse it unchanged.
2.2.6 jobmanager-rest-service.yaml
Exposes the JobManager REST port as a NodePort on the Kubernetes nodes. Identical to jobmanager-rest-service.yaml from section 1.2.5.3; reuse it unchanged.
2.2.7 taskmanager-query-state-service.yaml
Exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes. Identical to taskmanager-query-state-service.yaml from section 1.2.5.4; reuse it unchanged.
2.2.8 jobmanager-application-non-ha.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
        args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.16.2/usrlib
        securityContext:
          runAsUser: 9999  # the admin user created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/bigdata/flink/usrlib
2.2.9 taskmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.16.2/usrlib
        securityContext:
          runAsUser: 9999  # the admin user created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/bigdata/flink/usrlib
2.2.10 Create the Flink cluster and submit the job
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the Job and the Deployment for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
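The job jar is read from /mnt/bigdata/flink/usrlib on the node (the NFS share from the start of section 2.2); the standalone-job entrypoint then starts the class given in the Job's args. To watch it run:

kubectl get pods -n flink
kubectl logs -f job/flink-jobmanager -n flink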
2.2.11 Delete the Flink cluster
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...