当前位置: 首页 > news >正文

kube-apiserver启动流程源码分析

1. 概述

KubeAPIServer 主要是提供对 API Resource 的操作请求,为 kubernetes 中众多 API 注册路由信息,暴露 RESTful API 并且对外提供 kubernetes service,使集群中以及集群外的服务都可以通过 RESTful API 操作 kubernetes 中的资源。

2. 代码流图及

在这里插入图片描述

3. 源码分析

3.1 启动main函数

func main() {command := app.NewAPIServerCommand()code := cli.Run(command)os.Exit(code)
}

3.2 创建cobra.Command对象

// NewAPIServerCommand creates a *cobra.Command object with default parameters
// NewAPIServerCommand使用默认参数创建cobra.Command对象
func NewAPIServerCommand() *cobra.Command {s := options.NewServerRunOptions()cmd := &cobra.Command{Use: "kube-apiserver",Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,// stop printing usage when the command errorsSilenceUsage: true,PersistentPreRunE: func(*cobra.Command, []string) error {// silence client-go warnings.// kube-apiserver loopback clients should not log self-issued warnings.rest.SetDefaultWarningHandler(rest.NoWarnings{})return nil},RunE: func(cmd *cobra.Command, args []string) error {verflag.PrintAndExitIfRequested()fs := cmd.Flags()// Activate logging as soon as possible, after that// show flags with the final logging configuration.if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {return err}cliflag.PrintFlags(fs)// set default optionscompletedOptions, err := Complete(s)if err != nil {return err}// validate optionsif errs := completedOptions.Validate(); len(errs) != 0 {return utilerrors.NewAggregate(errs)}// add feature enablement metricsutilfeature.DefaultMutableFeatureGate.AddMetrics()return Run(completedOptions, genericapiserver.SetupSignalHandler())},Args: func(cmd *cobra.Command, args []string) error {for _, arg := range args {if len(arg) > 0 {return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)}}return nil},}// 设置flag标识fs := cmd.Flags()namedFlagSets := s.Flags()verflag.AddFlags(namedFlagSets.FlagSet("global"))globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))for _, f := range namedFlagSets.FlagSets {fs.AddFlagSet(f)}cols, _, _ := term.TerminalSize(cmd.OutOrStdout())cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)return cmd
}

3.3 APIServer启动流程,主要逻辑为:

  1. 调用CreateServerChain构建服务调用链并判断是否启动非安全的httpserverhttpserver链中包含 apiserver要启动的三个server,以及为每个server注册对应资源的路由;
  2. 调用server.PrepareRun进行服务运行前的准备,该方法主要完成了健康检查. 存活检查和OpenAPI路由的注册工作;
  3. 调用prepared.Run启动server;
// Run 运行指定的 APIServer,不会退出
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {// 注意:此版本不是k8s的versionklog.Infof("Version: %+v", version.Get())klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))// 创建调用链,通过delegation(代理)创建server(其实是代理聚合后的server)server, err := CreateServerChain(completeOptions)if err != nil {return err}// 进行一些准备工作, 注册一些hander,执行hook等,包括:通过设置 OpenAPI 规范并调用通用apiserver PrepareRun 来准备运行聚合服务。prepared, err := server.PrepareRun()if err != nil {return err}// 开始启动聚合服务return prepared.Run(stopCh)
}

3.3.1 服务调用链分析

初始化阶段, 通过CreateServerChain创建调用链:

创建过程主要有以下步骤:

  1. 根据配置构造apiserver的配置,调用方法CreateKubeAPIServerConfig
  2. 根据配置构造扩展的apiserver的配置,调用方法为createAPIExtensionsConfig
  3. 创建server,包括扩展的apiserver和原生的apiserver,调用方法为createAPIExtensionsServerCreateKubeAPIServer。主要就是将各个handler的路由方法注册到Container中去,完全遵循go-restful的设计模式,即将处理方法注册到Route中去,同一个根路径下的Route注册到WebService中去,WebService注册到Container中,Container负责分发。访问的过程为Container-->WebService-->Route
  4. 聚合server的配置和和创建。主要就是将原生的apiserver和扩展的apiserver的访问进行整合,添加后续的一些处理接口。调用方法为createAggregatorConfigcreateAggregatorServer
  5. 创建完成,返回配置的server信息

Aggregator 和 APIExtensionsServer 对应两种主要扩展 APIServer 资源的方式,即分别是 AA 和 CRD。

// CreateServerChain创建通过委托连接的apiserver。
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {// 1. 为 kubeAPIServer 创建配置kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)if err != nil {return nil, err}// 2. 判断是否配置了 APIExtensionsServer,创建 apiExtensionsConfig apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))if err != nil {return nil, err}// 返回一个 HTTP 处理程序,该处理程序旨在在委托链的末尾执行。它检查是否在服务器安装所有已知的 HTTP 路径之前发出了请求。在这种情况下,它返回 503 响应,否则返回 404notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)//3. 创建 apiExtensionsServer 实例, 完成apiextensionsConfig的完全配置 以及创建一个包含通用apiserver(暴露group为"apiextensions.k8s.io"的api,支持crd等操作)的扩展服务apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))if err != nil {return nil, err}// 4. 初始化 KubeAPIServer实例kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)if err != nil {return nil, err}// 5. 创建 AggregatorConfigaggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)if err != nil {return nil, err}// 6. 初始化 AggregatorServer实例aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)if err != nil {// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routinesreturn nil, err}return aggregatorServer, nil
}

3.3.1.1 apiserver配置的创建:

CreateKubeAPIServerConfig->buildGenericConfig->genericapiserver.NewConfig

// 创建运行 API 服务器的所有资源,但不运行任何资源
func CreateKubeAPIServerConfig(s completedServerRunOptions) (*controlplane.Config,aggregatorapiserver.ServiceResolver,[]admission.PluginInitializer,error,
) {// 创建拨号器基础结构(隧道和传输层)以连接到节点。proxyTransport := CreateProxyTransport()// BuildGenericConfig 采用ServerRunOptions并生成与之关联的 genericapiserver.Config(kube-apiserver的通用配置)genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)if err != nil {return nil, nil, nil, err}// s.AllowPrivileged 是否允许特权// 限制每个连接的吞吐量(目前只用于proxy、exec、attach方式)capabilities.Setup(s.AllowPrivileged, s.MaxConnectionBytesPerSec)// 应用指标选项 -- 执行些预置操作,比如启用 禁用指标s.Metrics.Apply()// 注册指标收集器serviceaccount.RegisterMetrics()// 构造控制平面controlplane的配置config := &controlplane.Config{GenericConfig: genericConfig,ExtraConfig: controlplane.ExtraConfig{APIResourceConfigSource: storageFactory.APIResourceConfigSource,StorageFactory:          storageFactory,EventTTL:                s.EventTTL,KubeletClientConfig:     s.KubeletConfig,EnableLogsSupport:       s.EnableLogsHandler,ProxyTransport:          proxyTransport,ServiceIPRange:          s.PrimaryServiceClusterIPRange,APIServerServiceIP:      s.APIServerServiceIP,SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,APIServerServicePort: 443,ServiceNodePortRange:      s.ServiceNodePortRange,KubernetesServiceNodePort: s.KubernetesServiceNodePort,EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),MasterCount:            s.MasterCount,ServiceAccountIssuer:        s.ServiceAccountIssuer,ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,ExtendExpiration:            s.Authentication.ServiceAccounts.ExtendExpiration,VersionedInformers: versionedInformers,},}// 获取 用来获取并验证证书内容的提供器clientCAProvider, err := s.Authentication.ClientCert.GetClientCAContentProvider()if err != nil {return nil, nil, nil, err}// 设置controlplane配置的验证证书内容的提供器config.ExtraConfig.ClusterAuthenticationInfo.ClientCA = clientCAProvider// 用来设置controlplane配置的请求头信息(包括认证证书等)requestHeaderConfig, err := s.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()if err != nil {return nil, nil, nil, err}if requestHeaderConfig != nil {config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA = requestHeaderConfig.CAContentProviderconfig.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames = requestHeaderConfig.AllowedClientNamesconfig.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes = requestHeaderConfig.ExtraHeaderPrefixesconfig.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders = requestHeaderConfig.GroupHeadersconfig.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders}// 添加PostStartHook钩子函数if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil {return nil, nil, nil, err}// 分发流量 --  controlplane cluster etcd等if config.GenericConfig.EgressSelector != nil {// 使用config.GenericConfig.EgressSelector loop查找以找到连接到 kubelet 的拨号器config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup// 使用 config.GenericConfig.EgressSelector 查找作为“代理”子资源使用的传输networkContext := egressselector.Cluster.AsNetworkContext()dialer, err := config.GenericConfig.EgressSelector.Lookup(networkContext)if err != nil {return nil, nil, nil, err}c := proxyTransport.Clone()// 这里会替换拨号器c.DialContext = dialerconfig.ExtraConfig.ProxyTransport = c}// 加载公钥var pubKeys []interface{}for _, f := range s.Authentication.ServiceAccounts.KeyFiles {// 遍历serviceaccount的认证公私钥文件keys, err := keyutil.PublicKeysFromFile(f)if err != nil {return nil, nil, nil, fmt.Errorf("failed to parse key file %q: %v", f, err)}// 追加到公钥数组中pubKeys = append(pubKeys, keys...)}// 设置serviceaccount的证书标识及公钥config.ExtraConfig.ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuers[0]config.ExtraConfig.ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURIconfig.ExtraConfig.ServiceAccountPublicKeys = pubKeysreturn config, serviceResolver, pluginInitializers, nil
}// 采用ServerRunOptions并生成与之关联的 genericapiserver.Config(kube-apiserver的通用配置)
func buildGenericConfig(s *options.ServerRunOptions,proxyTransport *http.Transport,
) (genericConfig *genericapiserver.Config,versionedInformers clientgoinformers.SharedInformerFactory,serviceResolver aggregatorapiserver.ServiceResolver,pluginInitializers []admission.PluginInitializer,admissionPostStartHook genericapiserver.PostStartHookFunc,storageFactory *serverstorage.DefaultStorageFactory,lastErr error,
) {// 使用apiserver 生成apiserver genericConfiggenericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)// 获取默认所有的gv(enable Disable)genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()// 把s.GenericServerRunOptions相关配置应用到genericConfigif lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {return}// 把s.SecureServing相关配置应用到genericConfig.SecureServing 并配置LoopbackClientConfig(具体看实现是否覆盖参数genericConfig.LoopbackClientConfig)if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {return}// 配置是否开启debug pprof和 争用debug pprof功能if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {return}// 将给定的 defaultAPIResourceConfig 与给定的 resourceConfigOverrides 合并。(合并原则是以defaultAPIResourceConfig为基础,以resourceConfigOverrides中的设置为目标)if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {return}// 将出口选择器设置EgressSelectorOptions中的设置项添加到服务器配置中if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {return}// 如果APIServerTracing对应在FeatureGate中设置为true(开启)if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {// 使用设置的追踪选项TracingOptions配置apiserver的跟踪配置if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {return}}// 包装定义以恢复禁用功能的任何更改 用来生成k8s的open api(类似于swagger)getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)// 使用DefaultOpenAPIConfig设置为OpenAPIConfig的默认值genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))// 设置OpenAPIConfig的标题为KubernetesgenericConfig.OpenAPIConfig.Info.Title = "Kubernetes"// 如果默认开启了OpenAPIV3功能if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"}// 判断是否是长运行请求(就是保持长期会话,比如watch动作,就需要持续监听)的方法genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(sets.NewString("watch", "proxy"),sets.NewString("attach", "exec", "proxy", "log", "portforward"),)// 版本信息(这里主要是git和go的版本信息)kubeVersion := version.Get()genericConfig.Version = &kubeVersionif genericConfig.EgressSelector != nil {// 配置EgressLookup -- 包含controlplane etcd cluster模式s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup}// 配置链路追踪if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider} else {s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()}if lastErr = s.Etcd.Complete(genericConfig.StorageObjectCountTracker, genericConfig.DrainedNotify(), genericConfig.AddPostStartHook); lastErr != nil {return}// 向c中添加etcd的健康检查并修改c中用于获取gr对应的RESTOptions的RESTOptionsGetter,覆盖s中StorageConfig.StorageObjectCountTracker属性storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfigstorageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()if lastErr != nil {return}if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {return}// Use protobufs for self-communication.// Since not every generic apiserver has to support protobufs, we// cannot default to it in generic apiserver and need to explicitly// set it in kube-apiserver.genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"// Disable compression for self-communication, since we are going to be// on a fast local networkgenericConfig.LoopbackClientConfig.DisableCompression = truekubeClientConfig := genericConfig.LoopbackClientConfigclientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)if err != nil {lastErr = fmt.Errorf("failed to create real external clientset: %v", err)return}versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if presentif lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {return}// 构建授权器和授权规则解析器genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)if err != nil {lastErr = fmt.Errorf("invalid authorization config: %v", err)return}// 如果授权mode中没有RABC模式if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {// 将"rbac/bootstrap-roles"插入到在不可用的PostStartHook集合中genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)}// 应用Audit日志审计功能到genericConfiglastErr = s.Audit.ApplyTo(genericConfig)if lastErr != nil {return}// 初始化准入插件所需的配置admissionConfig := &kubeapiserveradmission.Config{ExternalInformers:    versionedInformers,LoopbackClientConfig: genericConfig.LoopbackClientConfig,CloudConfigFile:      s.CloudProvider.CloudConfigFile,}// 根据enabledAggregatorRouting构建service解析器serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)// 获取准入所需的插件和开始挂钩pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider)if err != nil {lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)return}// 将准入链选项添加到服务器配置中err = s.Admission.ApplyTo(genericConfig,versionedInformers,kubeClientConfig,utilfeature.DefaultFeatureGate,pluginInitializers...)if err != nil {lastErr = fmt.Errorf("failed to initialize admission: %v", err)return}// 如果APIPriorityAndFairness功能开启且 s.GenericServerRunOptions.EnablePriorityAndFairness = true(默认为true)if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)}// 构建 API 优先级和公平性过滤器的核心if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager()}return
}

3.3.1.2 构建apiextensionsapiserver.Config扩展配置 – 其实就是包装了通用apiserver配置和其他额外的配置

// 构建apiextensionsapiserver.Config扩展配置-- 其实就是包装了通用apiserver配置和其他额外的配置
func createAPIExtensionsConfig(kubeAPIServerConfig genericapiserver.Config,externalInformers kubeexternalinformers.SharedInformerFactory,pluginInitializers []admission.PluginInitializer,commandOptions *options.ServerRunOptions,masterCount int,serviceResolver webhook.ServiceResolver,authResolverWrapper webhook.AuthenticationInfoResolverWrapper,
) (*apiextensionsapiserver.Config, error) {// 做一个浅拷贝让我们更改一些配置,大多数配置实际上保持不变。我们只需要处理一些与 apiextensions 的细节相关的配置genericConfig := kubeAPIServerConfiggenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}genericConfig.RESTOptionsGetter = nil// 使用备份(不能直接修改原始数据)的Etcd选项来做一些修改etcdOptions := *commandOptions.Etcd// 获取是否支持分页etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)// 构建编解码器  --  解码为v1beta或者v1,编码为internaletcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion)// prefer the more compact serialization (v1beta1) for storage until https://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be storedetcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checksif err := etcdOptions.ApplyTo(&genericConfig); err != nil {return nil, err}// 使用 apiextensions 默认值和注册表覆盖 MergedResourceConfigif err := commandOptions.APIEnablement.ApplyTo(&genericConfig,apiextensionsapiserver.DefaultAPIResourceConfigSource(),apiextensionsapiserver.Scheme); err != nil {return nil, err}// 构建一个生成etcdOptions的工厂crdRESTOptionsGetter, err := apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions)if err != nil {return nil, err}apiextensionsConfig := &apiextensionsapiserver.Config{GenericConfig: &genericapiserver.RecommendedConfig{Config:                genericConfig,SharedInformerFactory: externalInformers,},ExtraConfig: apiextensionsapiserver.ExtraConfig{CRDRESTOptionsGetter: crdRESTOptionsGetter,MasterCount:          masterCount,AuthResolverWrapper:  authResolverWrapper,ServiceResolver:      serviceResolver,},}// 因为在之前的 CreateKubeAPIServerConfig 函数已经执行过了AddPostStartHook,所以我们需要清除 poststarthooks,这样我们就不会将它们多次(失败时)添加到所有服务器.apiextensionsConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}return apiextensionsConfig, nil
}

3.3.1.3 创建APIExtensionsServer

APIExtensionsServer最先初始化,在调用链的末尾, 处理CR、CRD相关资源,其中保护的controller功能如下:

  1. openapiController:将 crd 资源的变化同步至提供的 OpenAPI 文档,可通过访问 /openapi/v2 进行查看;
  2. crdController:负责将 crd 信息注册到 apiVersions 和 apiResources 中,两者的信息可通过 $ kubectl api-versions 和 $ kubectl api-resources 查看;
  3. namingController:检查 crd obj 中是否有命名冲突,可在 crd .status.conditions 中查看;
  4. establishingController:检查 crd 是否处于正常状态,可在 crd .status.conditions 中查看;
  5. nonStructuralSchemaController:检查 crd obj 结构是否正常,可在 crd .status.conditions 中查看;
  6. apiApprovalController:检查 crd 是否遵循 kubernetes API 声明策略,可在 crd .status.conditions 中查看;
  7. finalizingController:类似于 finalizes 的功能,与 CRs 的删除有关;
// 创建一个APIExtensionsServer,暴露group为"apiextensions.k8s.io"的api,支持crd等操作
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {return apiextensionsConfig.Complete().New(delegateAPIServer)
}//k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
//  New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {// 初始化 genericServergenericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)if err != nil {return nil, err}// 当 CRD informer server已经完全同步时,hasCRDInformerSyncedSignal就会关闭。它可确保在服务器尚未安装所有已知 HTTP 路径时对潜在自定义资源终结点的请求收到 503 错误,而不是 404hasCRDInformerSyncedSignal := make(chan struct{})if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("CRDInformerHasNotSynced", hasCRDInformerSyncedSignal); err != nil {return nil, err}s := &CustomResourceDefinitions{GenericAPIServer: genericServer,}// 初始化apigroup, 即需要暴露的api,这里extension apiserver只注册了cr和crd相关的apiResourceConfig := c.GenericConfig.MergedResourceConfigapiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)storage := map[string]rest.Storage{}// customresourcedefinitionsif resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)if err != nil {return nil, err}storage[resource] = customResourceDefinitionStoragestorage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)}if len(storage) > 0 {apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage}// 注册apigroupif err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {return nil, err}// clientset创建crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)if err != nil {// it's really bad that this is leaking here, but until we can fix the test (which I'm pretty sure isn't even testing what it wants to test),// we need to be able to move forwardreturn nil, fmt.Errorf("failed to create clientset: %v", err)}s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)delegateHandler := delegationTarget.UnprotectedHandler()if delegateHandler == nil {delegateHandler = http.NotFoundHandler()}versionDiscoveryHandler := &versionDiscoveryHandler{discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},delegate:  delegateHandler,}groupDiscoveryHandler := &groupDiscoveryHandler{discovery: map[string]*discovery.APIGroupHandler{},delegate:  delegateHandler,}establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())crdHandler, err := NewCustomResourceDefinitionHandler(versionDiscoveryHandler,groupDiscoveryHandler,s.Informers.Apiextensions().V1().CustomResourceDefinitions(),delegateHandler,c.ExtraConfig.CRDRESTOptionsGetter,c.GenericConfig.AdmissionControl,establishingController,c.ExtraConfig.ServiceResolver,c.ExtraConfig.AuthResolverWrapper,c.ExtraConfig.MasterCount,s.GenericAPIServer.Authorizer,c.GenericConfig.RequestTimeout,time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,apiGroupInfo.StaticOpenAPISpec,c.GenericConfig.MaxRequestBodyBytes,)if err != nil {return nil, err}s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)s.GenericAPIServer.RegisterDestroyFunc(crdHandler.destroy)discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, genericServer.AggregatedDiscoveryGroupManager)namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())finalizingController := finalizer.NewCRDFinalizer(s.Informers.Apiextensions().V1().CustomResourceDefinitions(),crdClient.ApiextensionsV1(),crdHandler,)// 将 informer 以及 controller加入到启动hook中s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {s.Informers.Start(context.StopCh)return nil})s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {// OpenAPIVersionedService 和 StaticOpenAPISpec 填充在通用 apiserver PrepareRun() 中。// 它们一起服务于通用 API 服务器上的 /openapi/v2  endpoint。  // 用 apiserver 可以选择不启用OpenAPI,方法是使用空的openAPIConfig,从而OpenAPIVersionedService// 和 StaticOpenAPISpec 都是空的。在这种情况下,我们不会运行CRD OpenAPI控制器。if s.GenericAPIServer.StaticOpenAPISpec != nil {if s.GenericAPIServer.OpenAPIVersionedService != nil {openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())go openapiController.Run(s.G enericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)}if s.GenericAPIServer.OpenAPIV3VersionedService != nil && utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)}}go namingController.Run(context.StopCh)go establishingController.Run(context.StopCh)go nonStructuralSchemaController.Run(5, context.StopCh)go apiApprovalController.Run(5, context.StopCh)go finalizingController.Run(5, context.StopCh)discoverySyncedCh := make(chan struct{})go discoveryController.Run(context.StopCh, discoverySyncedCh)select {case <-context.StopCh:case <-discoverySyncedCh:}return nil})// 在我们可以处理所有已注册的 CRD 之前,我们不想报告健康检查状况。等到informer sync确保lister 在开始前都是有效的。// 启动后,可能还会有一段时间去添加CRDS。除非否则不会进行健康检查// we don't want to report healthy until we can handle all CRDs that have already been registered.  Waiting for the informer// to sync makes sure that the lister will be valid before we begin.  There may still be races for CRDs added after startup,// but we won't go healthy until we can handle the ones already present.s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {close(hasCRDInformerSyncedSignal)return true, nil}return false, nil}, context.StopCh)})return s, nil
}

