kubernetes源码分析 kubelet
简介
从官方的架构图中很容易就能找到 kubelet
执行 kubelet -h
看到 kubelet 的功能介绍:
-
kubelet 是每个 Node 节点上都运行的主要“节点代理”。使用如下的一个向 apiserver 注册 Node 节点:主机的
hostname
;覆盖host
的参数;或者云提供商指定的逻辑。 -
kubelet 基于
PodSpec
工作。PodSpec
是用YAML
或者JSON
对象来描述 Pod。Kubelet 接受通过各种机制(主要是 apiserver)提供的一组PodSpec
,并确保里面描述的容器良好运行。
除了由 apiserver 提供 PodSpec
,还可以通过以下方式提供:
-
文件
-
HTTP 端点
-
HTTP 服务器
kubelet 功能归纳一下就是上报 Node 节点信息,和管理(创建、销毁)Pod。 功能看似简单,实际不然。每一个点拿出来都需要很大的篇幅来讲,比如 Node 节点的计算资源,除了传统的 CPU、内存、硬盘,还提供扩展来支持类似 GPU 等资源;Pod 不仅仅有容器,还有相关的网络、安全策略等。
架构
kubelet 的架构由 N 多的组件组成,下面简单介绍下比较重要的几个:
PLEG
即 Pod Lifecycle Event Generator,字面意思 Pod 生命周期事件(ContainerStarted
、ContainerDied
、ContainerRemoved
、ContainerChanged
)生成器。
其维护着 Pod 缓存;定期通过 ContainerRuntime
获取 Pod 的信息,与缓存中的信息比较,生成如上的事件;将事件写入其维护的通道(channel)中。
PodWorkers
处理事件中 Pod 的同步。核心方法 managePodLoop()
间接调用 kubelet.syncPod()
完成 Pod 的同步:
-
如果 Pod 正在被创建,记录其延迟
-
生成 Pod 的 API Status,即
v1.PodStatus
:从运行时的 status 转换成 api status -
记录 Pod 从
pending
到running
的耗时 -
在
StatusManager
中更新 pod 的状态 -
杀掉不应该运行的 Pod
-
如果网络插件未就绪,只启动使用了主机网络(host network)的 Pod
-
如果 static pod 不存在,为其创建镜像(Mirror)Pod
-
为 Pod 创建文件系统目录:Pod 目录、卷目录、插件目录
-
使用
VolumeManager
为 Pod 挂载卷 -
获取 image pull secrets
-
调用容器运行时(container runtime)的
#SyncPod()
方法
PodManager
存储 Pod 的期望状态,kubelet 服务的不同渠道的 Pod
StatusProvider
提供节点和容器的统计信息,有 cAdvisor
和 CRI
两种实现。
ContainerRuntime
顾名思义,容器运行时。与遵循 CRI 规范的高级容器运行时进行交互。
生命周期事件生成器PLEG
对于 Pod来说,Kubelet 会从多个数据来源(api、file以及http) watch Pod spec 中的变化。对于容器来说,Kubelet 会定期轮询容器运行时,以获取所有容器的最新状态。随着 Pod 和容器数量的增加,轮询会产生较大开销,带来的周期性大量并发请求会导致较高的 CPU 使用率峰值,降低节点性能,从而降低节点的可靠性。为了降低 Pod 的管理开销,提升 Kubelet 的性能和可扩展性,引入了 PLEG(Pod Lifecycle Event Generator)。改进了之前的工作方式:
-
减少空闲期间的不必要工作(例如 Pod 的定义和容器的状态没有发生更改)。
-
减少获取容器状态的并发请求数量。
为了进一步降低损耗,社区推出了基于event实现的PLEG,当然也需要CRI运行时支持。
它定期检查节点上 Pod 运行情况,如果发现感兴趣的变化,PLEG 就会把这种变化包 装成 Event 发送给 Kubelet 的主同步机制 syncLoop 去处理。
syncLoop
Kubelet启动后通过syncLoop进入到主循环处理Node上Pod Changes事件,监听来自file,apiserver,http三类的事件并汇聚到kubetypes.PodUpdate Channel(Config Channel)中,由syncLoopIteration不断从kubetypes.PodUpdate Channel中消费。
源码分析
源码版本
origin/release-1.30
启动流程
入口函数
入口位于 cmd/kubelet/app/server.go 。
其中 RunKubelet() 接口下的 startKubelet 即是启动 kubelet 的启动函数。
startKubelet
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {// start the kubeletgo k.Run(podCfg.Updates())// start the kubelet serverif enableServer {go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)}if kubeCfg.ReadOnlyPort > 0 {go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))}go k.ListenAndServePodResources()
}
跳转到 pkg/kubelet/kubelet.go ,这里就是 kubelet 的启动接口代码所在。
kubelet.Run
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {// ...// Start the cloud provider sync managerif kl.cloudResourceSyncManager != nil {go kl.cloudResourceSyncManager.Run(wait.NeverStop)}// 加载部分模块,如镜像管理, 观察oom, if err := kl.initializeModules(); err != nil {kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())klog.ErrorS(err, "Failed to initialize internal modules")os.Exit(1)}// Start volume manager// 开启volume 管理go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)//...// 开启pleg, 即watch方式获取 runtime的event// Start the pod lifecycle event generator.kl.pleg.Start()// Start eventedPLEG only if EventedPLEG feature gate is enabled.if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {kl.eventedPleg.Start()}// 启动主进程,接口 kl.syncLoopIteration()
// 监听来自 file/apiserver/http 事件源发送过来的事件,并对事件做出对应的同步处理。kl.syncLoop(ctx, updates, kl)
}
事件watch
syncLoop
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {//...for {if err := kl.runtimeState.runtimeErrors(); err != nil {klog.ErrorS(err, "Skipping pod synchronization")// exponential backofftime.Sleep(duration)duration = time.Duration(math.Min(float64(max), factor*float64(duration)))continue}// reset backoff if we have a successduration = basekl.syncLoopMonitor.Store(kl.clock.Now())// updatedates为配置聚合通道,handler 为kubelet对象的接口集合// plegch 是来自runtime变化事件的通道if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, \housekeepingTicker.C, plegCh) {break}kl.syncLoopMonitor.Store(kl.clock.Now())}
}
syncLoop参数中的updates 是来自下面文件的配置聚合通道
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {pods *podStoragemux *mux// the channel of denormalized changes passed to listeners// updates通道updates chan kubetypes.PodUpdate// contains the list of all configured sourcessourcesLock sync.Mutexsources sets.String
}
handler interface 传入的是实现了下面方法的kubelet对象
syncLoopIteration
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {select {// 获取来自配置通道的事件case u, open := <-configCh:// Update from a config source; dispatch it to the right handler// callback.if !open {klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")return false}// 下面为对收到pod配置后各个操作的处理switch u.Op {// 关注ADD操作case kubetypes.ADD:klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))// After restarting, kubelet will get all existing pods through// ADD as if they are new pods. These pods will then go through the// admission process and *may* be rejected. This can be resolved// once we have checkpointing.handler.HandlePodAdditions(u.Pods)case kubetypes.UPDATE:klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))handler.HandlePodUpdates(u.Pods)case kubetypes.REMOVE:klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))handler.HandlePodRemoves(u.Pods)case kubetypes.RECONCILE:klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))handler.HandlePodReconcile(u.Pods)case kubetypes.DELETE:klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))// DELETE is treated as a UPDATE because of graceful deletion.handler.HandlePodUpdates(u.Pods)case kubetypes.SET:// TODO: Do we want to support this?klog.ErrorS(nil, "Kubelet does not support snapshot update")default:klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)}kl.sourcesReady.AddSource(u.Source)// 获取来自pleg通道的事件case e := <-plegCh:if isSyncPodWorthy(e) {// PLEG event for a pod; sync it.if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)// 处理 同步自容器运行时的容器最新信息handler.HandlePodSyncs([]*v1.Pod{pod})} else {// If the pod no longer exists, ignore the event.klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)}}if e.Type == pleg.ContainerDied {if containerID, ok := e.Data.(string); ok {kl.cleanUpContainersInPod(e.ID, containerID)}}//...}return true
}
Pod的创建
容器的创建
HandlePodAdditions
重点关注ADD操作
kubetypes.ADD
kubetypes.ADD 既是 Pod 的创建,它调用了 handler.HandlePodAdditions(u.Pods) 接口来响应 Pod 的创建操作,我们先看看 Pod 具体是怎么样被创建的。
-
HandlePodAdditions() 接口接收的是 pods 切片,意味着可能会出现多个 pod 同时创建。
-
pods 按创建时间排序,以此对排好序的 pods 进行循环遍历。
-
获取 kubelet 所在节点上现有的所有 pods 。
-
kl.podManager.AddPod(pod) 把当前 pod 信息缓存进 podManager 的 podByUID 这个 map 里面。
-
检查 pod 是否 MirrorPod ,如果是 MirrorPod ,则调用 kl.handleMirrorPod(pod, start) 对 Pod 进行处理 。
-
无论是 MirrorPod 还是正常 Pod ,最终都是调用 kl.dispatchWork() 接口进行处理,他们的区别在于传进去的参数的不同。
-
MirrorPod 是 静态 Pod 在 apiserver 上的一个镜像,kubelet 并不需要在节点上给他创建真实的容器,它的 ADD/UPDATE/DELETE 操作类型都被当做 UPDATE 来处理,更新它在 apiserver 上的状态。
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {start := kl.clock.Now()// 按照创建事件排序sort.Sort(sliceutils.PodsByCreationTime(pods))if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {kl.podResizeMutex.Lock()defer kl.podResizeMutex.Unlock()}// 收到的chungking事件通知是一个pods数组,pods可能有多个for _, pod := range pods {existingPods := kl.podManager.GetPods()// Always add the pod to the pod manager. Kubelet relies on the pod// manager as the source of truth for the desired state. If a pod does// not exist in the pod manager, it means that it has been deleted in// the apiserver and no action (other than cleanup) is required.//把当前 pod 信息缓存进 podManager 的 podByUID 这个 map 里面kl.podManager.AddPod(pod)pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)if wasMirror {if pod == nil {klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)continue}kl.podWorkers.UpdatePod(UpdatePodOptions{Pod: pod,MirrorPod: mirrorPod,UpdateType: kubetypes.SyncPodUpdate,StartTime: start,})continue}//...// 转移到podworkers的Update继续处理pod的创建kl.podWorkers.UpdatePod(UpdatePodOptions{Pod: pod,MirrorPod: mirrorPod,UpdateType: kubetypes.SyncPodCreate,StartTime: start,})}
}
podWorkers.UpdatePod
// UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
// terminating, or terminated, and will transition to terminating if: deleted on the apiserver,
// discovered to have a terminal phase (Succeeded or Failed), or evicted by the kubelet.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {//...// start the pod worker goroutine if it doesn't existpodUpdates, exists := p.podUpdates[uid]// 当没有该pod时将会执行创建流程, 是以goroutine 方式启动运行if !exists {// buffer the channel to avoid blocking this methodpodUpdates = make(chan struct{}, 1)p.podUpdates[uid] = podUpdates// ensure that static pods start in the order they are received by UpdatePodif kubetypes.IsStaticPod(pod) {p.waitingToStartStaticPodsByFullname[status.fullname] =append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)}// allow testing of delays in the pod update channelvar outCh <-chan struct{}if p.workerChannelFn != nil {outCh = p.workerChannelFn(uid, podUpdates)} else {outCh = podUpdates}// 在协程中启动pod// spawn a pod workergo func() {// TODO: this should be a wait.Until with backoff to handle panics, and// accept a context for shutdowndefer runtime.HandleCrash()defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)// 启动pod逻辑p.podWorkerLoop(uid, outCh)}()}//...
}
podWorkerLoop
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {var lastSyncTime time.Timefor range podUpdates {ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)//...// Take the appropriate action (illegal phases are prevented by UpdatePod)switch {case update.WorkType == TerminatedPod:err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)case update.WorkType == TerminatingPod:var gracePeriod *int64if opt := update.Options.KillPodOptions; opt != nil {gracePeriod = opt.PodTerminationGracePeriodSecondsOverride}podStatusFn := p.acknowledgeTerminating(podUID)// if we only have a running pod, terminate it directlyif update.Options.RunningPod != nil {err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)} else {err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)}default:// 继续进入pod的创建isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)}lastSyncTime = p.clock.Now()return err}()}
}
Kubelet.SyncPod
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not write an event if this operation returns an error.
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {// ...sctx := context.WithoutCancel(ctx)// 创建podresult := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)kl.reasonCache.Update(pod.UID, result)// ...return false, nil
}
kubeGenericRuntimeManager.SyncPod
这里就是调用容器运行时 SyncPod() 接口的源码所在,很长,我们先总结一下他的步骤。
-
计算 sandbox 容器和普通容器的状态有没有变更。
-
如果必要,kill 掉 sandbox 容器重建。
-
如果容器不该被运行,则 kill 掉 pod 下的所有容器。
-
如果需要,创建 sandbox 容器。
-
创建临时容器。
-
创建初始化容器 initContainers 。
-
创建普通容器。
我们接下来,在根据上面的每一个步骤,来分析这些步骤的具体内容。
注意
sanbox 容器到底是什么东西?kubelet 创建 pod 的时候,你通过 docker ps 去看,会发现它使用 pause 镜像也跟着创建一个同名容器。
57f7fc7cf97b 605c77e624dd "/docker-entrypoint.…" 14 hours ago Up 14 hours k8s_nginx_octopus-deployment-549545bf46-x9cqm_default_c799bb7f-d5d9-41bd-ba60-9a968f0fac54_0 4e0c7fb21e49 registry.aliyuncs.com/google_containers/pause:3.2 "/pause" 14 hours ago Up 14 hours k8s_POD_octopus-deployment-549545bf46-x9cqm_default_c799bb7f-d5d9-41bd-ba60-9a968f0fac54_0
它是用来为 pod 下的一组容器创建各种 namespace 环境和资源隔离用的, pod 下的各个容器就可以在这个隔离的
环境里面共享各种资源。使得容器无需像 kvm 一样,需要创建一个操作系统实例来对资源进行隔离,这样就可以很好地利用到宿主机的各种资源,这也就是 kubernetes 的精髓所在。
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Resize running containers (if InPlacePodVerticalScaling==true)
// 8. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {// Step 1: Compute sandbox and container changes.// Step 2: Kill the pod if the sandbox has changed.// Step 3: kill any running containers in this pod which are not to keep.// Step 4: Create a sandbox for the pod if necessary.podSandboxID := podContainerChanges.SandboxIDif podContainerChanges.CreateSandbox {// ...podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)}//...// pod 中container 启动方法// Helper containing boilerplate common to starting all types of containers.// typeName is a description used to describe this type of container in log messages,// currently: "container", "init container" or "ephemeral container"// metricLabel is the label used to describe this type of container in monitoring metrics.// currently: "container", "init_container" or "ephemeral_container"start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)result.AddSyncResult(startContainerResult)isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)if isInBackOff {startContainerResult.Fail(err, msg)klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))return err}metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()if sc.HasWindowsHostProcessRequest(pod, spec.container) {metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()}klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are// useful to cluster administrators to distinguish "server errors" from "user errors".metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()if sc.HasWindowsHostProcessRequest(pod, spec.container) {metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()}startContainerResult.Fail(err, msg)// known errors that are logged in other places are logged at higher levels here to avoid// repetitive log spamswitch {case err == images.ErrImagePullBackOff:klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)default:utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))}return err}return nil}// Step 5: start ephemeral containers// These are started "prior" to init containers to allow running ephemeral containers even when there// are errors starting an init container. In practice init containers will start first since ephemeral// containers cannot be specified on pod creation.// 创建临时容器, 主要用于需要在运行的 Pod 内执行一些临时任务,// 例如调试、日志收集、文件系统浏览等。for _, idx := range podContainerChanges.EphemeralContainersToStart {start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))}if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {// Step 6: start the init container.if container := podContainerChanges.NextInitContainerToStart; container != nil {// Start the next init container.if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {return}// Successfully started the container; clear the entry in the failureklog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))}} else {// 启动init container // 可以帮助确保在主应用容器启动之前完成必要的预处理任务,以确保应用程序的正确运行和可靠性。// Step 6: start init containers.for _, idx := range podContainerChanges.InitContainersToStart {container := &pod.Spec.InitContainers[idx]// Start the next init container.if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {if types.IsRestartableInitContainer(container) {klog.V(4).InfoS("Failed to start the restartable init container for the pod, skipping", "initContainerName", container.Name, "pod", klog.KObj(pod))continue}klog.V(4).InfoS("Failed to initialize the pod, as the init container failed to start, aborting", "initContainerName", container.Name, "pod", klog.KObj(pod))return}// Successfully started the container; clear the entry in the failureklog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))}}// Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResourcesif isInPlacePodVerticalScalingAllowed(pod) {if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {m.doPodResizeAction(pod, podStatus, podContainerChanges, result)}}// 启动业务container// Step 8: start containers in podContainerChanges.ContainersToStart.for _, idx := range podContainerChanges.ContainersToStart {start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))}return
}
Init containers(初始化容器)是 Kubernetes 中的一个概念,它是一种特殊类型的容器,用于在 Pod 中运行在主应用容器启动之前执行的初始化任务。
当创建一个 Pod 时,除了主应用容器之外,你可以定义一个或多个 init containers。这些 init containers 会按照定义的顺序依次执行,并且每个 init container 必须成功完成(即退出状态码为 0)才能继续下一个 init container 的执行,以及主应用容器的启动。
Init containers 可以用于在主应用容器启动之前执行各种初始化任务,例如:
数据库初始化:在主应用容器启动之前,可以使用 init container 创建数据库并执行必要的初始化脚本。
依赖检查:在主应用容器启动之前,可以使用 init container 检查依赖的服务或资源是否可用。
文件下载:在主应用容器启动之前,可以使用 init container 下载配置文件或其他必要的资源。
安全设置:在主应用容器启动之前,可以使用 init container 配置安全相关的设置。
每个 init container 都可以有自己的镜像和配置,它们共享 Pod 的网络命名空间和存储卷。一旦所有的 init containers 成功完成,主应用容器将启动并加入到 Pod 中。
使用 init containers 可以帮助确保在主应用容器启动之前完成必要的预处理任务,以确保应用程序的正确运行和可靠性。
请注意,init containers 是顺序执行的,因此如果某个 init container 失败,则整个 Pod 的启动过程将被阻塞,直到该 init container 成功或手动重启 Pod。
pause容器的注入
containerd.RunPodSandbox
使用的pasue 是在containerd创建sandbox 时。在配置中指定的
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {...// Ensure sandbox container image snapshot.// 使用的sandbox镜像,一般就是pauseimage, err := c.ensureImageExists(ctx, c.config.SandboxImage, config)if err != nil {return nil, fmt.Errorf("failed to get sandbox image %q: %w", c.config.SandboxImage, err)}containerdImage, err := c.toContainerdImage(ctx, *image)if err != nil {return nil, fmt.Errorf("failed to get image from containerd %q: %w", image.ID, err)}ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler())if err != nil {return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)}log.G(ctx).WithField("podsandboxid", id).Debugf("use OCI runtime %+v", ociRuntime)runtimeStart := time.Now()// Create sandbox container.// NOTE: sandboxContainerSpec SHOULD NOT have side// effect, e.g. accessing/creating files, so that we can test// it safely.// NOTE: the network namespace path will be created later and update through updateNetNamespacePath functionspec, err := c.sandboxContainerSpec(id, config, &image.ImageSpec.Config, "", ociRuntime.PodAnnotations)if err != nil {return nil, fmt.Errorf("failed to generate sandbox container spec: %w", err)}log.G(ctx).WithField("podsandboxid", id).Debugf("sandbox container spec: %#+v", spew.NewFormatter(spec))sandbox.ProcessLabel = spec.Process.SelinuxLabeldefer func() {if retErr != nil {selinux.ReleaseLabel(sandbox.ProcessLabel)}}()// ...container, err := c.client.NewContainer(ctx, id, opts...)if err != nil {return nil, fmt.Errorf("failed to create containerd container: %w", err)}// Add container into sandbox store in INIT state.sandbox.Container = containerdefer func() {// Put the sandbox into sandbox store when the some resource fails to be cleaned.if retErr != nil && cleanupErr != nil {log.G(ctx).WithError(cleanupErr).Errorf("encountered an error cleaning up failed sandbox %q, marking sandbox state as SANDBOX_UNKNOWN", id)if err := c.sandboxStore.Add(sandbox); err != nil {log.G(ctx).WithError(err).Errorf("failed to add sandbox %+v into store", sandbox)}}}()...
Pause 容器
Pause容器的功能可参考 pause.c文件
pause功能只是截获 SIGINT, SIGTERM, SIGCHLD 信号,其中SIGCHLD只是收集退出的子进程,避免成为孤儿进程。
puase容器主要是为了:
-
保持有一个容器在sandbox中运行,维持sandbox环境。
-
其他容器停止时需要在同空间下有主进程进行回收,避免成为孤儿进程。
/*
Copyright 2016 The Kubernetes Authors.Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>#define STRINGIFY(x) #x
#define VERSION_STRING(x) STRINGIFY(x)#ifndef VERSION
#define VERSION HEAD
#endifstatic void sigdown(int signo) {psignal(signo, "Shutting down, got signal");exit(0);
}static void sigreap(int signo) {while (waitpid(-1, NULL, WNOHANG) > 0);
}int main(int argc, char **argv) {int i;for (i = 1; i < argc; ++i) {if (!strcasecmp(argv[i], "-v")) {printf("pause.c %s\n", VERSION_STRING(VERSION));return 0;}}if (getpid() != 1)/* Not an error because pause sees use outside of infra containers. */fprintf(stderr, "Warning: pause should be the first process\n");// 收集退出的子进程if (sigaction(SIGINT, &(struct sigaction){.sa_handler = sigdown}, NULL) < 0)return 1;if (sigaction(SIGTERM, &(struct sigaction){.sa_handler = sigdown}, NULL) < 0)return 2;if (sigaction(SIGCHLD, &(struct sigaction){.sa_handler = sigreap,.sa_flags = SA_NOCLDSTOP},NULL) < 0)return 3;for (;;)pause();fprintf(stderr, "Error: infinite loop terminated\n");return 42;
}
相关文章:

