Istio Pilot源码学习(一):Pilot-Discovery启动流程、ConfigController配置规则发现
本文基于Istio 1.18.0版本进行源码学习
1、Pilot-Discovery工作原理
Pilot-Discovery是Istio控制面的核心,负责服务网格中的流量管理以及控制面和数据面之间的配置下发
Pilot-Discovery从注册中心(如Kubernetes)获取服务信息并汇集,从Kubernetes API Server中获取配置规则,将服务信息和配置数据转换为xDS接口的标准数据结构,通过GRPC下发到数据面的Envoy
2、Pilot-Discovery代码结构

Pilot-Discovery的入口函数为:pilot/cmd/pilot-discovery/main.go中的main方法。main方法中创建了Pilot Server,Pilot Server中主要包含三部分逻辑:
- ConfigController:管理各种配置数据,包括用户创建的流量管理规则和策略
- ServiceController:获取Service Registry中的服务发现数据
- DiscoveryService:主要包含下述逻辑:
- 启动GRPC Server并接收来自Envoy端的连接请求
- 接收Envoy端的xDS请求,从ConfigController和ServiceController中获取配置和服务信息,生成响应消息发送给Envoy
- 监听来自ConfigController的配置变化消息和ServiceController的服务变化消息,并将配置和服务变化内容通过xDS接口推送到Envoy
3、Pilot-Discovery启动流程
创建Pilot Server代码如下:
// pilot/pkg/bootstrap/server.go
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {e := model.NewEnvironment()e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffixe.SetLedger(buildLedger(args.RegistryOptions))ac := aggregate.NewController(aggregate.Options{MeshHolder: e,})e.ServiceDiscovery = acs := &Server{clusterID: getClusterID(args),environment: e,fileWatcher: filewatcher.NewWatcher(),httpMux: http.NewServeMux(),monitoringMux: http.NewServeMux(),readinessProbes: make(map[string]readinessProbe),readinessFlags: &readinessFlags{},workloadTrustBundle: tb.NewTrustBundle(nil),server: server.New(),shutdownDuration: args.ShutdownDuration,internalStop: make(chan struct{}),istiodCertBundleWatcher: keycertbundle.NewWatcher(),webhookInfo: &webhookInfo{},}// Apply custom initialization functions.for _, fn := range initFuncs {fn(s)}// Initialize workload Trust Bundle before XDS Servere.TrustBundle = s.workloadTrustBundle// 初始化discoveryServers.XDSServer = xds.NewDiscoveryServer(e, args.PodName, s.clusterID, args.RegistryOptions.KubeOptions.ClusterAliases)prometheus.EnableHandlingTimeHistogram()// make sure we have a readiness probe before serving HTTP to avoid marking ready too soons.initReadinessProbes()// 初始化http和grpc server,向grpc server注册discoveryServers.initServers(args)if err := s.initIstiodAdminServer(args, s.webhookInfo.GetTemplates); err != nil {return nil, fmt.Errorf("error initializing debug server: %v", err)}if err := s.serveHTTP(); err != nil {return nil, fmt.Errorf("error serving http: %v", err)}// Apply the arguments to the configuration.if err := s.initKubeClient(args); err != nil {return nil, fmt.Errorf("error initializing kube client: %v", err)}// used for both initKubeRegistry and initClusterRegistriesargs.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)s.initMeshConfiguration(args, s.fileWatcher)spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())s.initMeshNetworks(args, s.fileWatcher)s.initMeshHandlers()s.environment.Init()if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {return nil, err}// Options based on the current 'defaults' in istio.caOpts := &caOptions{TrustDomain: s.environment.Mesh().TrustDomain,Namespace: args.Namespace,DiscoveryFilter: args.RegistryOptions.KubeOptions.GetFilter(),ExternalCAType: ra.CaExternalType(externalCaType),CertSignerDomain: features.CertSignerDomain,}if caOpts.ExternalCAType == ra.ExtCAK8s {// Older environment variable preserved for backward compatibilitycaOpts.ExternalCASigner = k8sSigner}// CA signing certificate must be created first if needed.if err := s.maybeCreateCA(caOpts); err != nil {return nil, err}// 初始化configController和serviceControllerif err := s.initControllers(args); err != nil {return nil, err}s.XDSServer.InitGenerators(e, args.Namespace, s.internalDebugMux)// Initialize workloadTrustBundle after CA has been initializedif err := s.initWorkloadTrustBundle(args); err != nil {return nil, err}// Parse and validate Istiod Address.istiodHost, _, err := e.GetDiscoveryAddress()if err != nil {return nil, err}// Create Istiod certs and setup watches.if err := s.initIstiodCerts(args, string(istiodHost)); err != nil {return nil, err}// Secure gRPC Server must be initialized after CA is created as may use a Citadel generated cert.if err := s.initSecureDiscoveryService(args); err != nil {return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)}// common https server for webhooks (e.g. injection, validation)if s.kubeClient != nil {s.initSecureWebhookServer(args)wh, err := s.initSidecarInjector(args)if err != nil {return nil, fmt.Errorf("error initializing sidecar injector: %v", err)}s.webhookInfo.mu.Lock()s.webhookInfo.wh = whs.webhookInfo.mu.Unlock()if err := s.initConfigValidation(args); err != nil {return nil, fmt.Errorf("error initializing config validator: %v", err)}}// This should be called only after controllers are initialized.// 向configController和serviceController注册事件回调函数s.initRegistryEventHandlers()// 设置discoveryServer启动函数s.initDiscoveryService()// Notice that the order of authenticators matters, since at runtime// authenticators are activated sequentially and the first successful attempt// is used as the authentication result.authenticators := []security.Authenticator{&authenticate.ClientCertAuthenticator{},}if args.JwtRule != "" {jwtAuthn, err := initOIDC(args)if err != nil {return nil, fmt.Errorf("error initializing OIDC: %v", err)}if jwtAuthn == nil {return nil, fmt.Errorf("JWT authenticator is nil")}authenticators = append(authenticators, jwtAuthn)}// The k8s JWT authenticator requires the multicluster registry to be initialized,// so we build it later.if s.kubeClient != nil {authenticators = append(authenticators,kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient.Kube(), s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))}if len(features.TrustedGatewayCIDR) > 0 {authenticators = append(authenticators, &authenticate.XfccAuthenticator{})}if features.XDSAuth {s.XDSServer.Authenticators = authenticators}caOpts.Authenticators = authenticators// Start CA or RA server. This should be called after CA and Istiod certs have been created.s.startCA(caOpts)// TODO: don't run this if galley is started, one ctlz is enoughif args.CtrlZOptions != nil {_, _ = ctrlz.Run(args.CtrlZOptions, nil)}// This must be last, otherwise we will not know which informers to registerif s.kubeClient != nil {s.addStartFunc(func(stop <-chan struct{}) error {s.kubeClient.RunAndWait(stop)return nil})}return s, nil
}
NewServer()方法中核心逻辑如下:
- 初始化DiscoveryServer
- 初始化HTTP和GRPC Server,向GRPC Server注册DiscoveryServer
- 初始化ConfigController和ServiceController
- 向ConfigController和ServiceController注册事件回调函数,有配置和服务信息变更时会通知DiscoveryServer
- 设置DiscoveryServer启动函数
Pilot Server定义如下:
// pilot/pkg/bootstrap/server.go
type Server struct {// discoveryServerXDSServer *xds.DiscoveryServerclusterID cluster.ID// pilot环境所需的api集合environment *model.Environment// 处理kubernetes主集群的注册中心kubeClient kubelib.Client// 处理kubernetes多个集群的注册中心multiclusterController *multicluster.Controller// 统一处理配置规则的controllerconfigController model.ConfigStoreController// 配置规则缓存ConfigStores []model.ConfigStoreController// 负责serviceEntry的服务发现serviceEntryController *serviceentry.ControllerhttpServer *http.Server // debug, monitoring and readiness Server.httpAddr stringhttpsServer *http.Server // webhooks HTTPS Server.grpcServer *grpc.ServergrpcAddress stringsecureGrpcServer *grpc.ServersecureGrpcAddress string// monitoringMux listens on monitoringAddr(:15014).// Currently runs prometheus monitoring and debug (if enabled).monitoringMux *http.ServeMux// internalDebugMux is a mux for *internal* calls to the debug interface. That is, authentication is disabled.internalDebugMux *http.ServeMux// httpMux listens on the httpAddr (8080).// If a Gateway is used in front and https is off it is also multiplexing// the rest of the features if their port is empty.// Currently runs readiness and debug (if enabled)httpMux *http.ServeMux// httpsMux listens on the httpsAddr(15017), handling webhooks// If the address os empty, the webhooks will be set on the default httpPort.httpsMux *http.ServeMux // webhooks// fileWatcher used to watch mesh config, networks and certificates.fileWatcher filewatcher.FileWatcher// certWatcher watches the certificates for changes and triggers a notification to Istiod.cacertsWatcher *fsnotify.WatcherdnsNames []stringCA *ca.IstioCARA ra.RegistrationAuthority// TrustAnchors for workload to workload mTLSworkloadTrustBundle *tb.TrustBundlecertMu sync.RWMutexistiodCert *tls.CertificateistiodCertBundleWatcher *keycertbundle.Watcher// pilot的所有组件都注册启动任务到此对象,便于在Start()方法中批量启动及管理server server.InstancereadinessProbes map[string]readinessProbereadinessFlags *readinessFlags// duration used for graceful shutdown.shutdownDuration time.Duration// internalStop is closed when the server is shutdown. This should be avoided as much as possible, in// favor of AddStartFunc. This is only required if we *must* start something outside of this process.// For example, everything depends on mesh config, so we use it there rather than trying to sequence everything// in AddStartFuncinternalStop chan struct{}webhookInfo *webhookInfostatusReporter *distribution.ReporterstatusManager *status.Manager// RWConfigStore is the configstore which allows updates, particularly for status.RWConfigStore model.ConfigStoreController
}
Pilot-Discovery启动流程如下图:

3、配置规则发现:ConfigController
Pilot的配置规则指网络路由规则及网络安全规则,包含Virtualservice、Destinationrule、Gateway、PeerAuthentication、RequestAuthentication等资源。目前支持三种类型的ConfigController:
- MCP:是一种服务网格配置传输协议,用于隔离Pilot和底层平台(Kubernetes、文件系统或者其他注册中心),使得Pilot无需感知底层平台的差异,更专注于Envoy xDS配置的生成与分发
- Kubernetes:基于Kubernetes的Config发现利用了Kubernetes Informer的List-Watch能力。在Kubernetes集群中,Config以CustomResource的形式存在。Pilot通过配置控制器(CRD Controller)监听Kubernetes APIServer配置规则资源,维护所有资源的缓存,并触发事件处理回调函数
- File:通过文件监视器周期性地读取本地配置文件,将配置规则缓存在内存中,并维护配置的增加、更新、删除事件,当缓存有变化时,异步通知内存控制器执行事件回调函数
1)、ConfigController的核心接口
ConfigController实现了ConfigStoreController接口:
// pilot/pkg/model/config.go
type ConfigStoreController interface {// 配置缓存接口ConfigStore// 注册事件处理函数// RegisterEventHandler adds a handler to receive config update events for a// configuration typeRegisterEventHandler(kind config.GroupVersionKind, handler EventHandler)// 运行控制器// Run until a signal is received.// Run *should* block, so callers should typically call `go controller.Run(stop)`Run(stop <-chan struct{})// 配置缓存是否已同步// HasSynced returns true after initial cache synchronization is completeHasSynced() bool
}
ConfigStoreController继承ConfigStore接口,ConfigStore为控制器核心的资源缓存接口提供了对Config资源的增删改查功能:
// pilot/pkg/model/config.go
type ConfigStore interface {// Schemas exposes the configuration type schema known by the config store.// The type schema defines the bidirectional mapping between configuration// types and the protobuf encoding schema.Schemas() collection.Schemas// Get retrieves a configuration element by a type and a keyGet(typ config.GroupVersionKind, name, namespace string) *config.Config// List returns objects by type and namespace.// Use "" for the namespace to list across namespaces.List(typ config.GroupVersionKind, namespace string) []config.Config// Create adds a new configuration object to the store. If an object with the// same name and namespace for the type already exists, the operation fails// with no side effects.Create(config config.Config) (revision string, err error)// Update modifies an existing configuration object in the store. Update// requires that the object has been created. Resource version prevents// overriding a value that has been changed between prior _Get_ and _Put_// operation to achieve optimistic concurrency. This method returns a new// revision if the operation succeeds.Update(config config.Config) (newRevision string, err error)UpdateStatus(config config.Config) (newRevision string, err error)// Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid// read-modify-write conflicts when there are many concurrent-writers to the same resource.Patch(orig config.Config, patchFn config.PatchFunc) (string, error)// Delete removes an object from the store by key// For k8s, resourceVersion must be fulfilled before a deletion is carried out.// If not possible, a 409 Conflict status will be returned.Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
}
2)、ConfigController的初始化
Kubernetes ConfigController实际上是一个CRD Operator,它从Kubernetes API Server监听所有的Istio API资源,其初始化过程如下:
crdclient.New()方法代码如下:
// pilot/pkg/config/kube/crdclient/client.go
func New(client kube.Client, opts Option) (*Client, error) {schemas := collections.Pilotif features.EnableGatewayAPI {schemas = collections.PilotGatewayAPI()}return NewForSchemas(client, opts, schemas)
}
collections.Pilot中定义了Istio所有的Config资源类型,代码如下:
// pkg/config/schema/collections/collections.gen.go// Pilot contains only collections used by Pilot.Pilot = collection.NewSchemasBuilder().MustAdd(AuthorizationPolicy).MustAdd(DestinationRule).MustAdd(EnvoyFilter).MustAdd(Gateway).MustAdd(PeerAuthentication).MustAdd(ProxyConfig).MustAdd(RequestAuthentication).MustAdd(ServiceEntry).MustAdd(Sidecar).MustAdd(Telemetry).MustAdd(VirtualService).MustAdd(WasmPlugin).MustAdd(WorkloadEntry).MustAdd(WorkloadGroup).Build()
crdclient.New()方法中调用了NewForSchemas()方法:
// pilot/pkg/config/kube/crdclient/client.go
func NewForSchemas(client kube.Client, opts Option, schemas collection.Schemas) (*Client, error) {schemasByCRDName := map[string]resource.Schema{}for _, s := range schemas.All() {// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())schemasByCRDName[name] = s}// 实例化crd clientout := &Client{domainSuffix: opts.DomainSuffix,schemas: schemas,schemasByCRDName: schemasByCRDName,revision: opts.Revision,queue: queue.NewQueue(1 * time.Second),kinds: map[config.GroupVersionKind]*cacheHandler{},handlers: map[config.GroupVersionKind][]model.EventHandler{},client: client,// 创建crdWatcher,监听crd的创建crdWatcher: crdwatcher.NewController(client),logger: scope.WithLabels("controller", opts.Identifier),namespacesFilter: opts.NamespacesFilter,crdWatches: map[config.GroupVersionKind]*waiter{gvk.KubernetesGateway: newWaiter(),gvk.GatewayClass: newWaiter(),},}// 添加回调函数,当crd创建时调用handleCRDAdd方法out.crdWatcher.AddCallBack(func(name string) {handleCRDAdd(out, name)})// 获取集群中当前所有的crdknown, err := knownCRDs(client.Ext())if err != nil {return nil, err}// 遍历istio所有的config资源类型for _, s := range schemas.All() {// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())if s.IsBuiltin() {handleCRDAdd(out, name)} else {// istio config资源类型对应crd已创建,调用handleCRDAdd方法if _, f := known[name]; f {handleCRDAdd(out, name)} else {out.logger.Warnf("Skipping CRD %v as it is not present", s.GroupVersionKind())}}}return out, nil
}
NewForSchemas()方法中实例化了CRD Client,CRD Client定义如下:
// pilot/pkg/config/kube/crdclient/client.go
type Client struct {// schemas defines the set of schemas used by this client.// Note: this must be a subset of the schemas defined in the codegenschemas collection.Schemas// domainSuffix for the config metadatadomainSuffix string// revision for this control plane instance. We will only read configs that match this revision.revision string// kinds keeps track of all cache handlers for known types// 记录所有资源类型对应的informer控制器kinds map[config.GroupVersionKind]*cacheHandlerkindsMu sync.RWMutex// 事件处理队列queue queue.Instance// handlers defines a list of event handlers per-type// 资源类型及对应的事件处理回调函数handlers map[config.GroupVersionKind][]model.EventHandler// crd相关的schemaschemasByCRDName map[string]resource.Schema// kubernetes客户端,包含istioClient操作istio api对象,istio informer监听istio api对象变更事件client kube.Client// 监听crd的创建crdWatcher *crdwatcher.Controllerlogger *log.Scope// namespacesFilter is only used to initiate filtered informer.namespacesFilter func(obj interface{}) bool// crdWatches notifies consumers when a CRD is presentcrdWatches map[config.GroupVersionKind]*waiterstop <-chan struct{}
}
3)、ConfigController的工作机制
Kubernetes ConfigController为每种Config资源都创建了一个Informer,用于监听所有Config资源并注册EventHandler
NewForSchemas()方法中,如果Istio Config资源类型对应CRD已创建或者crdWatcher监听CRD创建后,都会调用handleCRDAdd()方法:
// pilot/pkg/config/kube/crdclient/client.go
func handleCRDAdd(cl *Client, name string) {cl.logger.Debugf("adding CRD %q", name)s, f := cl.schemasByCRDName[name]if !f {cl.logger.Debugf("added resource that we are not watching: %v", name)return}resourceGVK := s.GroupVersionKind()gvr := s.GroupVersionResource()cl.kindsMu.Lock()defer cl.kindsMu.Unlock()if _, f := cl.kinds[resourceGVK]; f {cl.logger.Debugf("added resource that already exists: %v", resourceGVK)return}var i informers.GenericInformervar ifactory startervar err error// 根据api group添加到不同的sharedInformerFactory中switch s.Group() {case gvk.KubernetesGateway.Group:ifactory = cl.client.GatewayAPIInformer()i, err = cl.client.GatewayAPIInformer().ForResource(gvr)case gvk.Pod.Group, gvk.Deployment.Group, gvk.MutatingWebhookConfiguration.Group:ifactory = cl.client.KubeInformer()i, err = cl.client.KubeInformer().ForResource(gvr)case gvk.CustomResourceDefinition.Group:ifactory = cl.client.ExtInformer()i, err = cl.client.ExtInformer().ForResource(gvr)default:ifactory = cl.client.IstioInformer()i, err = cl.client.IstioInformer().ForResource(gvr)}if err != nil {// Shouldn't happencl.logger.Errorf("failed to create informer for %v: %v", resourceGVK, err)return}_ = i.Informer().SetTransform(kube.StripUnusedFields)// 调用createCacheHandler方法,为informer添加事件回调函数cl.kinds[resourceGVK] = createCacheHandler(cl, s, i)if w, f := cl.crdWatches[resourceGVK]; f {cl.logger.Infof("notifying watchers %v was created", resourceGVK)w.once.Do(func() {close(w.stop)})}// Start informer. In startup case, we will not start here as// we will start all factories once we are ready to initialize.// For dynamically added CRDs, we need to start immediately thoughif cl.stop != nil {// 启动informerifactory.Start(cl.stop)}
}
每种Informer的事件回调函数均通过createCacheHandler()方法注册,代码如下:
// pilot/pkg/config/kube/crdclient/cache_handler.go
func createCacheHandler(cl *Client, schema resource.Schema, i informers.GenericInformer) *cacheHandler {scope.Debugf("registered CRD %v", schema.GroupVersionKind())h := &cacheHandler{client: cl,schema: schema,// 创建informer,支持配置namespace级别隔离informer: kclient.NewUntyped(cl.client, i.Informer(), kclient.Filter{ObjectFilter: cl.namespacesFilter}),}kind := schema.Kind()// 添加事件回调函数h.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj any) {incrementEvent(kind, "add")// 创建任务对象并将其发送到任务队列中cl.queue.Push(func() error {return h.onEvent(nil, obj, model.EventAdd)})},UpdateFunc: func(old, cur any) {incrementEvent(kind, "update")cl.queue.Push(func() error {return h.onEvent(old, cur, model.EventUpdate)})},DeleteFunc: func(obj any) {incrementEvent(kind, "delete")cl.queue.Push(func() error {return h.onEvent(nil, obj, model.EventDelete)})},})return h
}
当Config资源在Kubernetes中创建、更新和删除时,EventHandler会创建任务对象并将其发送到任务列中,然后由任务处理协程处理。处理资源变化的onEvent()方法代码如下:
// pilot/pkg/config/kube/crdclient/cache_handler.go
func (h *cacheHandler) onEvent(old any, curr any, event model.Event) error {currItem := controllers.ExtractObject(curr)if currItem == nil {return nil}// 进行对象转换currConfig := TranslateObject(currItem, h.schema.GroupVersionKind(), h.client.domainSuffix)var oldConfig config.Configif old != nil {oldItem, ok := old.(runtime.Object)if !ok {log.Warnf("Old Object can not be converted to runtime Object %v, is type %T", old, old)return nil}oldConfig = TranslateObject(oldItem, h.schema.GroupVersionKind(), h.client.domainSuffix)}if h.client.objectInRevision(&currConfig) {// 执行事件处理回调函数h.callHandlers(oldConfig, currConfig, event)return nil}// Check if the object was in our revision, but has been moved to a different revision. If so,// it has been effectively deleted from our revision, so process it as a delete event.if event == model.EventUpdate && old != nil && h.client.objectInRevision(&oldConfig) {log.Debugf("Object %s/%s has been moved to a different revision, deleting",currConfig.Namespace, currConfig.Name)// 执行事件处理回调函数h.callHandlers(oldConfig, currConfig, model.EventDelete)return nil}log.Debugf("Skipping event %s for object %s/%s from different revision",event, currConfig.Namespace, currConfig.Name)return nil
}func (h *cacheHandler) callHandlers(old config.Config, curr config.Config, event model.Event) {// TODO we may consider passing a pointer to handlers instead of the value. While spec is a pointer, the meta will be copied// 执行该资源类型对应的事件处理回调函数for _, f := range h.client.handlers[h.schema.GroupVersionKind()] {f(old, curr, event)}
}
onEvent()方法中通过TranslateObject()方法进行对象转换,然后执行该资源类型对应的事件处理回调函数
h.client.handlers是各种资源类型的处理函数集合,是通过ConfigController的RegisterEventHandler()注册的,注册代码如下:
// pilot/pkg/bootstrap/server.go
func (s *Server) initRegistryEventHandlers() {...if s.configController != nil {configHandler := func(prev config.Config, curr config.Config, event model.Event) {defer func() {// 状态报告if event != model.EventDelete {s.statusReporter.AddInProgressResource(curr)} else {s.statusReporter.DeleteInProgressResource(curr)}}()log.Debugf("Handle event %s for configuration %s", event, curr.Key())// For update events, trigger push only if spec has changed.// 对于更新事件,仅当对象的spec发生变化时才触发xds推送if event == model.EventUpdate && !needsPush(prev, curr) {log.Debugf("skipping push for %s as spec has not changed", prev.Key())return}// 触发xds全量更新pushReq := &model.PushRequest{Full: true,ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.MustFromGVK(curr.GroupVersionKind), Name: curr.Name, Namespace: curr.Namespace}),Reason: []model.TriggerReason{model.ConfigUpdate},}s.XDSServer.ConfigUpdate(pushReq)}schemas := collections.Pilot.All()if features.EnableGatewayAPI {schemas = collections.PilotGatewayAPI().All()}for _, schema := range schemas {// This resource type was handled in external/servicediscovery.go, no need to rehandle here.// 下面3种类型在serviceEntry controller中处理,这里不用为其注册事件处理函数if schema.GroupVersionKind() == gvk.ServiceEntry {continue}if schema.GroupVersionKind() == gvk.WorkloadEntry {continue}if schema.GroupVersionKind() == gvk.WorkloadGroup {continue}// 注册其他所有api对象的事件处理函数s.configController.RegisterEventHandler(schema.GroupVersionKind(), configHandler)}...
}
完整的Config事件处理流程如下图所示:
- EventHandler构造任务(Task),任务实际上是对onEvent函数的封装
- EventHandler将任务推送到任务队列中
- 任务处理协程阻塞式地读取任务队列,执行任务,通过onEvent方法处理事件,并通过ConfigHandler触发xDS的更新
参考:
《Istio权威指南 下》
3.深入Istio源码:Pilot配置规则ConfigController
Istio Pilot代码深度解析
相关文章:
Istio Pilot源码学习(一):Pilot-Discovery启动流程、ConfigController配置规则发现
本文基于Istio 1.18.0版本进行源码学习 1、Pilot-Discovery工作原理 Pilot-Discovery是Istio控制面的核心,负责服务网格中的流量管理以及控制面和数据面之间的配置下发 Pilot-Discovery从注册中心(如Kubernetes)获取服务信息并汇集ÿ…...
数据结构:顺序表(C实现)
个人主页 水月梦镜花 个人专栏 C语言 ,数据结构 文章目录 一、顺序表二、实现思路1.存储结构2.初始化顺序表(SeqListInit)3.销毁顺序表(SeqListDestroty)4.打印顺序表(SeqListPrint)5.顺序表尾插(SeqListPushBack)and检查容量(SeqListCheckCapacity)6.顺序表头插(Se…...
素描基础知识
素描基础入门 1.基础线条 1.1 握笔姿势及长线条 2.排线 2.1 不同姿势画排线 2.1.1 姿势画排线 2.1.2 用手腕画排线 2.1.3 小拇指画排线 2.1.4 叠加排线 2.1.5交叉排线 2.2 纸张擦法 2.3 排线学习榜样 2.4 四种常见的排线 3、定向连线 4、一点透视 4.1 透视的规律 4.2 焦点透视…...
【Chat GPT】用 ChatGPT 运行 Python
前言 ChatGPT 是一个基于 GPT-2 模型的人工智能聊天机器人,它可以进行智能对话,同时还支持 Python 编程语言的运行,可以通过 API 接口进行调用。本文将介绍如何使用 ChatGPT 运行 Python 代码,并提供一个实际代码案例。 ChatGPT …...
cartographer发布畸变矫正后的scan数据
实现方式: 模仿源代码,在cartographer_ros写一个函数,以函数指针的方式传入cartographer后端,然后接收矫正后的scan数据,然后按照话题laserScan发布出来。 需要同时发布点云强度信息的,还要自己添加含有强度…...
Idea中git push to origin/master was rejected错误解决方案
Idea中git push to origin/master was rejected错误解决方案 问题描述解决方法 问题描述 idea开发中,需要将项目发布到gitee上,在gitee上创建仓库后,通过idea中git推送项目代码提示: push to origin/master was rejected 解决方法 gitee创建仓库时创建了README.md文件,本地…...
docker版jxTMS使用指南:自定义频率型动态管控
本文讲解4.4版jxTMS中如何自行定义一个频率型的动态管控,整个系列的文章请查看:docker版jxTMS使用指南:4.4版升级内容 docker版本的使用,请查看:docker版jxTMS使用指南 4.0版jxTMS的说明,请查看ÿ…...
【Docker】初识Docker以及Docker安装与阿里云镜像配置
目录 一、初识Docker 二、安装Docker 三、Docker架构 四、配置Docker镜像加速器 一、初识Docker Docker是一个开源的应用容器引擎,诞生于2013年,基于Go语言实现,dotCloud公司出品,Docker开源让开发者打包他们的应用以及依赖包到…...
C语言:动态内存管理
文章目录 一、动态内存函数1. malloc2. calloc3. realloc4. free 二、常见的错误1.malloc或calloc开辟的空间未检查2.越界访问3.对非malloc和calloc开辟的空间,用free释放4.对同一块动态内存多次释放5.用free释放动态内存的一部分 三、通讯录(动态版本改写)总结 一、…...
如何往MySQL中插入100万条数据?
需求 现在有一个 数据量 为100万的数据样本 100w_data.sql 其数据格式如下,截取最后十条数据 999991,XxGdnLZObA999991,XxGdnLZObA,XxGdnLZObA,2020-3-18,1 999992,TBBchSKobC999992,TBBchSKobC,TBBchSKobC,2020-9-8,2 999993,rfwgLkYhUz999993,rfwgLkYhUz,rfwgLk…...
IntelliJ IDEA 2023.2 最新变化
主要更新 AI Assistant 限定访问 Ultimate 在此版本中,我们为 IntelliJ IDEA 引入了一项重要补充 – AI Assistant。 AI Assistant 当前具备一组由 AI 提供支持的初始功能,提供集成式 AI 聊天,可以完成一些任务,例如自动编写文档…...
1300*B. T-primes
解析: 有且只有三个因数,当且仅当,完全平方数并且sqrt(n)为素数 #include<bits/stdc.h> using namespace std; typedef long long ll; const int N1e55; ll t,n; bool prime(ll x){if(x<2) return 0;for(int…...
重新C++系列之运算符重载
一、什么是运算符重载 简单来讲就是对运算符赋予新的意义,但是又不能改变原有的含义,它本身也就是一个函数。运算符重载的本质是以函数的方式来体现。 二、运算符重载有几种 1、按照作用域来划分,有全局操作符重载函数和成员函数操作符重载函…...
kotlin异常处理try-catch-finally
kotlin异常处理try-catch-finally fun main(args: Array<String>) {try {println("a")} catch (e: Exception) {//异常捕获println("a-catch: $e")} finally {//善后,无论是否异常,都会执行println("a-finally")}t…...
Pytorch在cuda、AMD DirectML和AMD CPU下性能比较
一、测试环境 CUDA环境: i7-8550u 16G DDR4 2133MHz nVidia MX150 2GB AMD DirectML环境: Ryzen 5 5600G 32G DDR4 3200MHz Vega7 4GB AMD 纯CPU环境:Ryzen 5 5600G 32G DDR4 3200MHz 其他硬件配置的硬盘、电源均一致。Pytorch版本为2.0.0,Pyt…...
哈工大计算机网络课程局域网详解之:交换机概念
哈工大计算机网络课程局域网详解之:交换机概念 文章目录 哈工大计算机网络课程局域网详解之:交换机概念以太网交换机(switch)交换机:多端口间同时传输交换机转发表:交换表交换机:自学习交换机互…...
Jenkins Pipeline的hasProperty函数
函数的作用 用于判断某个参数或者字段是否存在。 用法 例子一 def projectStr "P1,P2,P3" pipeline {agent anyparameters {extendedChoice(defaultValue: "${projectStr}",description: 选择要发布的项目,multiSelectDelimiter: ,,name: SELECT_PROJ…...
芯片制造详解.净洁室的秘密.学习笔记(三)
这是芯片制造系列的第三期跟学up主三圈,这里对其视频内容做了一下整理和归纳,喜欢的可以看原视频。 芯片制造详解03: 洁净室的秘密|为何芯片厂缺人? 芯片制造详解.净洁室的秘密.学习笔记 三 简介一、干净的级别二、芯片…...
可解释的 AI:在transformer中可视化注意力
Visualizing Attention in Transformers | Generative AI (medium.com) 一、说明 在本文中,我们将探讨可视化变压器架构核心区别特征的最流行的工具之一:注意力机制。继续阅读以了解有关BertViz的更多信息,以及如何将此注意力可视化工具整合到…...
k8s Webhook 使用java springboot实现webhook 学习总结
k8s Webhook 使用java springboot实现webhook 学习总结 大纲 基础概念准入控制器(Admission Controllers)ValidatingWebhookConfiguration 与 MutatingWebhookConfiguration准入检查(AdmissionReview)使用Springboot实现k8s-Web…...
CTF show Web 红包题第六弹
提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...
React Native 导航系统实战(React Navigation)
导航系统实战(React Navigation) React Navigation 是 React Native 应用中最常用的导航库之一,它提供了多种导航模式,如堆栈导航(Stack Navigator)、标签导航(Tab Navigator)和抽屉…...
微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
学校招生小程序源码介绍
基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码,专为学校招生场景量身打造,功能实用且操作便捷。 从技术架构来看,ThinkPHP提供稳定可靠的后台服务,FastAdmin加速开发流程,UniApp则保障小程序在多端有良好的兼…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
DingDing机器人群消息推送
文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...
从面试角度回答Android中ContentProvider启动原理
Android中ContentProvider原理的面试角度解析,分为已启动和未启动两种场景: 一、ContentProvider已启动的情况 1. 核心流程 触发条件:当其他组件(如Activity、Service)通过ContentR…...