3.3.1.4 创建kubeAPIServer及其初始化

KubeAPIServer 主要是提供对 API Resource 的操作请求,为 kubernetes 中众多 API 注册路由信息,暴露 RESTful API 并且对外提供 kubernetes service,使集群中以及集群外的服务都可以通过 RESTful API 操作 kubernetes 中的资源。

KubeAPIServer初始化流程如下:

  1. 调用 c.GenericConfig.New 初始化 GenericAPIServer
  2. 判断是否支持 logs 相关的路由,如果支持,则添加 /logs 路由;
  3. 调用 m.InstallLegacyAPI 将核心 API Resource 添加到路由中,对应到 apiserver 就是以 /api 开头的 resource
  4. 调用 m.InstallAPIs 将扩展的 API Resource 添加到路由中,在 apiserver 中即是以 /apis 开头的 resource
// CreateKubeAPIServer创建kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)if err != nil {return nil, err}return kubeAPIServer, nil
}// New 从给定的配置返回一个新的 Master 实例。如果未设置,某些配置字段将设置为默认值。
// 必须指定某些配置字段,包括:KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")}s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)if err != nil {return nil, err}// 注册 logs 相关的路由if c.ExtraConfig.EnableLogsSupport {routes.Logs{}.Install(s.Handler.GoRestfulContainer)}// 元数据和key目前预计只会在重新启动后更改,// 因此,我们只需立即封送并提供缓存的 JSON 字节。s.md, err := serviceaccount.NewOpenIDMetadata(c.ExtraConfig.ServiceAccountIssuerURL,c.ExtraConfig.ServiceAccountJWKSURI,c.GenericConfig.ExternalAddress,c.ExtraConfig.ServiceAccountPublicKeys,)if err != nil {// 如果出现错误,请跳过安装endpoints 并记录错误,请继续。我们不会返回错误,因为// 元数据响应需要额外的向后进行不兼容命令行选项的验证。msg := fmt.Sprintf("Could not construct pre-rendered responses for"+" ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+" enabled. Error: %v", err)if c.ExtraConfig.ServiceAccountIssuerURL != "" {// The user likely expects this feature to be enabled if issuer URL is// set and the feature gate is enabled. In the future, if there is no// longer a feature gate and issuer URL is not set, the user may not// expect this feature to be enabled. We log the former case as an Error// and the latter case as an Info.klog.Error(msg)} else {klog.Info(msg)}} else {routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).Install(s.Handler.GoRestfulContainer)}m := &Instance{GenericAPIServer:          s,ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,}// 安装 LegacyAPIif err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter); err != nil {return nil, err}clientset, err := kubernetes.NewForConfig(c.GenericConfig.LoopbackClientConfig)if err != nil {return nil, err}// TODO: update to a version that caches success but will recheck on failure, unlike memcache discoverydiscoveryClientForAdmissionRegistration := clientset.Discovery()// The order here is preserved in discovery.// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go// with specific priorities.// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery// handlers that we have.restStorageProviders := []RESTStorageProvider{apiserverinternalrest.StorageProvider{},authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},autoscalingrest.RESTStorageProvider{},batchrest.RESTStorageProvider{},certificatesrest.RESTStorageProvider{},coordinationrest.RESTStorageProvider{},discoveryrest.StorageProvider{},networkingrest.RESTStorageProvider{},noderest.RESTStorageProvider{},policyrest.RESTStorageProvider{},rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},schedulingrest.RESTStorageProvider{},storagerest.RESTStorageProvider{},flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.// See https://github.com/kubernetes/kubernetes/issues/42392appsrest.StorageProvider{},admissionregistrationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, DiscoveryClient: discoveryClientForAdmissionRegistration},eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},resourcerest.RESTStorageProvider{},}// 只能APIif err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {return nil, err}m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err != nil {return err}controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)// generate a context  from stopCh. This is to avoid modifying files which are relying on apiserver// TODO: See if we can pass ctx to the current methodctx, cancel := context.WithCancel(context.Background())go func() {select {case <-hookContext.StopCh:cancel() // stopCh closed, so cancel our contextcase <-ctx.Done():}}()// prime values and start listenersif m.ClusterAuthenticationInfo.ClientCA != nil {m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {// runonce to be sure that we have a value.if err := controller.RunOnce(ctx); err != nil {runtime.HandleError(err)}go controller.Run(ctx, 1)}}if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {// runonce to be sure that we have a value.if err := controller.RunOnce(ctx); err != nil {runtime.HandleError(err)}go controller.Run(ctx, 1)}}go controller.Run(ctx, 1)return nil})if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err != nil {return err}leaseName := m.GenericAPIServer.APIServerIDholderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())controller := lease.NewController(clock.RealClock{},kubeClient,holderIdentity,int32(IdentityLeaseDurationSeconds),nil,IdentityLeaseRenewIntervalPeriod,leaseName,metav1.NamespaceSystem,labelAPIServerHeartbeat)go controller.Run(hookContext.StopCh)return nil})m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err != nil {return err}go apiserverleasegc.NewAPIServerLeaseGC(kubeClient,IdentityLeaseGCPeriod,metav1.NamespaceSystem,KubeAPIServerIdentityLeaseLabelSelector,).Run(hookContext.StopCh)return nil})}m.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err != nil {return err}go legacytokentracking.NewController(kubeClient).Run(hookContext.StopCh)return nil})return m, nil
}
3.3.1.4.1 m.InstallLegacyAPI

