当前位置: 首页 > news >正文

Istio Pilot源码学习(一):Pilot-Discovery启动流程、ConfigController配置规则发现

本文基于Istio 1.18.0版本进行源码学习

1、Pilot-Discovery工作原理

Pilot-Discovery是Istio控制面的核心,负责服务网格中的流量管理以及控制面和数据面之间的配置下发

Pilot-Discovery从注册中心(如Kubernetes)获取服务信息并汇集,从Kubernetes API Server中获取配置规则,将服务信息和配置数据转换为xDS接口的标准数据结构,通过GRPC下发到数据面的Envoy

2、Pilot-Discovery代码结构

在这里插入图片描述

Pilot-Discovery的入口函数为:pilot/cmd/pilot-discovery/main.go中的main方法。main方法中创建了Pilot Server,Pilot Server中主要包含三部分逻辑:

  • ConfigController:管理各种配置数据,包括用户创建的流量管理规则和策略
  • ServiceController:获取Service Registry中的服务发现数据
  • DiscoveryService:主要包含下述逻辑:
    • 启动GRPC Server并接收来自Envoy端的连接请求
    • 接收Envoy端的xDS请求,从ConfigController和ServiceController中获取配置和服务信息,生成响应消息发送给Envoy
    • 监听来自ConfigController的配置变化消息和ServiceController的服务变化消息,并将配置和服务变化内容通过xDS接口推送到Envoy

3、Pilot-Discovery启动流程

创建Pilot Server代码如下:

// pilot/pkg/bootstrap/server.go
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {e := model.NewEnvironment()e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffixe.SetLedger(buildLedger(args.RegistryOptions))ac := aggregate.NewController(aggregate.Options{MeshHolder: e,})e.ServiceDiscovery = acs := &Server{clusterID:               getClusterID(args),environment:             e,fileWatcher:             filewatcher.NewWatcher(),httpMux:                 http.NewServeMux(),monitoringMux:           http.NewServeMux(),readinessProbes:         make(map[string]readinessProbe),readinessFlags:          &readinessFlags{},workloadTrustBundle:     tb.NewTrustBundle(nil),server:                  server.New(),shutdownDuration:        args.ShutdownDuration,internalStop:            make(chan struct{}),istiodCertBundleWatcher: keycertbundle.NewWatcher(),webhookInfo:             &webhookInfo{},}// Apply custom initialization functions.for _, fn := range initFuncs {fn(s)}// Initialize workload Trust Bundle before XDS Servere.TrustBundle = s.workloadTrustBundle// 初始化discoveryServers.XDSServer = xds.NewDiscoveryServer(e, args.PodName, s.clusterID, args.RegistryOptions.KubeOptions.ClusterAliases)prometheus.EnableHandlingTimeHistogram()// make sure we have a readiness probe before serving HTTP to avoid marking ready too soons.initReadinessProbes()// 初始化http和grpc server,向grpc server注册discoveryServers.initServers(args)if err := s.initIstiodAdminServer(args, s.webhookInfo.GetTemplates); err != nil {return nil, fmt.Errorf("error initializing debug server: %v", err)}if err := s.serveHTTP(); err != nil {return nil, fmt.Errorf("error serving http: %v", err)}// Apply the arguments to the configuration.if err := s.initKubeClient(args); err != nil {return nil, fmt.Errorf("error initializing kube client: %v", err)}// used for both initKubeRegistry and initClusterRegistriesargs.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)s.initMeshConfiguration(args, s.fileWatcher)spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())s.initMeshNetworks(args, s.fileWatcher)s.initMeshHandlers()s.environment.Init()if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {return nil, err}// Options based on the current 'defaults' in istio.caOpts := &caOptions{TrustDomain:      s.environment.Mesh().TrustDomain,Namespace:        args.Namespace,DiscoveryFilter:  args.RegistryOptions.KubeOptions.GetFilter(),ExternalCAType:   ra.CaExternalType(externalCaType),CertSignerDomain: features.CertSignerDomain,}if caOpts.ExternalCAType == ra.ExtCAK8s {// Older environment variable preserved for backward compatibilitycaOpts.ExternalCASigner = k8sSigner}// CA signing certificate must be created first if needed.if err := s.maybeCreateCA(caOpts); err != nil {return nil, err}// 初始化configController和serviceControllerif err := s.initControllers(args); err != nil {return nil, err}s.XDSServer.InitGenerators(e, args.Namespace, s.internalDebugMux)// Initialize workloadTrustBundle after CA has been initializedif err := s.initWorkloadTrustBundle(args); err != nil {return nil, err}// Parse and validate Istiod Address.istiodHost, _, err := e.GetDiscoveryAddress()if err != nil {return nil, err}// Create Istiod certs and setup watches.if err := s.initIstiodCerts(args, string(istiodHost)); err != nil {return nil, err}// Secure gRPC Server must be initialized after CA is created as may use a Citadel generated cert.if err := s.initSecureDiscoveryService(args); err != nil {return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)}// common https server for webhooks (e.g. injection, validation)if s.kubeClient != nil {s.initSecureWebhookServer(args)wh, err := s.initSidecarInjector(args)if err != nil {return nil, fmt.Errorf("error initializing sidecar injector: %v", err)}s.webhookInfo.mu.Lock()s.webhookInfo.wh = whs.webhookInfo.mu.Unlock()if err := s.initConfigValidation(args); err != nil {return nil, fmt.Errorf("error initializing config validator: %v", err)}}// This should be called only after controllers are initialized.// 向configController和serviceController注册事件回调函数s.initRegistryEventHandlers()// 设置discoveryServer启动函数s.initDiscoveryService()// Notice that the order of authenticators matters, since at runtime// authenticators are activated sequentially and the first successful attempt// is used as the authentication result.authenticators := []security.Authenticator{&authenticate.ClientCertAuthenticator{},}if args.JwtRule != "" {jwtAuthn, err := initOIDC(args)if err != nil {return nil, fmt.Errorf("error initializing OIDC: %v", err)}if jwtAuthn == nil {return nil, fmt.Errorf("JWT authenticator is nil")}authenticators = append(authenticators, jwtAuthn)}// The k8s JWT authenticator requires the multicluster registry to be initialized,// so we build it later.if s.kubeClient != nil {authenticators = append(authenticators,kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient.Kube(), s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))}if len(features.TrustedGatewayCIDR) > 0 {authenticators = append(authenticators, &authenticate.XfccAuthenticator{})}if features.XDSAuth {s.XDSServer.Authenticators = authenticators}caOpts.Authenticators = authenticators// Start CA or RA server. This should be called after CA and Istiod certs have been created.s.startCA(caOpts)// TODO: don't run this if galley is started, one ctlz is enoughif args.CtrlZOptions != nil {_, _ = ctrlz.Run(args.CtrlZOptions, nil)}// This must be last, otherwise we will not know which informers to registerif s.kubeClient != nil {s.addStartFunc(func(stop <-chan struct{}) error {s.kubeClient.RunAndWait(stop)return nil})}return s, nil
}

