当前位置: 首页 > news >正文

k8s之flink的几种创建方式

在此之前需要部署一下私人docker仓库,教程搭建 Docker 镜像仓库

注意:每台节点的daemon.json都需要配置"insecure-registries": ["http://主机IP:8080"] 并重启

一、session 模式

Session 模式是指在 Kubernetes 上启动一个共享的 Flink 集群(由 JobManager 和多个 TaskManagers 组成),然后多个 Flink 作业可以提交到这个共享集群上运行。这个模式下的集群会长期运行,直到用户手动停止它。这种模式适合多个作业需要频繁启动和停止,且对集群资源的利用率要求较高的场景。


Kubernetes 中的 Flink Session 集群部署至少包含三个组件:

  • 运行JobManager的部署

  • TaskManagers池的部署

  • 暴露JobManager的 REST 和 UI 端口的服务

1.1 Native Kubernetes 模式

Flink 的 Native Kubernetes 模式允许用户将 Apache Flink 无缝集成至 Kubernetes 环境中,实现在 Kubernetes 上运行 Flink 作业和应用程序。这种模式的主要优点是 Flink 能够利用 Kubernetes 提供的资源编排和管理能力,简化 Flink 集群的部署和管理。

在 Native Kubernetes 模式下,Flink 集群的部署和管理是通过 Flink 的 Kubernetes Operator 或者是直接使用 kubectl 命令行工具来完成的。Flink 的每个组件都被作为 Kubernetes 资源(如Pods, Services等)来管理。

1.1.1 构建镜像 Dockerfile

1.创建dockerfileFROM flink:1.16.2
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-82.开始构建镜像
docker build -t  192.168.20.62:2333/bigdata/flink-session:1.16.23.上传镜像
docker push 192.168.20.62:2333/bigdata/flink-session:1.16.2

1.1.2 创建命名空间和 serviceaccount

# 创建namespace
kubectl create ns flink# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

1.1.3 创建 flink 集群

./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster  \
-Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-session:1.16.2 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort

1.1.4 提交任务

./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
./examples/streaming/TopSpeedWindowing.jar \
-Dkubernetes.taskmanager.cpu=2000m \
-Dexternal-resource.limits.kubernetes.cpu=4000m \
-Dexternal-resource.limits.kubernetes.memory=10Gi \
-Dexternal-resource.requests.kubernetes.cpu=2000m \
-Dexternal-resource.requests.kubernetes.memory=8Gi \
-Dkubernetes.taskmanager.cpu=2000m \

1.1.5 删除 flink 集群

kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force

1.2 Standalone 模式

Standalone 模式通常指的是在 Kubernetes 集群上运行 Flink 的一个单独集群环境,但它不是专门为 Kubernetes 设计的。在 Kubernetes 上使用 Standalone 模式意味着你将手动设置 Flink 集群(包括 JobManager 和 TaskManagers),而不是通过 Kubernetes Operator 或者其他 Kubernetes 原生的资源调度和管理机制。换句话说,在这个模式下,Flink 集群的各个组件(JobManager和TaskManagers)运行在 Kubernetes Pod 中,但是它们的生命周期管理并不是通过 Kubernetes 原生的支持来实现的,而是类似于在任何其他环境中部署 Flink 的传统方式。

1.2.1 创建docker-entrypoint.sh脚本