功能: 将 core API 注册到路由中,是 apiserver 初始化流程中最核心的方法之一,其将 API 注册到路由其最终的目的就是对外提供 RESTful API 来操作对应 resource,注册 API 主要分为两步,第一步是为 API 中的每个 resource 初始化 RESTStorage 以此操作后端存储中数据的变更,第二步是为每个 resource 根据其 verbs 构建对应的路由。m.InstallLegacyAPI 的主要逻辑为:

  • 1、调用 legacyRESTStorageProvider.NewLegacyRESTStorage 为 LegacyAPI 中各个资源创建 RESTStorage,RESTStorage 的目的是将每种资源的访问路径及其后端存储的操作对应起来;
  • 2、初始化 bootstrap-controller,并将其加入到 PostStartHook 中,bootstrap-controller 是 apiserver 中的一个 controller,主要功能是创建系统所需要的一些 namespace 以及创建 kubernetes service 并定期触发对应的 sync 操作,apiserver 在启动后会通过调用 PostStartHook 来启动 bootstrap-controller
  • 3、在为资源创建完 RESTStorage 后,调用 m.GenericAPIServer.InstallLegacyAPIGroup 为 APIGroup 注册路由信息,InstallLegacyAPIGroup方法的调用链非常深,主要为InstallLegacyAPIGroup--> installAPIResources --> InstallREST --> Install --> registerResourceHandlers,最终核心的路由构造在registerResourceHandlers方法内,该方法比较复杂,其主要功能是通过上一步骤构造的 REST Storage 判断该资源可以执行哪些操作(如 create、update等),将其对应的操作存入到 action 中,每一个 action 对应一个标准的 REST 操作,如 create 对应的 action 操作为 POST、update 对应的 action 操作为PUT。最终根据 actions 数组依次遍历,对每一个操作添加一个 handler 方法,注册到 route 中去,再将 route 注册到 webservice 中去,webservice 最终会注册到 container 中,遵循 go-restful 的设计模式;
    func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error {legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{StorageFactory:              c.ExtraConfig.StorageFactory,ProxyTransport:              c.ExtraConfig.ProxyTransport,KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,EventTTL:                    c.ExtraConfig.EventTTL,ServiceIPRange:              c.ExtraConfig.ServiceIPRange,SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,ExtendExpiration:            c.ExtraConfig.ExtendExpiration,ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,APIAudiences:                c.GenericConfig.Authentication.APIAudiences,}legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)if err != nil {return fmt.Errorf("error building core storage: %v", err)}if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 { // if all core storage is disabled, return.return nil}controllerName := "bootstrap-controller"client := kubernetes.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, client)if err != nil {return fmt.Errorf("error creating bootstrap controller: %v", err)}m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {return fmt.Errorf("error in registering group versions: %v", err)}return nil
    }
    