kubernetes源码分析 kubelet
简介 从官方的架构图中很容易就能找到 kubelet 执行 kubelet -h 看到 kubelet 的功能介绍: kubelet 是每个 Node 节点上都运行的主要“节点代理”。使用如下的一个向 apiserver 注册 Node 节点:主机的 hostname;覆盖 host 的参数࿱…...

Web3 开发者周刊 36 | 构建自主未来:Agent、可扩展性与赏金
欢迎来到 Web3 开发者周刊 36,这里汇聚了赋能您的 Web3 构建之旅的各种资源。本周我们将剖析基于Agent的系统,讨论来自 Vitalik 关于以太坊 L1 和 L2 的最新思考,并提供最新高价值Bounty消息。 开始Build吧! ✅ One Trillion Age…...

零基础入门机器学习 -- 第十一章机器学习模型的评估与优化
如何判断你的模型到底行不行? 11.1 为什么需要评估模型? 场景设定:信用卡欺诈检测 想象你是ABC银行的风控经理,你每天的工作就是盯着上百万笔交易,防止客户的信用卡被盗刷。 你们银行新推出了一款机器学习模型&…...

菜鸟之路Day15一一IO流(一)
菜鸟之路Day15一一IO流(一) 作者:blue 时间:2025.2.8 文章目录 菜鸟之路Day15一一IO流(一)0.概述1.初识IO流1.1.什么是IO流?1.2.IO流的作用1.3.IO流的分类 2.IO流的体系结构3.字节输出流的基本…...