#!/usr/bin/env bash###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"drop_privs_cmd() {if [ $(id -u) != 0 ]; then# Don't need to drop privs if EUID != 0returnelif [ -x /sbin/su-exec ]; then# Alpineecho su-exec adminelse# Othersecho gosu adminfi
}copy_plugins_if_required() {if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; thenreturn 0fiecho "Enabling required built-in plugins"for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); doecho "Linking ${target_plugin} to plugin directory"plugin_name=${target_plugin%.jar}mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; thenecho "Plugin ${target_plugin} does not exist. Exiting."exit 1elseln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"echo "Successfully enabled ${target_plugin}"fidone
}set_config_option() {local option=$1local value=$2# escape periods for usage in regular expressionslocal escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")# either override an existing entry, or append a new oneif grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; thensed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"elseecho "${option}: ${value}" >> "${CONF_FILE}"fi
}prepare_configuration() {set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}set_config_option blob.server.port 6124set_config_option query.server.port 6125if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; thenset_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}fiif [ -n "${FLINK_PROPERTIES}" ]; thenecho "${FLINK_PROPERTIES}" >> "${CONF_FILE}"fienvsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}maybe_enable_jemalloc() {if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; thenJEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"if [ -f "$JEMALLOC_PATH" ]; thenexport LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATHelif [ -f "$JEMALLOC_FALLBACK" ]; thenexport LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACKelseif [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; thenMSG_PATH=$JEMALLOC_PATHelseMSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"fiecho "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."fifi
}maybe_enable_jemalloccopy_plugins_if_requiredprepare_configurationargs=("$@")
if [ "$1" = "help" ]; thenprintf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"printf "    Or $(basename "$0") help\n\n"printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"exit 0
elif [ "$1" = "jobmanager" ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; thenargs=("${args[@]:1}")echo "Starting History Server"exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; thenargs=("${args[@]:1}")echo "Starting Task Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fiargs=("${args[@]}")# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"

1.2.2 编排 Dockerfile

FROM centos:7.9.2009USER root# 安装常用工具
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof# 设置时区,默认是UTC时区
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezoneRUN mkdir -p /opt/apacheADD jdk-8u231-linux-x64.tar.gz /opt/apache/ADD flink-1.16.2-bin-scala_2.12.tgz  /opt/apache/ENV FLINK_HOME /opt/apache/flink-1.16.2
ENV JAVA_HOME /opt/apache/jdk1.8.0_231
ENV PATH $JAVA_HOME/bin:$PATH# 创建用户应用jar目录
RUN mkdir $FLINK_HOME/usrlib/#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.shRUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin adminRUN chown -R admin:admin /opt/apache#设置的工作目录
WORKDIR $FLINK_HOME# 对外暴露端口
EXPOSE 6123 8081# 执行脚本,构建镜像时不执行,运行实例才会执行
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]

1.2.3 开始构建镜像

docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 . --no-cache# 上传镜像
docker push 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2

1.2.4 创建命名空间和 serviceaccount

# 创建namespace
kubectl create ns flink# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

1.2.5 编排 yaml 文件

1.2.5.1  flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels:app: flink
data:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122queryable-state.proxy.ports: 6125jobmanager.memory.process.size: 3200mtaskmanager.memory.process.size: 2728mtaskmanager.memory.flink.size: 2280mparallelism.default: 2log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
1.2.5.2 jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:name: flink-jobmanager
spec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager
1.2.5.3 jobmanager-rest-service.yaml

将 jobmanager rest端口公开为公共 Kubernetes 节点的端口

apiVersion: v1
kind: Service
metadata:name: flink-jobmanager-rest
spec:type: NodePortports:- name: restport: 8081targetPort: 8081nodePort: 30081selector:app: flinkcomponent: jobmanager
1.2.5.4 taskmanager-query-state-service.yaml

公开 TaskManager 端口以访问可查询状态作为公共 Kubernetes 节点的端口