NewServer()方法中核心逻辑如下:

  1. 初始化DiscoveryServer
  2. 初始化HTTP和GRPC Server,向GRPC Server注册DiscoveryServer
  3. 初始化ConfigController和ServiceController
  4. 向ConfigController和ServiceController注册事件回调函数,有配置和服务信息变更时会通知DiscoveryServer
  5. 设置DiscoveryServer启动函数

Pilot Server定义如下:

// pilot/pkg/bootstrap/server.go
type Server struct {// discoveryServerXDSServer *xds.DiscoveryServerclusterID cluster.ID// pilot环境所需的api集合environment *model.Environment// 处理kubernetes主集群的注册中心kubeClient kubelib.Client// 处理kubernetes多个集群的注册中心multiclusterController *multicluster.Controller// 统一处理配置规则的controllerconfigController model.ConfigStoreController// 配置规则缓存ConfigStores []model.ConfigStoreController// 负责serviceEntry的服务发现serviceEntryController *serviceentry.ControllerhttpServer  *http.Server // debug, monitoring and readiness Server.httpAddr    stringhttpsServer *http.Server // webhooks HTTPS Server.grpcServer        *grpc.ServergrpcAddress       stringsecureGrpcServer  *grpc.ServersecureGrpcAddress string// monitoringMux listens on monitoringAddr(:15014).// Currently runs prometheus monitoring and debug (if enabled).monitoringMux *http.ServeMux// internalDebugMux is a mux for *internal* calls to the debug interface. That is, authentication is disabled.internalDebugMux *http.ServeMux// httpMux listens on the httpAddr (8080).// If a Gateway is used in front and https is off it is also multiplexing// the rest of the features if their port is empty.// Currently runs readiness and debug (if enabled)httpMux *http.ServeMux// httpsMux listens on the httpsAddr(15017), handling webhooks// If the address os empty, the webhooks will be set on the default httpPort.httpsMux *http.ServeMux // webhooks// fileWatcher used to watch mesh config, networks and certificates.fileWatcher filewatcher.FileWatcher// certWatcher watches the certificates for changes and triggers a notification to Istiod.cacertsWatcher *fsnotify.WatcherdnsNames       []stringCA *ca.IstioCARA ra.RegistrationAuthority// TrustAnchors for workload to workload mTLSworkloadTrustBundle     *tb.TrustBundlecertMu                  sync.RWMutexistiodCert              *tls.CertificateistiodCertBundleWatcher *keycertbundle.Watcher// pilot的所有组件都注册启动任务到此对象,便于在Start()方法中批量启动及管理server server.InstancereadinessProbes map[string]readinessProbereadinessFlags  *readinessFlags// duration used for graceful shutdown.shutdownDuration time.Duration// internalStop is closed when the server is shutdown. This should be avoided as much as possible, in// favor of AddStartFunc. This is only required if we *must* start something outside of this process.// For example, everything depends on mesh config, so we use it there rather than trying to sequence everything// in AddStartFuncinternalStop chan struct{}webhookInfo *webhookInfostatusReporter *distribution.ReporterstatusManager  *status.Manager// RWConfigStore is the configstore which allows updates, particularly for status.RWConfigStore model.ConfigStoreController
}