动手学Agent——Day2
文章目录 一、用 Llama-index 创建 Agent1. 测试模型2. 自定义一个接口类3. 使用 ReActAgent & FunctionTool 构建 Agent 二、数据库对话 Agent1. SQLite 数据库1.1 创建数据库 & 连接1.2 创建、插入、查询、更新、删除数据1.3 关闭连接建立数据库 2. ollama3. 配置对话…...

JSONObject,TreeUtil,EagelMap,BeanUtil使用
目录 JSONObject的使用 TreeUtil的使用 EagleMap使用 安装 application.yml配置 springboot导入依赖 配置信息 简单使用 如果想获取这个json字符串里面的distance的值 BeanUtil拷贝注意 JSONObject的使用 假如我现在要处理这样的json数据 可以直接使用JSONUtil.parseObj…...

Unity嵌入到Winform
Unity嵌入到Winform Winform工程🌈...

TCP/UDP协议与OSI七层模型的关系解析| HTTPS与HTTP安全性深度思考》
目录 OSI 7层模型每一层包含的协议: TCP和UDP协议: TCP (Transmission Control Protocol): UDP (User Datagram Protocol): 数据包流程图 TCP与UDP的区别: 传输层与应用层的关联 传输层和应用层的关联…...

《Zookeeper 分布式过程协同技术详解》读书笔记-2
目录 zk的一些内部原理和应用请求,事务和标识读写操作事务标识(zxid) 群首选举Zab协议(ZooKeeper Atomic Broadcast protocol)文件系统和监听通知机制分布式配置中心, 简单Demojava code 集群管理code 分布式锁 zk的一…...