3.3.1.4.2 m.InstallAPIs

installAPIS和InstallLegacyAPI的调用流程类似

3.3.1.5 创建AggregatorConfig

func createAggregatorConfig(kubeAPIServerConfig genericapiserver.Config,commandOptions *options.ServerRunOptions,externalInformers kubeexternalinformers.SharedInformerFactory,serviceResolver aggregatorapiserver.ServiceResolver,proxyTransport *http.Transport,pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {// 浅拷贝kubeAPIServerConfig,修改其中部分配置genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}genericConfig.RESTOptionsGetter = nil// 阻止通用 API 服务器安装 OpenAPI 处理程序。Aggregator server有自己的自定义 OpenAPI 处理程序。genericConfig.SkipOpenAPIInstallation = trueif utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {// Add StorageVersionPrecondition handler to aggregator-apiserver.// The handler will block write requests to built-in resources until the// target resources' storage versions are up-to-date.genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition}// 拷贝etcd参数etcdOptions := *commandOptions.EtcdetcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIListChunking)etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion)etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checksif err := etcdOptions.ApplyTo(&genericConfig); err != nil {return nil, err}// 使用aggregator默认值和注册表覆盖MergedResourceConfig配置if err := commandOptions.APIEnablement.ApplyTo(&genericConfig,aggregatorapiserver.DefaultAPIResourceConfigSource(),aggregatorscheme.Scheme); err != nil {return nil, err}// etcd参数配置aggregatorConfig := &aggregatorapiserver.Config{GenericConfig: &genericapiserver.RecommendedConfig{Config:                genericConfig,SharedInformerFactory: externalInformers,},ExtraConfig: aggregatorapiserver.ExtraConfig{ProxyClientCertFile:       commandOptions.ProxyClientCertFile,ProxyClientKeyFile:        commandOptions.ProxyClientKeyFile,ServiceResolver:           serviceResolver,ProxyTransport:            proxyTransport,RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,},}// 需要清除 poststarthook,这样我们就不会将它们多次添加到所有server(失败后)aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}return aggregatorConfig, nil
}

3.3.1.6 创建 AggregatorServer

Aggregator通过APIServices对象关联到某个Service来进行请求的转发,其关联的Service类型进一步决定了请求转发形式。Aggregator包括一个GenericAPIServer和维护自身状态的Controller。其中 GenericAPIServer主要处理apiregistration.k8s.io组下的APIService资源请求。

主要实现逻辑:

  1. 调用 aggregatorConfig.Complete().NewWithDelegate 创建 aggregatorServer;
  2. 初始化 crdRegistrationControllerautoRegistrationControllercrdRegistrationController 负责注册 CRD,autoRegistrationController 负责将 CRD 对应的 APIServices 自动注册到 apiserver 中,CRD 创建后可通过 $ kubectl get apiservices 查看是否注册到 apiservices 中;
  3. autoRegistrationControllercrdRegistrationController 加入到 PostStartHook 中

Aggregator除了处理资源请求外还包含几个controller:

  1. apiserviceRegistrationController:负责APIServices中资源的注册与删除;
  2. availableConditionController:维护APIServices的可用状态,包括其引用Service是否可用等;
  3. autoRegistrationController:用于保持API中存在的一组特定的APIServices
  4. crdRegistrationController:负责将CRD GroupVersions自动注册到APIServices中;
  5. openAPIAggregationController:将APIServices资源的变化同步至提供的OpenAPI文档; kubernetes中的一些附加组件,比如metrics-server就是通过 Aggregator的方式进行扩展的,实际环境中可以通过使用apiserver-builder工具轻松以Aggregator的扩展方式创建自定义资源。
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {// 初始化aggregatorServeraggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)if err != nil {return nil, err}// 创建并初始化 auto-registrationapiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)if err != nil {return nil, err}autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)crdRegistrationController := crdregistration.NewCRDRegistrationController(apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),autoRegistrationController)// Imbue all builtin group-priorities onto the aggregated discoveryif aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {for gv, entry := range apiVersionPriorities {aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version))}}err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {go crdRegistrationController.Run(5, context.StopCh)go func() {// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.// we only need to do this if CRDs are enabled on this server.  We can't use discovery because we are the source for discovery.if aggregatorConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")) {crdRegistrationController.WaitForInitialSync()}autoRegistrationController.Run(5, context.StopCh)}()return nil})if err != nil {return nil, err}// 添加健康检查err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(makeAPIServiceAvailableHealthCheck("autoregister-completion",apiServices,aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),),)if err != nil {return nil, err}return aggregatorServer, nil
}
3.3.1.6.1 aggregatorConfig.Complete().NewWithDelegate

aggregatorConfig.Complete().NewWithDelegate 是初始化 aggregatorServer 的方法,主要逻辑为:

  1. 调用 c.GenericConfig.New 初始化 GenericAPIServer;

  2. 调用 apiservicerest.NewRESTStorage 为 APIServices 资源创建 RESTStorage,RESTStorage 的目的是将每种资源的访问路径及其后端存储的操作对应起来;

  3. 调用 s.GenericAPIServer.InstallAPIGroup 为 APIGroup 注册路由信息;

// NewWithDelegate returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {// 初始化genericServergenericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)if err != nil {return nil, err}apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)if err != nil {return nil, err}informerFactory := informers.NewSharedInformerFactory(apiregistrationClient,5*time.Minute, // this is effectively used as a refresh interval right now.  Might want to do something nicer later on.)// apiServiceRegistrationControllerInitiated 在 APIServiceRegistrationController 完成“installing”所有已知 API 服务时关闭。// 此时我们知道代理proxy APIServices 可以处理客户端请求。在它可能导致 404 响应出现之前,这可能会对某些控制器(如 GC 和 NS)产生严重后果// APIServiceRegistrationController 在执行其工作之前会等待 APIServiceInformer 同步。apiServiceRegistrationControllerInitiated := make(chan struct{})if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil {return nil, err}s := &APIAggregator{GenericAPIServer:           genericServer,delegateHandler:            delegationTarget.UnprotectedHandler(),proxyTransport:             c.ExtraConfig.ProxyTransport,proxyHandlers:              map[string]*proxyHandler{},handledGroups:              sets.String{},lister:                     informerFactory.Apiregistration().V1().APIServices().Lister(),APIRegistrationInformers:   informerFactory,serviceResolver:            c.ExtraConfig.ServiceResolver,openAPIConfig:              c.GenericConfig.OpenAPIConfig,openAPIV3Config:            c.GenericConfig.OpenAPIV3Config,egressSelector:             c.GenericConfig.EgressSelector,proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },rejectForwardingRedirects:  c.ExtraConfig.RejectForwardingRedirects,}// 稍后过滤已过期的资源resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*c.GenericConfig.Version)if err != nil {return nil, err}// 为API注册路由apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {return nil, err}enabledVersions := sets.NewString()for v := range apiGroupInfo.VersionedResourcesStorageMap {enabledVersions.Insert(v)}if !enabledVersions.Has(v1.SchemeGroupVersion.Version) {return nil, fmt.Errorf("API group/version %s must be enabled", v1.SchemeGroupVersion.String())}// 初始化 apiserviceRegistrationController、availableControllerapisHandler := &apisHandler{codecs:         aggregatorscheme.Codecs,lister:         s.lister,discoveryGroup: discoveryGroup(enabledVersions),}if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {apisHandlerWithAggregationSupport := aggregated.WrapAggregatedDiscoveryToHandler(apisHandler, s.GenericAPIServer.AggregatedDiscoveryGroupManager)s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandlerWithAggregationSupport)} else {s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)}s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)if len(c.ExtraConfig.ProxyClientCertFile) > 0 && len(c.ExtraConfig.ProxyClientKeyFile) > 0 {aggregatorProxyCerts, err := dynamiccertificates.NewDynamicServingContentFromFiles("aggregator-proxy-cert", c.ExtraConfig.ProxyClientCertFile, c.ExtraConfig.ProxyClientKeyFile)if err != nil {return nil, err}// We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the// context is not used at all. So passing a empty context shouldn't be a problemctx := context.TODO()if err := aggregatorProxyCerts.RunOnce(ctx); err != nil {return nil, err}aggregatorProxyCerts.AddListener(apiserviceRegistrationController)s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContents.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {// generate a context  from stopCh. This is to avoid modifying files which are relying on apiserver// TODO: See if we can pass ctx to the current methodctx, cancel := context.WithCancel(context.Background())go func() {select {case <-postStartHookContext.StopCh:cancel() // stopCh closed, so cancel our contextcase <-ctx.Done():}}()go aggregatorProxyCerts.Run(ctx, 1)return nil})}availableController, err := statuscontrollers.NewAvailableConditionController(informerFactory.Apiregistration().V1().APIServices(),c.GenericConfig.SharedInformerFactory.Core().V1().Services(),c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),apiregistrationClient.ApiregistrationV1(),c.ExtraConfig.ProxyTransport,(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),s.serviceResolver,c.GenericConfig.EgressSelector,)if err != nil {return nil, err}// 添加 PostStartHooks.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {informerFactory.Start(context.StopCh)c.GenericConfig.SharedInformerFactory.Start(context.StopCh)return nil})s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)select {case <-context.StopCh:case <-apiServiceRegistrationControllerInitiated:}return nil})s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {// if we end up blocking for long periods of time, we may need to increase workers.go availableController.Run(5, context.StopCh)return nil})if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {// Spawn a goroutine in aggregator apiserver to update storage version for// all built-in resourcess.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {// Wait for apiserver-identity to exist first before updating storage// versions, to avoid storage version GC accidentally garbage-collecting// storage versions.kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)if err != nil {return err}if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {_, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})if apierrors.IsNotFound(err) {return false, nil}if err != nil {return false, err}return true, nil}, hookContext.StopCh); err != nil {return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",s.GenericAPIServer.APIServerID, err)}// Technically an apiserver only needs to update storage version once during bootstrap.// Reconcile StorageVersion objects every 10 minutes will help in the case that the// StorageVersion objects get accidentally modified/deleted by a different agent. In that// case, the reconciliation ensures future storage migration still works. If nothing gets// changed, the reconciliation update is a noop and gets short-circuited by the apiserver,// therefore won't change the resource version and trigger storage migration.go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)// share the same generic apiserver config. The same StorageVersion manager is used// to register all built-in resources when the generic apiservers install APIs.s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)return false, nil}, hookContext.StopCh)// Once the storage version updater finishes the first round of update,// the PostStartHook will return to unblock /healthz. The handler chain// won't block write requests anymore. Check every second since it's not// expensive.wait.PollImmediateUntil(1*time.Second, func() (bool, error) {return s.GenericAPIServer.StorageVersionManager.Completed(), nil}, hookContext.StopCh)return nil})}return s, nil
}