Pilot-Discovery启动流程如下图:

在这里插入图片描述

3、配置规则发现:ConfigController

Pilot的配置规则指网络路由规则及网络安全规则,包含Virtualservice、Destinationrule、Gateway、PeerAuthentication、RequestAuthentication等资源。目前支持三种类型的ConfigController:

  • MCP:是一种服务网格配置传输协议,用于隔离Pilot和底层平台(Kubernetes、文件系统或者其他注册中心),使得Pilot无需感知底层平台的差异,更专注于Envoy xDS配置的生成与分发
  • Kubernetes:基于Kubernetes的Config发现利用了Kubernetes Informer的List-Watch能力。在Kubernetes集群中,Config以CustomResource的形式存在。Pilot通过配置控制器(CRD Controller)监听Kubernetes APIServer配置规则资源,维护所有资源的缓存,并触发事件处理回调函数
  • File:通过文件监视器周期性地读取本地配置文件,将配置规则缓存在内存中,并维护配置的增加、更新、删除事件,当缓存有变化时,异步通知内存控制器执行事件回调函数

1)、ConfigController的核心接口

ConfigController实现了ConfigStoreController接口:

// pilot/pkg/model/config.go
type ConfigStoreController interface {// 配置缓存接口ConfigStore// 注册事件处理函数// RegisterEventHandler adds a handler to receive config update events for a// configuration typeRegisterEventHandler(kind config.GroupVersionKind, handler EventHandler)// 运行控制器// Run until a signal is received.// Run *should* block, so callers should typically call `go controller.Run(stop)`Run(stop <-chan struct{})// 配置缓存是否已同步// HasSynced returns true after initial cache synchronization is completeHasSynced() bool
}

ConfigStoreController继承ConfigStore接口,ConfigStore为控制器核心的资源缓存接口提供了对Config资源的增删改查功能:

// pilot/pkg/model/config.go
type ConfigStore interface {// Schemas exposes the configuration type schema known by the config store.// The type schema defines the bidirectional mapping between configuration// types and the protobuf encoding schema.Schemas() collection.Schemas// Get retrieves a configuration element by a type and a keyGet(typ config.GroupVersionKind, name, namespace string) *config.Config// List returns objects by type and namespace.// Use "" for the namespace to list across namespaces.List(typ config.GroupVersionKind, namespace string) []config.Config// Create adds a new configuration object to the store. If an object with the// same name and namespace for the type already exists, the operation fails// with no side effects.Create(config config.Config) (revision string, err error)// Update modifies an existing configuration object in the store.  Update// requires that the object has been created.  Resource version prevents// overriding a value that has been changed between prior _Get_ and _Put_// operation to achieve optimistic concurrency. This method returns a new// revision if the operation succeeds.Update(config config.Config) (newRevision string, err error)UpdateStatus(config config.Config) (newRevision string, err error)// Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid// read-modify-write conflicts when there are many concurrent-writers to the same resource.Patch(orig config.Config, patchFn config.PatchFunc) (string, error)// Delete removes an object from the store by key// For k8s, resourceVersion must be fulfilled before a deletion is carried out.// If not possible, a 409 Conflict status will be returned.Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
}

2)、ConfigController的初始化

Kubernetes ConfigController实际上是一个CRD Operator,它从Kubernetes API Server监听所有的Istio API资源,其初始化过程如下:

crdclient.New()方法代码如下:

// pilot/pkg/config/kube/crdclient/client.go
func New(client kube.Client, opts Option) (*Client, error) {schemas := collections.Pilotif features.EnableGatewayAPI {schemas = collections.PilotGatewayAPI()}return NewForSchemas(client, opts, schemas)
}

collections.Pilot中定义了Istio所有的Config资源类型,代码如下:

// pkg/config/schema/collections/collections.gen.go// Pilot contains only collections used by Pilot.Pilot = collection.NewSchemasBuilder().MustAdd(AuthorizationPolicy).MustAdd(DestinationRule).MustAdd(EnvoyFilter).MustAdd(Gateway).MustAdd(PeerAuthentication).MustAdd(ProxyConfig).MustAdd(RequestAuthentication).MustAdd(ServiceEntry).MustAdd(Sidecar).MustAdd(Telemetry).MustAdd(VirtualService).MustAdd(WasmPlugin).MustAdd(WorkloadEntry).MustAdd(WorkloadGroup).Build()

crdclient.New()方法中调用了NewForSchemas()方法:

// pilot/pkg/config/kube/crdclient/client.go
func NewForSchemas(client kube.Client, opts Option, schemas collection.Schemas) (*Client, error) {schemasByCRDName := map[string]resource.Schema{}for _, s := range schemas.All() {// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())schemasByCRDName[name] = s}// 实例化crd clientout := &Client{domainSuffix:     opts.DomainSuffix,schemas:          schemas,schemasByCRDName: schemasByCRDName,revision:         opts.Revision,queue:            queue.NewQueue(1 * time.Second),kinds:            map[config.GroupVersionKind]*cacheHandler{},handlers:         map[config.GroupVersionKind][]model.EventHandler{},client:           client,// 创建crdWatcher,监听crd的创建crdWatcher:       crdwatcher.NewController(client),logger:           scope.WithLabels("controller", opts.Identifier),namespacesFilter: opts.NamespacesFilter,crdWatches: map[config.GroupVersionKind]*waiter{gvk.KubernetesGateway: newWaiter(),gvk.GatewayClass:      newWaiter(),},}// 添加回调函数,当crd创建时调用handleCRDAdd方法out.crdWatcher.AddCallBack(func(name string) {handleCRDAdd(out, name)})// 获取集群中当前所有的crdknown, err := knownCRDs(client.Ext())if err != nil {return nil, err}// 遍历istio所有的config资源类型for _, s := range schemas.All() {// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())if s.IsBuiltin() {handleCRDAdd(out, name)} else {// istio config资源类型对应crd已创建,调用handleCRDAdd方法if _, f := known[name]; f {handleCRDAdd(out, name)} else {out.logger.Warnf("Skipping CRD %v as it is not present", s.GroupVersionKind())}}}return out, nil
}

NewForSchemas()方法中实例化了CRD Client,CRD Client定义如下:

// pilot/pkg/config/kube/crdclient/client.go
type Client struct {// schemas defines the set of schemas used by this client.// Note: this must be a subset of the schemas defined in the codegenschemas collection.Schemas// domainSuffix for the config metadatadomainSuffix string// revision for this control plane instance. We will only read configs that match this revision.revision string// kinds keeps track of all cache handlers for known types// 记录所有资源类型对应的informer控制器kinds   map[config.GroupVersionKind]*cacheHandlerkindsMu sync.RWMutex// 事件处理队列queue queue.Instance// handlers defines a list of event handlers per-type// 资源类型及对应的事件处理回调函数handlers map[config.GroupVersionKind][]model.EventHandler// crd相关的schemaschemasByCRDName map[string]resource.Schema// kubernetes客户端,包含istioClient操作istio api对象,istio informer监听istio api对象变更事件client kube.Client// 监听crd的创建crdWatcher *crdwatcher.Controllerlogger     *log.Scope// namespacesFilter is only used to initiate filtered informer.namespacesFilter func(obj interface{}) bool// crdWatches notifies consumers when a CRD is presentcrdWatches map[config.GroupVersionKind]*waiterstop       <-chan struct{}
}

3)、ConfigController的工作机制

Kubernetes ConfigController为每种Config资源都创建了一个Informer,用于监听所有Config资源并注册EventHandler

NewForSchemas()方法中,如果Istio Config资源类型对应CRD已创建或者crdWatcher监听CRD创建后,都会调用handleCRDAdd()方法:

// pilot/pkg/config/kube/crdclient/client.go
func handleCRDAdd(cl *Client, name string) {cl.logger.Debugf("adding CRD %q", name)s, f := cl.schemasByCRDName[name]if !f {cl.logger.Debugf("added resource that we are not watching: %v", name)return}resourceGVK := s.GroupVersionKind()gvr := s.GroupVersionResource()cl.kindsMu.Lock()defer cl.kindsMu.Unlock()if _, f := cl.kinds[resourceGVK]; f {cl.logger.Debugf("added resource that already exists: %v", resourceGVK)return}var i informers.GenericInformervar ifactory startervar err error// 根据api group添加到不同的sharedInformerFactory中switch s.Group() {case gvk.KubernetesGateway.Group:ifactory = cl.client.GatewayAPIInformer()i, err = cl.client.GatewayAPIInformer().ForResource(gvr)case gvk.Pod.Group, gvk.Deployment.Group, gvk.MutatingWebhookConfiguration.Group:ifactory = cl.client.KubeInformer()i, err = cl.client.KubeInformer().ForResource(gvr)case gvk.CustomResourceDefinition.Group:ifactory = cl.client.ExtInformer()i, err = cl.client.ExtInformer().ForResource(gvr)default:ifactory = cl.client.IstioInformer()i, err = cl.client.IstioInformer().ForResource(gvr)}if err != nil {// Shouldn't happencl.logger.Errorf("failed to create informer for %v: %v", resourceGVK, err)return}_ = i.Informer().SetTransform(kube.StripUnusedFields)// 调用createCacheHandler方法,为informer添加事件回调函数cl.kinds[resourceGVK] = createCacheHandler(cl, s, i)if w, f := cl.crdWatches[resourceGVK]; f {cl.logger.Infof("notifying watchers %v was created", resourceGVK)w.once.Do(func() {close(w.stop)})}// Start informer. In startup case, we will not start here as// we will start all factories once we are ready to initialize.// For dynamically added CRDs, we need to start immediately thoughif cl.stop != nil {// 启动informerifactory.Start(cl.stop)}
}

每种Informer的事件回调函数均通过createCacheHandler()方法注册,代码如下:

// pilot/pkg/config/kube/crdclient/cache_handler.go
func createCacheHandler(cl *Client, schema resource.Schema, i informers.GenericInformer) *cacheHandler {scope.Debugf("registered CRD %v", schema.GroupVersionKind())h := &cacheHandler{client: cl,schema: schema,// 创建informer,支持配置namespace级别隔离informer: kclient.NewUntyped(cl.client, i.Informer(), kclient.Filter{ObjectFilter: cl.namespacesFilter}),}kind := schema.Kind()// 添加事件回调函数h.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj any) {incrementEvent(kind, "add")// 创建任务对象并将其发送到任务队列中cl.queue.Push(func() error {return h.onEvent(nil, obj, model.EventAdd)})},UpdateFunc: func(old, cur any) {incrementEvent(kind, "update")cl.queue.Push(func() error {return h.onEvent(old, cur, model.EventUpdate)})},DeleteFunc: func(obj any) {incrementEvent(kind, "delete")cl.queue.Push(func() error {return h.onEvent(nil, obj, model.EventDelete)})},})return h
}

当Config资源在Kubernetes中创建、更新和删除时,EventHandler会创建任务对象并将其发送到任务列中,然后由任务处理协程处理。处理资源变化的onEvent()方法代码如下:

// pilot/pkg/config/kube/crdclient/cache_handler.go
func (h *cacheHandler) onEvent(old any, curr any, event model.Event) error {currItem := controllers.ExtractObject(curr)if currItem == nil {return nil}// 进行对象转换currConfig := TranslateObject(currItem, h.schema.GroupVersionKind(), h.client.domainSuffix)var oldConfig config.Configif old != nil {oldItem, ok := old.(runtime.Object)if !ok {log.Warnf("Old Object can not be converted to runtime Object %v, is type %T", old, old)return nil}oldConfig = TranslateObject(oldItem, h.schema.GroupVersionKind(), h.client.domainSuffix)}if h.client.objectInRevision(&currConfig) {// 执行事件处理回调函数h.callHandlers(oldConfig, currConfig, event)return nil}// Check if the object was in our revision, but has been moved to a different revision. If so,// it has been effectively deleted from our revision, so process it as a delete event.if event == model.EventUpdate && old != nil && h.client.objectInRevision(&oldConfig) {log.Debugf("Object %s/%s has been moved to a different revision, deleting",currConfig.Namespace, currConfig.Name)// 执行事件处理回调函数h.callHandlers(oldConfig, currConfig, model.EventDelete)return nil}log.Debugf("Skipping event %s for object %s/%s from different revision",event, currConfig.Namespace, currConfig.Name)return nil
}func (h *cacheHandler) callHandlers(old config.Config, curr config.Config, event model.Event) {// TODO we may consider passing a pointer to handlers instead of the value. While spec is a pointer, the meta will be copied// 执行该资源类型对应的事件处理回调函数for _, f := range h.client.handlers[h.schema.GroupVersionKind()] {f(old, curr, event)}
}

onEvent()方法中通过TranslateObject()方法进行对象转换,然后执行该资源类型对应的事件处理回调函数

h.client.handlers是各种资源类型的处理函数集合,是通过ConfigController的RegisterEventHandler()注册的,注册代码如下:

// pilot/pkg/bootstrap/server.go
func (s *Server) initRegistryEventHandlers() {...if s.configController != nil {configHandler := func(prev config.Config, curr config.Config, event model.Event) {defer func() {// 状态报告if event != model.EventDelete {s.statusReporter.AddInProgressResource(curr)} else {s.statusReporter.DeleteInProgressResource(curr)}}()log.Debugf("Handle event %s for configuration %s", event, curr.Key())// For update events, trigger push only if spec has changed.// 对于更新事件,仅当对象的spec发生变化时才触发xds推送if event == model.EventUpdate && !needsPush(prev, curr) {log.Debugf("skipping push for %s as spec has not changed", prev.Key())return}// 触发xds全量更新pushReq := &model.PushRequest{Full:           true,ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.MustFromGVK(curr.GroupVersionKind), Name: curr.Name, Namespace: curr.Namespace}),Reason:         []model.TriggerReason{model.ConfigUpdate},}s.XDSServer.ConfigUpdate(pushReq)}schemas := collections.Pilot.All()if features.EnableGatewayAPI {schemas = collections.PilotGatewayAPI().All()}for _, schema := range schemas {// This resource type was handled in external/servicediscovery.go, no need to rehandle here.// 下面3种类型在serviceEntry controller中处理,这里不用为其注册事件处理函数if schema.GroupVersionKind() == gvk.ServiceEntry {continue}if schema.GroupVersionKind() == gvk.WorkloadEntry {continue}if schema.GroupVersionKind() == gvk.WorkloadGroup {continue}// 注册其他所有api对象的事件处理函数s.configController.RegisterEventHandler(schema.GroupVersionKind(), configHandler)}...
}

完整的Config事件处理流程如下图所示:

  1. EventHandler构造任务(Task),任务实际上是对onEvent函数的封装
  2. EventHandler将任务推送到任务队列中
  3. 任务处理协程阻塞式地读取任务队列,执行任务,通过onEvent方法处理事件,并通过ConfigHandler触发xDS的更新

参考:

《Istio权威指南 下》

3.深入Istio源码:Pilot配置规则ConfigController

Istio Pilot代码深度解析

相关文章:

Istio Pilot源码学习(一):Pilot-Discovery启动流程、ConfigController配置规则发现

本文基于Istio 1.18.0版本进行源码学习 1、Pilot-Discovery工作原理 Pilot-Discovery是Istio控制面的核心&#xff0c;负责服务网格中的流量管理以及控制面和数据面之间的配置下发 Pilot-Discovery从注册中心&#xff08;如Kubernetes&#xff09;获取服务信息并汇集&#xff…...

数据结构:顺序表(C实现)

个人主页 水月梦镜花 个人专栏 C语言 &#xff0c;数据结构 文章目录 一、顺序表二、实现思路1.存储结构2.初始化顺序表(SeqListInit)3.销毁顺序表(SeqListDestroty)4.打印顺序表(SeqListPrint)5.顺序表尾插(SeqListPushBack)and检查容量(SeqListCheckCapacity)6.顺序表头插(Se…...

素描基础知识

素描基础入门 1.基础线条 1.1 握笔姿势及长线条 2.排线 2.1 不同姿势画排线 2.1.1 姿势画排线 2.1.2 用手腕画排线 2.1.3 小拇指画排线 2.1.4 叠加排线 2.1.5交叉排线 2.2 纸张擦法 2.3 排线学习榜样 2.4 四种常见的排线 3、定向连线 4、一点透视 4.1 透视的规律 4.2 焦点透视…...

【Chat GPT】用 ChatGPT 运行 Python

前言 ChatGPT 是一个基于 GPT-2 模型的人工智能聊天机器人&#xff0c;它可以进行智能对话&#xff0c;同时还支持 Python 编程语言的运行&#xff0c;可以通过 API 接口进行调用。本文将介绍如何使用 ChatGPT 运行 Python 代码&#xff0c;并提供一个实际代码案例。 ChatGPT …...

cartographer发布畸变矫正后的scan数据

实现方式&#xff1a; 模仿源代码&#xff0c;在cartographer_ros写一个函数&#xff0c;以函数指针的方式传入cartographer后端&#xff0c;然后接收矫正后的scan数据&#xff0c;然后按照话题laserScan发布出来。 需要同时发布点云强度信息的&#xff0c;还要自己添加含有强度…...

Idea中git push to origin/master was rejected错误解决方案

Idea中git push to origin/master was rejected错误解决方案 问题描述解决方法 问题描述 idea开发中,需要将项目发布到gitee上,在gitee上创建仓库后,通过idea中git推送项目代码提示: push to origin/master was rejected 解决方法 gitee创建仓库时创建了README.md文件,本地…...

docker版jxTMS使用指南:自定义频率型动态管控

本文讲解4.4版jxTMS中如何自行定义一个频率型的动态管控&#xff0c;整个系列的文章请查看&#xff1a;docker版jxTMS使用指南&#xff1a;4.4版升级内容 docker版本的使用&#xff0c;请查看&#xff1a;docker版jxTMS使用指南 4.0版jxTMS的说明&#xff0c;请查看&#xff…...

【Docker】初识Docker以及Docker安装与阿里云镜像配置

目录 一、初识Docker 二、安装Docker 三、Docker架构 四、配置Docker镜像加速器 一、初识Docker Docker是一个开源的应用容器引擎&#xff0c;诞生于2013年&#xff0c;基于Go语言实现&#xff0c;dotCloud公司出品&#xff0c;Docker开源让开发者打包他们的应用以及依赖包到…...

C语言:动态内存管理

文章目录 一、动态内存函数1. malloc2. calloc3. realloc4. free 二、常见的错误1.malloc或calloc开辟的空间未检查2.越界访问3.对非malloc和calloc开辟的空间&#xff0c;用free释放4.对同一块动态内存多次释放5.用free释放动态内存的一部分 三、通讯录(动态版本改写)总结 一、…...

如何往MySQL中插入100万条数据?

需求 现在有一个 数据量 为100万的数据样本 100w_data.sql 其数据格式如下&#xff0c;截取最后十条数据 999991,XxGdnLZObA999991,XxGdnLZObA,XxGdnLZObA,2020-3-18,1 999992,TBBchSKobC999992,TBBchSKobC,TBBchSKobC,2020-9-8,2 999993,rfwgLkYhUz999993,rfwgLkYhUz,rfwgLk…...

IntelliJ IDEA 2023.2 最新变化

主要更新 AI Assistant 限定访问 Ultimate 在此版本中&#xff0c;我们为 IntelliJ IDEA 引入了一项重要补充 – AI Assistant。 AI Assistant 当前具备一组由 AI 提供支持的初始功能&#xff0c;提供集成式 AI 聊天&#xff0c;可以完成一些任务&#xff0c;例如自动编写文档…...

1300*B. T-primes

解析&#xff1a; 有且只有三个因数&#xff0c;当且仅当&#xff0c;完全平方数并且sqrt&#xff08;n&#xff09;为素数 #include<bits/stdc.h> using namespace std; typedef long long ll; const int N1e55; ll t,n; bool prime(ll x){if(x<2) return 0;for(int…...

重新C++系列之运算符重载

一、什么是运算符重载 简单来讲就是对运算符赋予新的意义&#xff0c;但是又不能改变原有的含义&#xff0c;它本身也就是一个函数。运算符重载的本质是以函数的方式来体现。 二、运算符重载有几种 1、按照作用域来划分&#xff0c;有全局操作符重载函数和成员函数操作符重载函…...

kotlin异常处理try-catch-finally

kotlin异常处理try-catch-finally fun main(args: Array<String>) {try {println("a")} catch (e: Exception) {//异常捕获println("a-catch: $e")} finally {//善后&#xff0c;无论是否异常&#xff0c;都会执行println("a-finally")}t…...

Pytorch在cuda、AMD DirectML和AMD CPU下性能比较

一、测试环境 CUDA环境: i7-8550u 16G DDR4 2133MHz nVidia MX150 2GB AMD DirectML环境: Ryzen 5 5600G 32G DDR4 3200MHz Vega7 4GB AMD 纯CPU环境&#xff1a;Ryzen 5 5600G 32G DDR4 3200MHz 其他硬件配置的硬盘、电源均一致。Pytorch版本为2.0.0&#xff0c;Pyt…...

哈工大计算机网络课程局域网详解之:交换机概念

哈工大计算机网络课程局域网详解之&#xff1a;交换机概念 文章目录 哈工大计算机网络课程局域网详解之&#xff1a;交换机概念以太网交换机&#xff08;switch&#xff09;交换机&#xff1a;多端口间同时传输交换机转发表&#xff1a;交换表交换机&#xff1a;自学习交换机互…...

Jenkins Pipeline的hasProperty函数

函数的作用 用于判断某个参数或者字段是否存在。 用法 例子一 def projectStr "P1,P2,P3" pipeline {agent anyparameters {extendedChoice(defaultValue: "${projectStr}",description: 选择要发布的项目,multiSelectDelimiter: ,,name: SELECT_PROJ…...

芯片制造详解.净洁室的秘密.学习笔记(三)

这是芯片制造系列的第三期跟学up主三圈&#xff0c;这里对其视频内容做了一下整理和归纳&#xff0c;喜欢的可以看原视频。 芯片制造详解03&#xff1a; 洁净室的秘密&#xff5c;为何芯片厂缺人&#xff1f; 芯片制造详解.净洁室的秘密.学习笔记 三 简介一、干净的级别二、芯片…...

可解释的 AI:在transformer中可视化注意力

Visualizing Attention in Transformers | Generative AI (medium.com) 一、说明 在本文中&#xff0c;我们将探讨可视化变压器架构核心区别特征的最流行的工具之一&#xff1a;注意力机制。继续阅读以了解有关BertViz的更多信息&#xff0c;以及如何将此注意力可视化工具整合到…...

k8s Webhook 使用java springboot实现webhook 学习总结

k8s Webhook 使用java springboot实现webhook 学习总结 大纲 基础概念准入控制器&#xff08;Admission Controllers&#xff09;ValidatingWebhookConfiguration 与 MutatingWebhookConfiguration准入检查&#xff08;AdmissionReview&#xff09;使用Springboot实现k8s-Web…...

JS逆向之猿人学爬虫第20题-wasm

文章目录 题目地址sign参数分析python算法还原往期逆向文章推荐题目地址 https://match.yuanrenxue.cn/match/20第20题被置顶到了第1页,题目难度 写的是中等 算法很简单,就一个标准的md5算法,主要是盐值不确定, 而盐值就在wasm里面,可以说难点就在于wasm分析 sign参数分…...

【双指针优化DP】The 2022 Hangzhou Normal U Summer Trials H

Problem - H - Codeforces 题意&#xff1a; 思路&#xff1a; 首先很明显是DP 因为只有1e6个站点&#xff0c;因此可以以站点作为阶段 注意到K很小&#xff0c;因此可以尝试把这个当作第二维 设dp[i][j]为到达第i个站点&#xff0c;已经花了j元钱的最小步数 然后就想了一…...

[论文笔记] LLM数据集——金融数据集

一、chatglm_金融 ModelScope 魔搭社区 请将modelscope sdk升级到v1.7.2rc0&#xff0c;执行&#xff1a; ​ pip3 install "modelscope1.7.2rc0" -f https://modelscope.oss-cn-beijing.aliyuncs.com/releases/repo.html # 方式1 git clone http://www.modelscope…...

在亚马逊平台,如何有效举报违规行为?

众所周知&#xff0c;在每个行业都有一些违规现象&#xff0c;甚至这些违规现象还会给自己带来利益方面的损失&#xff0c;一旦触犯到自己的利益的话&#xff0c;那自己是需要想办法解决的&#xff0c;想办法规避。 就拿开亚马逊店铺来说&#xff0c;比较容易遇到的就是产品侵…...

深度学习入门教学——神经网络

深度学习就是训练神经网络。 1、神经网络 举个最简单的例子&#xff0c;以下是一个使用线性回归来预测房屋价格的函数。这样一个用于预测房屋价格的函数被称作是一单个神经元。大一点的神经网络&#xff0c;就是将这些单个神经元叠加起来。例如&#xff1a;神经网络根据多个相…...

阿里Java开发手册~OOP 规约

1. 【强制】避免通过一个类的对象引用访问此类的静态变量或静态方法&#xff0c;无谓增加编译器解析成 本&#xff0c;直接用 类名 来访问即可。 2. 【强制】所有的覆写方法&#xff0c;必须加 Override 注解。 说明&#xff1a; getObject() 与 get 0 bject() 的问题。…...

【Mysql数据库面试01】内连接 左连接 右连接 全连接

【Mysql数据库】内连接 左连接 右连接 全连接 0.准备1.内连接1.1 SQL(不带where)1.2 SQL&#xff08;带where&#xff09;1.3总结 2.左连接2.1SQL&#xff08;不带where&#xff09;2.2SQL&#xff08;带where&#xff09;2.3总结 3.右连接3.1 SQL&#xff08;不带where&#x…...

事务隔离:为什么你改了我还看不见

前提概要 你肯定不陌生&#xff0c;和数据库打交道的时候&#xff0c;我们总是会用到事务。最经典的例子就 是转账&#xff0c;你要给朋友小王转 100 块钱&#xff0c;而此时你的银行卡只有 100 块钱。 转账过程具体到程序里会有一系列的操作&#xff0c;比如查询余额、做加减法…...

吴恩达ChatGPT《LangChain Chat with Your Data》笔记

文章目录 1. Introduction2. Document Loading2.1 Retrieval Augmented Generation&#xff08;RAG&#xff09;2.2 Load PDFs2.3 Load YouTube2.4 Load URLs2.5 Load Notion 3. Document Splitting3.1 Splitter Flow3.2 Character Splitter3.3 Token Splitter3.4 Markdown Spl…...

https和http有什么区别

https和http有什么区别 简要 区别如下&#xff1a; ​ https的端口是443.而http的端口是80&#xff0c;且二者连接方式不同&#xff1b;http传输时明文&#xff0c;而https是用ssl进行加密的&#xff0c;https的安全性更高&#xff1b;https是需要申请证书的&#xff0c;而h…...