缺陷检测之图片标注工具--labme
一、labelme简介 Labelme是开源的图像标注工具,常用做检测,分割和分类任务的图像标注。 它的功能很多,包括: 对图像进行多边形,矩形,圆形,多段线,线段,点形式的标注&a…...
机器学习_13 决策树知识总结
决策树是一种直观且强大的机器学习算法,广泛应用于分类和回归任务。它通过树状结构的决策规则来建模数据,易于理解和解释。今天,我们就来深入探讨决策树的原理、实现和应用。 一、决策树的基本概念 1.1 决策树的工作原理 决策树是一种基于…...
请解释一下Standford Alpaca格式、sharegpt数据格式-------deepseek问答记录
1 Standford Alpaca格式 json格式数据。Stanford Alpaca 格式是一种用于训练和评估自然语言处理(NLP)模型的数据格式,特别是在指令跟随任务中。它由斯坦福大学的研究团队开发,旨在帮助模型理解和执行自然语言指令。以下是该格式的…...
ubuntu 安装管理多版本python3 相关问题解决
背景:使用ubuntu 22.04 默认python 未3.10.编译一些模块的时候发现需要降级到python3.9.于是下载安装 下载: wget https://www.python.org/ftp/python/3.9.16/Python-3.9.16.tgz解压与编译 tar -xf Python-3.9.16.tgz cd Python-3.9.16 ./configure -…...