基本调用链如下:

                    |--> CreateNodeDialer||--> CreateKubeAPIServerConfig|
CreateServerChain --|--> createAPIExtensionsConfig||                                                                       |--> c.GenericConfig.New|--> createAPIExtensionsServer --> apiextensionsConfig.Complete().New --||                                                                       |--> s.GenericAPIServer.InstallAPIGroup||                                                                 |--> c.GenericConfig.New --> legacyRESTStorageProvider.NewLegacyRESTStorage|                                                                 ||--> CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --|--> m.InstallLegacyAPI|                                                                 ||                                                                 |--> m.InstallAPIs|||--> createAggregatorConfig||                                                                             |--> c.GenericConfig.New|                                                                             ||--> createAggregatorServer --> aggregatorConfig.Complete().NewWithDelegate --|--> apiservicerest.NewRESTStorage||--> s.GenericAPIServer.InstallAPIGroup

3.3.2 prepared.Run

Run 方法中首先调用 CreateServerChain 完成各 server 的初始化,然后调用 server.PrepareRun 完成服务启动前的准备工作,最后调用 prepared.Run 方法来启动安全的 http server。server.PrepareRun 主要完成了健康检查、存活检查和OpenAPI路由的注册工作

// PrepareRun 通过设置 OpenAPI spec和aggregated discovery document  来准备aggregator 运行。
聚合发现文档并调用通用 PrepareRun。 
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {// add post start hook before generic PrepareRun in order to be before /healthz installationif s.openAPIConfig != nil {s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {go s.openAPIAggregationController.Run(context.StopCh)return nil})}if s.openAPIV3Config != nil && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapiv3-controller", func(context genericapiserver.PostStartHookContext) error {go s.openAPIV3AggregationController.Run(context.StopCh)return nil})}if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {s.discoveryAggregationController = NewDiscoveryManager(s.GenericAPIServer.AggregatedDiscoveryGroupManager,)// 启动discovery  端点s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error {// Run discovery manager's worker to watch for new/removed/updated// APIServices to the discovery document can be updated at runtimego s.discoveryAggregationController.Run(context.StopCh)return nil})}prepared := s.GenericAPIServer.PrepareRun()// 延迟OpenAPI直到delegate启动他们的handlers启动后再启动if s.openAPIConfig != nil {specDownloader := openapiaggregator.NewDownloader()openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator(&specDownloader,s.GenericAPIServer.NextDelegate(),s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),s.openAPIConfig,s.GenericAPIServer.Handler.NonGoRestfulMux)if err != nil {return preparedAPIAggregator{}, err}s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)}if s.openAPIV3Config != nil && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {specDownloaderV3 := openapiv3aggregator.NewDownloader()openAPIV3Aggregator, err := openapiv3aggregator.BuildAndRegisterAggregator(specDownloaderV3,s.GenericAPIServer.NextDelegate(),s.GenericAPIServer.Handler.NonGoRestfulMux)if err != nil {return preparedAPIAggregator{}, err}s.openAPIV3AggregationController = openapiv3controller.NewAggregationController(openAPIV3Aggregator)}return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {return s.runnable.Run(stopCh)
}