apiVersion: v1
kind: Service
metadata:name: flink-taskmanager-query-state
spec:type: NodePortports:- name: query-stateport: 6125targetPort: 6125nodePort: 30025selector:app: flinkcomponent: taskmanager
1.2.5.5  jobmanager-session-deployment-non-ha.yaml
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-jobmanager
spec:replicas: 1selector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2args: ["jobmanager"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/apache/flink-1.16.2/conf/securityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
1.2.5.6  taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-taskmanager
spec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2args: ["taskmanager"]ports:- containerPort: 6122name: rpc- containerPort: 6125name: query-statelivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/apache/flink-1.16.2/conf/securityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties

1.2.6 创建 flink 集群

kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink

1.2.7 提交任务

./bin/flink run -m 192.168.20.62:30081 ./examples/streaming/TopSpeedWindowing.jar

1.2.8 删除集群

kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment.yaml -n flink
kubectl delete ns flink --force

二 、application 模式(推荐)

Kubernetes 中一个基本的 Flink Application 集群部署包含三个组件

  • 运行JobManager的应用程序

  • TaskManagers池的部署

  • 暴露JobManager的 REST 和 UI 端口的服务

2.1 Native Kubernetes 模式(常用)

2.1.1 构建镜像 Dockerfile

FROM flink:1.16.2
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY   ./flink-1.16.2/examples/streaming/TopSpeedWindowing.jar /opt/flink/usrlib/开始构建镜像docker build -t 192.168.20.62:2333/bigdata/flink-application:1.16.2 . --no-cache
docker push  192.168.20.62:2333/bigdata/flink-application:1.16.2

2.1.2 创建命名空间和 serviceacount

# 创建namespace
kubectl create ns flink
# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

2.1.3 创建 flink 集群并提交任务

./bin/flink run-application \--target kubernetes-application \-Dkubernetes.cluster-id=my-first-application-cluster  \-Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-application:1.16.2 \-Dkubernetes.jobmanager.replicas=1 \-Dkubernetes.namespace=flink \-Dkubernetes.jobmanager.service-account=flink-service-account \-Dexternal-resource.limits.kubernetes.cpu=2000m \-Dexternal-resource.limits.kubernetes.memory=2Gi \-Dexternal-resource.requests.kubernetes.cpu=1000m \-Dexternal-resource.requests.kubernetes.memory=1Gi \-Dkubernetes.rest-service.exposed.type=NodePort \local:///opt/flink/usrlib/TopSpeedWindowing.jar

local是application模式中唯一支持的方案。local 代表本地环境,这里即 pod 或者容器环境,并非宿主机。

2.1.4 删除 flink 集群

kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force

2.2 Standalone 模式

在此之前需要使用nfs设置一个共享目录,配置文件中设置共享目录是/mnt/bigdata/flink/usrlib

NFS共享存储服务

2.2.1 创建启动脚本docker-entrypoint.sh

#!/usr/bin/env bash###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"drop_privs_cmd() {if [ $(id -u) != 0 ]; then# Don't need to drop privs if EUID != 0returnelif [ -x /sbin/su-exec ]; then# Alpineecho su-exec adminelse# Othersecho gosu adminfi
}copy_plugins_if_required() {if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; thenreturn 0fiecho "Enabling required built-in plugins"for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); doecho "Linking ${target_plugin} to plugin directory"plugin_name=${target_plugin%.jar}mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; thenecho "Plugin ${target_plugin} does not exist. Exiting."exit 1elseln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"echo "Successfully enabled ${target_plugin}"fidone
}set_config_option() {local option=$1local value=$2# escape periods for usage in regular expressionslocal escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")# either override an existing entry, or append a new oneif grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; thensed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"elseecho "${option}: ${value}" >> "${CONF_FILE}"fi
}prepare_configuration() {set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}set_config_option blob.server.port 6124set_config_option query.server.port 6125if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; thenset_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}fiif [ -n "${FLINK_PROPERTIES}" ]; thenecho "${FLINK_PROPERTIES}" >> "${CONF_FILE}"fienvsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}maybe_enable_jemalloc() {if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; thenJEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"if [ -f "$JEMALLOC_PATH" ]; thenexport LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATHelif [ -f "$JEMALLOC_FALLBACK" ]; thenexport LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACKelseif [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; thenMSG_PATH=$JEMALLOC_PATHelseMSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"fiecho "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."fifi
}maybe_enable_jemalloccopy_plugins_if_requiredprepare_configurationargs=("$@")
if [ "$1" = "help" ]; thenprintf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"printf "    Or $(basename "$0") help\n\n"printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"exit 0
elif [ "$1" = "jobmanager" ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; thenargs=("${args[@]:1}")echo "Starting History Server"exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; thenargs=("${args[@]:1}")echo "Starting Task Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fiargs=("${args[@]}")# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"

2.2.2 编排Dockerfile

FROM centos:7.9.2009USER root# 安装常用工具
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof# 设置时区,默认是UTC时区
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezoneRUN mkdir -p /opt/apacheADD jdk-8u231-linux-x64.tar.gz /opt/apache/ADD flink-1.16.2-bin-scala_2.12.tgz  /opt/apache/ENV FLINK_HOME /opt/apache/flink-1.16.2
ENV JAVA_HOME /opt/apache/jdk1.8.0_231
ENV PATH $JAVA_HOME/bin:$PATH# 创建用户应用jar目录
RUN mkdir $FLINK_HOME/usrlib/#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/COPY /RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin adminRUN chown -R admin:admin /opt/apache
RUN chmod +x /opt/apache/docker-entrypoint.sh#设置的工作目录
WORKDIR $FLINK_HOME# 对外暴露端口
EXPOSE 6123 8081# 执行脚本,构建镜像时不执行,运行实例才会执行
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2 . --no-cache
docker push 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2

2.2.3 创建命名空间和 serviceacount

# 创建namespace
kubectl create ns flink# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

2.2.4 flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels:app: flink
data:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122queryable-state.proxy.ports: 6125jobmanager.memory.process.size: 3200mtaskmanager.memory.process.size: 2728mtaskmanager.memory.flink.size: 2280mparallelism.default: 2log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF

2.2.5 jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:name: flink-jobmanager
spec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager

2.2.6  jobmanager-rest-service.yaml

将 jobmanager rest 端口公开为公共 Kubernetes 节点的端口。

apiVersion: v1
kind: Service
metadata:name: flink-jobmanager-rest
spec:type: NodePortports:- name: restport: 8081targetPort: 8081nodePort: 30081selector:app: flinkcomponent: jobmanager

2.2.7 taskmanager-query-state-service.yaml

公开 TaskManager 端口以访问可查询状态作为公共 Kubernetes 节点的端口。

apiVersion: v1
kind: Service
metadata:name: flink-taskmanager-query-state
spec:type: NodePortports:- name: query-stateport: 6125targetPort: 6125nodePort: 30025selector:app: flinkcomponent: taskmanager

2.2.8 jobmanager-application-non-ha.yaml

apiVersion: batch/v1
kind: Job
metadata:name: flink-jobmanager
spec:template:metadata:labels:app: flinkcomponent: jobmanagerspec:restartPolicy: OnFailurecontainers:- name: jobmanagerimage: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2env:args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing","--output","/tmp/result"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/apache/flink-1.16.2/conf- name: job-artifacts-volumemountPath: /opt/apache/flink-1.16.2/usrlibsecurityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties- name: job-artifacts-volumehostPath:path: /mnt/bigdata/flink/usrlib

2.2.9 taskmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:name: flink-taskmanager
spec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2env:args: ["taskmanager"]ports:- containerPort: 6122name: rpc- containerPort: 6125name: query-statelivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/apache/flink-1.16.2/conf- name: job-artifacts-volumemountPath: /opt/apache/flink-1.16.2/usrlibsecurityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties- name: job-artifacts-volumehostPath:path: /mnt/bigdata/flink/usrlib

2.2.10 创建 flink 集群并提交任务

kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink# Create the deployments for the cluster
kubectl create -f  jobmanager-application-non-ha.yaml -n flink
kubectl create -f  taskmanager-job-deployment.yaml -n flink

2.2.11 删除 flink 集群

kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flinkkubectl delete ns flink --force

相关文章:

k8s之flink的几种创建方式

在此之前需要部署一下私人docker仓库&#xff0c;教程搭建 Docker 镜像仓库 注意&#xff1a;每台节点的daemon.json都需要配置"insecure-registries": ["http://主机IP:8080"] 并重启 一、session 模式 Session 模式是指在 Kubernetes 上启动一个共享的…...

应用OpenCV绘制箭头

绘制箭头函数 方法&#xff1a;函数cv2.arrowedLine( ) 语法格式&#xff1a;cv2.arrowedLine(img, pt1, pt2, color[, thickness[, line_type[, shift[, tipLength]]]]) 参数说明&#xff1a; img&#xff1a;要画的直线所在的图像&#xff0c;也称为画布。。 pt1&#x…...

信息学奥赛一本通1032:大象喝水查

1032&#xff1a;大象喝水查 时间限制: 1000 ms 内存限制: 65536 KB 提交数: 104347 通过数: 64726 【题目描述】 一只大象口渴了&#xff0c;要喝20升水才能解渴&#xff0c;但现在只有一个深h厘米&#xff0c;底面半径为r厘米的小圆桶(h和r都是整数)。问大象至少…...

聊聊jvm的direct buffer统计

序 本文主要研究一下jvm的direct buffer统计 spring boot metrics jvm.memory.used {"name": "jvm.memory.used","description": "The amount of used memory","baseUnit": "bytes","measurements"…...

C/C++ 位段

目录 什么是位段&#xff1f; 位段的内存分配 位段的跨平台问题 什么是位段&#xff1f; 位段的声明与结构是类似的&#xff0c;但是有两个不同&#xff1a; 位段的成员必须是 int、unsigned int 或signed int 等整型家族。位段的成员名后边有一个冒号和一个数字 这是一个…...

Peter算法小课堂—树的应用

开篇先给大家讲个东西&#xff0c;叫vector&#xff0c;有老师称之为“向量”&#xff0c;当然与数学中的向量不一样啊&#xff0c;所以我要称之为“长度可变的数组” vector 头文件&#xff1a;#include <vector> 用法&#xff1a;vector<int> d; 尾部增加元素…...

FineBI:简介

1 介绍 FineBI 是帆软软件有限公司推出的一款商业智能&#xff08;Business Intelligence&#xff09;产品。 FineBI 是定位于自助大数据分析的 BI 工具&#xff0c;能够帮助企业的业务人员和数据分析师&#xff0c;开展以问题导向的探索式分析。 2 现阶段数据分析弊端 现阶…...

原神单机版【完全无脑搭建】⭐纯单机⭐*稳定版*

版本介绍 版本3.7稳定版【过分追新并不稳&#xff0c;合理才完美】 独家原神&#xff0c;游戏内自带剧情任务&#xff0c;完美仿官&#xff0c;一比一完美复制&#xff01; 已经拥有完美剧情、任务、副本、卡池、深渊、全物品、和全部功能和皮肤。 送&#xff1a;GM全套工具…...

用通俗易懂的方式讲解:万字长文带你入门大模型

告别2023&#xff0c;迎接2024。大模型技术已成为业界关注焦点&#xff0c;你是否也渴望掌握这一领域却又不知从何学起&#xff1f; 本篇文章将特别针对入门新手&#xff0c;以浅显易懂的方式梳理大模型的发展历程、核心网络结构以及数据微调等关键技术。 如果你在阅读中收获…...

Invalid options in vue.config.js: “plugins“ is not allowed

项目场景&#xff1a; 安装并配置elementPlus报错。 问题描述 "plugins" is not allowed. plugins不被允许。参考官网修改配置文件vue.config.js。 解决方案&#xff1a; const AutoImport require(unplugin-auto-import/webpack) const Components require(un…...

四、C语言中的数组:数组的创建与初始化

其实在之前的学习中我们已经或多或少接触到了数组&#xff0c;有关scanf()的安全用法中我们提到了如何避免数组溢出的问题&#xff0c;详情可以查看二、C语言数据类型与变量&#xff08;scanf和printf (4&#xff09;完) 这一章我们将详细学习数组在C语言中的应用 1.数组的概…...

html5中各标签的语法格式总结以及属性值说明

有关闭标签的元素 a元素 <a href"" target"" title""></a>表格相关元素 table元素&#xff1a;表格标签caption元素&#xff1a;表头thead元素tbody元素&#xff1a;表格主体元素tfoot元素th元素tr元素&#xff1a;行标签td元素&…...

力扣(leetcode)第412题Fizz Buzz(Python)

412.Fizz Buzz 题目链接&#xff1a;412.Fizz Buzz 给你一个整数 n &#xff0c;找出从 1 到 n 各个整数的 Fizz Buzz 表示&#xff0c;并用字符串数组 answer&#xff08;下标从 1 开始&#xff09;返回结果&#xff0c;其中&#xff1a; answer[i] “FizzBuzz” 如果 i 同…...

苦学golang半年,写了一款web服务器

苦学golang半年&#xff0c;写了一款web服务器 文章目录 苦学golang半年&#xff0c;写了一款web服务器example 项目地址&#xff1a;https://github.com/fengyuan-liang/jet-web-fasthttp 可以的话&#xff0c;请star支持一下&#x1f642; 苦学golang半年&#xff0c;写了一款…...

uniapp vue2 车牌号输入组件记录

uniapp vue2 车牌号输入案例记录 组件如图 直接上代码 1.html <template><view><view class"plate" :class"{show: show}"><view class"itemFirst flex-d"><view class"item item1" click"handl…...

Unity 点击对话系统(含Demo)

点击对话系统 可实现点击物体后自动移动到物体附近&#xff0c;然后弹出对话框进行对话。 基于Unity 简单角色对话UI脚本的编写&#xff08;新版UI组件&#xff09;和Unity 关于点击不同物品移动并触发不同事件的结合体&#xff0c;有兴趣可以看一下之前文章。 下边代码为U…...

vue接入高德地图

使用 JSAPI 安全模式,代理服务请以_AMapService 作为一级路由 index.html <script type"text/javascript">window._AMapSecurityConfig {serviceHost: "http://xx.xx.xx.xx:8223/_AMapService"};</script><script type"text/javascr…...

Linux的基本指令(5)

目录 bc指令 uname指令 压缩解压相关的指令 zip指令 unzip指令 tar打包压缩指令 tar解压解包指令 ​传输指令sz&rz 热键 关机命令 安装&#xff1a;yum install -y 指令 bc指令 bc命令可以很方便的进行浮点运算 Linux中的计算器 uname指令 语法&#xff1a;unam…...

华为商城秒杀时加密验证 device_data 的算法研究

前言 之前华为商城放出 Mate60 手机时, 想给自己和家人抢购一两台&#xff0c;手动刷了好几天无果后&#xff0c;决定尝试编写程序&#xff0c;直接发送 POST 请求来抢。通过抓包和简单重放发送后&#xff0c;始终不成功。仔细研究&#xff0c;发现 Cookie 中有一个名为 devic…...

Wrk压测发送Post请求的正确姿势

一、Wrk简介 wrk 是一个能够在单个多核 CPU 上产生显著负载的现代 HTTP 基准测试工具。它采用了多线程设计&#xff0c;并使用了像 epoll 和 kqueue 这样的可扩展事件通知机制。此外&#xff0c;用户可以指定 LuaJIT 脚本来完成 HTTP 请求生成、响应处理和自定义报告等功能。 …...

Debian系统简介

目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版&#xff…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

HBuilderX安装(uni-app和小程序开发)

下载HBuilderX 访问官方网站&#xff1a;https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本&#xff1a; Windows版&#xff08;推荐下载标准版&#xff09; Windows系统安装步骤 运行安装程序&#xff1a; 双击下载的.exe安装文件 如果出现安全提示&…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

STM32HAL库USART源代码解析及应用

STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

关于easyexcel动态下拉选问题处理

前些日子突然碰到一个问题&#xff0c;说是客户的导入文件模版想支持部分导入内容的下拉选&#xff0c;于是我就找了easyexcel官网寻找解决方案&#xff0c;并没有找到合适的方案&#xff0c;没办法只能自己动手并分享出来&#xff0c;针对Java生成Excel下拉菜单时因选项过多导…...

AI语音助手的Python实现

引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...