滑动窗口算法篇:连续子区间与子串问题
1.滑动窗口原理 那么一谈到子区间的问题,我们可能会想到我们可以用我们的前缀和来应用子区间问题,但是这里对于子区间乃至子串问题,我们也可以尝试往滑动窗口的思路方向去进行一个尝试,那么说那么半天,滑动窗口是什么…...

Python爬虫实战:股票分时数据抓取与存储 (1)
在金融数据分析中,股票分时数据是投资者和分析师的重要资源。它能够帮助我们了解股票在交易日内的价格波动情况,从而为交易决策提供依据。然而,获取这些数据往往需要借助专业的金融数据平台,其成本较高。幸运的是,通过…...

【设计模式】【行为型模式】访问者模式(Visitor)
👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD 🔥 2025本人正在沉淀中… 博客更新速度 👍 欢迎点赞、收藏、关注,跟上我的更新节奏 🎵 当你的天空突…...
基于实例详解pytest钩子pytest_generate_tests动态生成测试的全过程
关注开源优测不迷路 大数据测试过程、策略及挑战 测试框架原理,构建成功的基石 在自动化测试工作之前,你应该知道的10条建议 在自动化测试中,重要的不是工具 作为一名软件开发人员,你一定深知有效测试策略的重要性,尤其…...