相关文章:

kube-apiserver启动流程源码分析

1. 概述 KubeAPIServer 主要是提供对 API Resource 的操作请求&#xff0c;为 kubernetes 中众多 API 注册路由信息&#xff0c;暴露 RESTful API 并且对外提供 kubernetes service&#xff0c;使集群中以及集群外的服务都可以通过 RESTful API 操作 kubernetes 中的资源。 2…...

Scala基础(二)

单例对象&#xff08;object&#xff09; Scala的类中无法定义静态成员&#xff0c;即无static关键字。如何像Java一样表达类的静态成员变量、成员方法与静态代码块&#xff1f; Scala解决方案&#xff1a;单例对象 使用“object”关键字声明&#xff0c;可包含变量、方法与…...

Python 生产者消费者模型是什么?

本文首发自「慕课网」&#xff0c;想了解更多IT干货内容&#xff0c;程序员圈内热闻&#xff0c;欢迎关注&#xff01; 作者| 慕课网精英讲师 朱广蔚 1. 简介 生产者和消费者问题是线程模型中的经典问题&#xff1a; 生产者和消费者共享同一个存储空间生产者往存储空间中添…...

手机银行评测系列:北京银行“京彩生活”7.0从用户视角出发,实现沉浸式体验重塑

易观&#xff1a;2023年3月28日&#xff0c;北京银行发布“京彩生活”APP 7.0版本&#xff0c;从旅程再造、特色金融、场景生态、平台联动、协同经营、体验管理和安全守护七大方面全面升级&#xff0c;从用户视角出发&#xff0c;重塑用户旅程&#xff0c;简化操作流程&#xf…...

ZJYC2023 浙江省大学生程序设计竞赛校内选拔赛部分题解 C J B L

ZJYC2023 浙江省大学生程序设计竞赛校内选拔赛部分题解 C J B L 难度分布&#xff1a; 签到&#xff1a;CJ Easy&#xff1a;BL Midium&#xff1a;IAGKFE Hard&#xff1a;DH 题解&#xff1a; 签到&#xff1a;CJ C - ^{-1} 参考代码&#xff1a; #include<bits/std…...

百科创建:7种有效的百科词条创建技巧

百科词条是互联网上最常见的知识信息资源之一&#xff0c;它们是人们查找信息的主要途径之一。创建一个高质量的百科词条并不是一件容易的事情&#xff0c;需要一些技巧和经验才能做到。下面是一些创建百科词条的技巧&#xff1a; 一、确保词条的独特性 在创建百科词条之前&…...

ThreeJS-dat.gui界面控制颜色、隐藏、位置(六)

下载组件dat.gui npm install dat.gui -S 引入组件 import * as dat from dat.gui //界面控制 代码&#xff1a; <template> <div id"three_div"> </div> </template> <script> import * as THREE from "three"; import {O…...

接口自动化测试,完整入门篇

目录 1. 什么是接口测试2. 基本流程3. 需求分析4. 用例设计5. 脚本开发6. 结果分析7. 完整脚本8. 参考资料1. 什么是接口测试 顾名思义&#xff0c;接口测试是对系统或组件之间的接口进行测试&#xff0c;主要是校验数据的交换&#xff0c;传递和控制管理过程&#xff0c;以及…...

利用ControlNet重新定义你的AI姿势

利用ControlNet重新定义你的AI姿势 前段时间给大家分享了如何利用colab实现AI绘画自由&#xff0c;现在Stable Diffusion WebUI Colab TW又更新了不少新功能。最重要的是可以通过谷歌硬盘的快捷方式导入模型&#xff0c;极大的节省了谷歌硬盘容量。 众所周知&#xff0c;谷歌…...