Copilot基于企业PPT模板生成演示文稿
关于copilot创建PPT,咱们写过较多文章了: Copilot for PowerPoint通过文件创建PPT Copilot如何将word文稿一键转为PPT Copilot一键将PDF转为PPT,治好了我的精神内耗 测评Copilot和ChatGPT-4o从PDF创建PPT功能 Copilot for PPT全新功能&a…...

2025百度快排技术分析:模拟点击与发包算法的背后原理
一晃做SEO已经15年了,2025年还有人问我如何做百度快速排名,我能给出的答案就是:做好内容的前提下,多刷刷吧!百度的SEO排名算法一直是众多SEO从业者研究的重点,模拟算法、点击算法和发包算法是百度快速排名的…...

七星棋牌全开源修复版源码解析:6端兼容,200种玩法全面支持
本篇文章将详细讲解 七星棋牌修复版源码 的 技术架构、功能实现、二次开发思路、搭建教程 等内容,助您快速掌握该棋牌系统的开发技巧。 1. 七星棋牌源码概述 七星棋牌修复版源码是一款高度自由的 开源棋牌项目,该版本修复了原版中的多个 系统漏洞&#…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...

React第五十七节 Router中RouterProvider使用详解及注意事项
前言 在 React Router v6.4 中,RouterProvider 是一个核心组件,用于提供基于数据路由(data routers)的新型路由方案。 它替代了传统的 <BrowserRouter>,支持更强大的数据加载和操作功能(如 loader 和…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

C# 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

FFmpeg:Windows系统小白安装及其使用
一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】,注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录(即exe所在文件夹)加入系统变量…...