中医药NER命名实体识别基于SPANNER方式

一个不知名大学生&#xff0c;江湖人称菜狗 original author: Jacky Li Email : 3435673055qq.com Time of completion&#xff1a;2023.3.5 Last edited: 2023.3.5 导读 本文使用SPANNER方式实现对中医药进行实体识别&#xff0c;采用focal loss 进行优化。 本文章作用防止安静…...

Vue必掌握

目录 一、组件通信方式 二、v-if和v-for 三、生命周期 1、描述 2、setup和created谁先执行 3、setup中为什么没有beforeCreate和created 四、双向绑定 v-model 1、定义 2、本质&#xff0c;原理 3、好处 五、如何扩展一个组件 1、mixins 缺点 2、slot插槽 3、e…...

SSM部分

声明式事务 从之前的事务控制的代码中可以看出&#xff0c;是有规律可循&#xff0c;代码的结构基本是确定的&#xff0c;所以框架就可以将固定模式的代码抽取出来&#xff0c;进行相关的封装。 封装起来后&#xff0c;我们只需要在配置文件中进行简单的配置即可完成操作。 …...

【Springboot系列】Springboot接管所有Controller,magic-api源码阅读

系列文章地址:Spring Boot学习大纲,可以留言自己想了解的技术点 最近在项目中使用了一个第三方的包 magic-api,节省了很多的时间,整体来说就是只用写sql就好了,不用写service,controller那些,全部统一处理了。 具体的使用大家可以搜索下,网上到处都是,建议去官网看。…...

二、LED子系统数据结构详解

个人主页&#xff1a;董哥聊技术我是董哥&#xff0c;嵌入式领域新星创作者创作理念&#xff1a;专注分享高质量嵌入式文章&#xff0c;让大家读有所得&#xff01;文章目录1、核心数据结构1.1 gpio_led_platform_data1.2 gpio_leds_priv1.3 gpio_led1.4 gpio_led_data1.5 led_…...

Kubernetes(11):数据存储详解

在前面已经提到,容器的生命周期可能很短,会被频繁地创建和销毁。那么容器在销毁时,保存在容器中的数据也会被清除。这种结果对用户来说,在某些情况下是不乐意看到的。为了持久化保存容器的数据,kubernetes引入了Volume的概念。 Volume是Pod中能够被多个容器访问的共享目录…...

随想录Day43--动态规划: 1049. 最后一块石头的重量 II , 494. 目标和 , 474.一和零

最后一块石头重量转化为将一个集合分隔成两个集合&#xff0c;两个集合之间的差值最小&#xff0c;就是最后剩下最小的石头重量。这里可以求集合的一个平均值&#xff0c;如果正好等于平均值&#xff0c;说明可以抵消&#xff0c;这时候重量为0&#xff0c;如果不行&#xff0c…...

Qt中对TCP粘包的处理

当时用TCP协议传输数据时&#xff0c;经常出现粘包的现象 当服务器向客户端发送数据之后&#xff0c;客户端还没有接收数据的时候&#xff0c;这段时间数据在什么地方&#xff1f; 1、服务器&#xff1f;服务器已经发出数据了 2、网线&#xff1f;数据应该在内存&#xff0c;怎…...

贪心-单调递增的数字

当且仅当每个相邻位数上的数字 x 和 y 满足 x < y 时&#xff0c;我们称这个整数是单调递增的。 给定一个整数 n &#xff0c;返回 小于或等于 n 的最大数字&#xff0c;且数字呈 单调递增 。 示例 1: 输入: n 10 输出: 9示例 2: 输入: n 1234 输出: 1234示例 3: 输入…...

你真的会用搜索引擎吗?

作为一名在校大学生&#xff0c;对于搜索资料这一件事深有体会&#xff0c;特别是在期末考试突击的时候&#xff0c;如何利用搜索引擎&#xff0c;快速找到自己想要的知识&#xff0c;快速理解这个知识点&#xff0c;想必是每位大学生的必备技能了。 我们在学习一个知识点的过…...

KDCJ-20kV冲击耐压测试仪

一、产品简介 KDCJ-20kV冲击耐压测试仪是电力设备高压试验的基本项目之一&#xff0c;电力设备在设计、制造及修缮之后都要求进行冲击试验以验证或检验。因此&#xff0c;冲击电压试验设备有着广泛的应用&#xff0c;在工厂、研究机构及大专院校的高压试验室中都可以看到不同规…...

conda相比python好处

Conda 作为 Python 的环境和包管理工具&#xff0c;相比原生 Python 生态&#xff08;如 pip 虚拟环境&#xff09;有许多独特优势&#xff0c;尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处&#xff1a; 一、一站式环境管理&#xff1a…...

将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?

Otsu 是一种自动阈值化方法&#xff0c;用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理&#xff0c;能够自动确定一个阈值&#xff0c;将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

GC1808高性能24位立体声音频ADC芯片解析

1. 芯片概述 GC1808是一款24位立体声音频模数转换器&#xff08;ADC&#xff09;&#xff0c;支持8kHz~96kHz采样率&#xff0c;集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器&#xff0c;适用于高保真音频采集场景。 2. 核心特性 高精度&#xff1a;24位分辨率&#xff0c…...

莫兰迪高级灰总结计划简约商务通用PPT模版

莫兰迪高级灰总结计划简约商务通用PPT模版&#xff0c;莫兰迪调色板清新简约工作汇报PPT模版&#xff0c;莫兰迪时尚风极简设计PPT模版&#xff0c;大学生毕业论文答辩PPT模版&#xff0c;莫兰迪配色总结计划简约商务通用PPT模版&#xff0c;莫兰迪商务汇报PPT模版&#xff0c;…...

Golang——7、包与接口详解

包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...

从面试角度回答Android中ContentProvider启动原理

Android中ContentProvider原理的面试角度解析&#xff0c;分为​​已启动​​和​​未启动​​两种场景&#xff1a; 一、ContentProvider已启动的情况 1. ​​核心流程​​ ​​触发条件​​&#xff1a;当其他组件&#xff08;如Activity、Service&#xff09;通过ContentR…...

深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏

一、引言 在深度学习中&#xff0c;我们训练出的神经网络往往非常庞大&#xff08;比如像 ResNet、YOLOv8、Vision Transformer&#xff09;&#xff0c;虽然精度很高&#xff0c;但“太重”了&#xff0c;运行起来很慢&#xff0c;占用内存大&#xff0c;不适合部署到手机、摄…...

第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10+pip3.10)

第一篇&#xff1a;Liunx环境下搭建PaddlePaddle 3.0基础环境&#xff08;Liunx Centos8.5安装Python3.10pip3.10&#xff09; 一&#xff1a;前言二&#xff1a;安装编译依赖二&#xff1a;安装Python3.10三&#xff1a;安装PIP3.10四&#xff1a;安装Paddlepaddle基础框架4.1…...