Run
Set goals
Understand the running mechanism of kube-controller-manager
Starting from the main function, we find the Run function. The original code is long, so it has been simplified here:
```go
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	// configz module, already covered in the kube-scheduler analysis
	if cfgz, err := configz.New(ConfigzName); err == nil {
		cfgz.Set(c.ComponentConfig)
	} else {
		klog.Errorf("unable to register configz: %v", err)
	}

	// Health monitoring and the http service, skipped here
	var checks []healthz.HealthChecker
	var unsecuredMux *mux.PathRecorderMux

	run := func(ctx context.Context) {
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: c.Kubeconfig,
		}
		// client authentication related
		var clientBuilder controller.ControllerClientBuilder

		// Create the context for the controllers
		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}
		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}

		// The InformerFactory here is essentially the same SharedInformerFactory we saw in kube-scheduler
		controllerContext.InformerFactory.Start(controllerContext.Stop)
		controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)

		select {}
	}

	// Whether to run leader election
	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
		run(context.TODO())
		panic("unreachable")
	}

	// Build a globally unique id
	id, err := os.Hostname()
	if err != nil {
		return err
	}
	id = id + "_" + string(uuid.NewUUID())

	rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
		c.ComponentConfig.Generic.LeaderElection.ResourceName,
		c.LeaderElectionClient.CoreV1(),
		c.LeaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: c.EventRecorder,
		})
	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
	}

	// Under normal circumstances this blocks inside RunOrDie, continuously doing the election work
	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			// The run function is called when this instance becomes the leader
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
		WatchDog: electionChecker,
		Name:     "kube-controller-manager",
	})
	panic("unreachable")
}
```
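The leader election used here comes from client-go's leaderelection package. Below is a minimal, standalone sketch of how any component can use it, not the kube-controller-manager code itself; the lease name `my-controller`, the `kube-system` namespace, and the timeout values are illustrative assumptions.

```go
package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	id, _ := os.Hostname()

	// A Lease object acts as the lock; only the current holder runs the real work.
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "my-controller", Namespace: "kube-system"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.Info("became leader, starting controllers")
				<-ctx.Done() // the real work would run here until leadership is lost
			},
			OnStoppedLeading: func() {
				klog.Fatal("leader election lost")
			},
		},
		Name: "my-controller",
	})
}
```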
StartControllers
```go
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
	// The key loop: start each controller. The map key is the controller name, the value is its init function.
	for controllerName, initFn := range controllers {
		// Check whether this controller is enabled
		if !ctx.IsControllerEnabled(controllerName) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}

		time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

		klog.V(1).Infof("Starting %q", controllerName)
		// Call the init function to start the controller
		debugHandler, started, err := initFn(ctx)
		if err != nil {
			klog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
		// Register the controller's handler under the debug url
		if debugHandler != nil && unsecuredMux != nil {
			basePath := "/debug/controllers/" + controllerName
			unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
			unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
		}
		klog.Infof("Started %q", controllerName)
	}
	return nil
}
```

Now let's look at the function that builds the controller map passed in above, to see which controllers it registers. Many familiar concepts appear here, and we won't explain them one by one.

```go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["endpointslice"] = startEndpointSliceController
	controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["nodeipam"] = startNodeIpamController
	controllers["nodelifecycle"] = startNodeLifecycleController
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
	}
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
	controllers["ttl-after-finished"] = startTTLAfterFinishedController
	controllers["root-ca-cert-publisher"] = startRootCACertPublisher
	controllers["ephemeral-volume"] = startEphemeralVolumeController
	return controllers
}
```
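The registration pattern itself is simple: a map from controller name to an init function, which the manager iterates over and calls. The following self-contained sketch shows that pattern in isolation; the `InitFunc` signature is deliberately simplified compared with the real one (which also returns a debug `http.Handler`), and the controller names are just examples.

```go
package main

import "fmt"

// Simplified context handed to every controller's init function.
type ControllerContext struct {
	EnabledControllers map[string]bool
}

// Simplified InitFunc; the real one also returns an http.Handler for debugging.
type InitFunc func(ctx ControllerContext) (started bool, err error)

func startFakeController(name string) InitFunc {
	return func(ctx ControllerContext) (bool, error) {
		fmt.Printf("controller %q started\n", name)
		return true, nil
	}
}

func StartControllers(ctx ControllerContext, controllers map[string]InitFunc) error {
	for name, initFn := range controllers {
		if !ctx.EnabledControllers[name] {
			fmt.Printf("%q is disabled\n", name)
			continue
		}
		started, err := initFn(ctx)
		if err != nil {
			return fmt.Errorf("error starting %q: %v", name, err)
		}
		if !started {
			fmt.Printf("skipping %q\n", name)
		}
	}
	return nil
}

func main() {
	controllers := map[string]InitFunc{
		"deployment": startFakeController("deployment"),
		"replicaset": startFakeController("replicaset"),
	}
	ctx := ControllerContext{EnabledControllers: map[string]bool{
		"deployment": true,
		"replicaset": true,
	}}
	_ = StartControllers(ctx, controllers)
}
```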
ReplicaSet
Since our running example is creating an nginx pod, very little of kube-controller-manager is actually involved.
However, to deepen our understanding of kube-controller-manager, we introduce a new concept here, the ReplicaSet. The official description follows:
A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
Simply put, a ReplicaSet is used to keep a specified number of Pods running.
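To make this concrete, here is a hedged sketch of creating a ReplicaSet with 3 nginx replicas through client-go; the name `nginx-rs`, the `default` namespace, the image tag, and the in-cluster config are illustrative assumptions, not part of the source being analyzed.

```go
package main

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	labels := map[string]string{"app": "nginx"}
	rs := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{Name: "nginx-rs", Namespace: "default"},
		Spec: appsv1.ReplicaSetSpec{
			// Desired replica count; the controller keeps the actual pod count at this value.
			Replicas: int32Ptr(3),
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "nginx", Image: "nginx:1.21"}},
				},
			},
		},
	}

	if _, err := client.AppsV1().ReplicaSets("default").Create(context.TODO(), rs, metav1.CreateOptions{}); err != nil {
		klog.Fatal(err)
	}
	klog.Info("ReplicaSet nginx-rs created")
}
```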
The code is in pkg/controller/replicaset/replica_set.go
ReplicaSetController
```go
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
		return nil, false, nil
	}
	// Run asynchronously in a goroutine, with two Informers: one for ReplicaSets and one for Pods.
	// This is easy to understand: to reconcile the replica count declared by the ReplicaSet with the
	// number of running Pods, both resources have to be watched at the same time.
	go replicaset.NewReplicaSetController(
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
	return nil, true, nil
}

// The Run function
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()

	controllerName := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", controllerName)
	defer klog.Infof("Shutting down %v controller", controllerName)

	if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		// Worker functions
		go wait.Until(rsc.worker, time.Second, stopCh)
	}

	<-stopCh
}

func (rsc *ReplicaSetController) worker() {
	// Keep processing items; let's follow the implementation
	for rsc.processNextWorkItem() {
	}
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
	// There is also a queue concept here, comparable to the implementation in kube-scheduler.
	// The difference is that this queue is rate limiting, so we won't look at its implementation today.
	// Get an element from the queue
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

	// Process the element
	err := rsc.syncHandler(key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)

	return true
}

// Go back to NewBaseController to see what syncHandler actually is
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	...
	rsc.syncHandler = rsc.syncReplicaSet
	return rsc
}
```
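The queue used above is the rate-limiting workqueue from client-go. As a minimal sketch of the Get / Done / Forget / AddRateLimited cycle that `processNextWorkItem` follows, under the assumption that `sync` is a stand-in for the real syncHandler:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// A stand-in for the real syncHandler: it just prints the key.
func sync(key string) error {
	fmt.Println("syncing", key)
	return nil
}

func processNextWorkItem(queue workqueue.RateLimitingInterface) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	// Done must be called so the queue knows this key has finished processing.
	defer queue.Done(key)

	if err := sync(key.(string)); err != nil {
		// On failure, requeue with a backoff decided by the rate limiter.
		queue.AddRateLimited(key)
		return true
	}
	// On success, drop the key's rate-limit history.
	queue.Forget(key)
	return true
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/nginx-rs")

	processNextWorkItem(queue)
	queue.ShutDown()
}
```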
syncReplicaSet
```go
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()

	// Split the key into namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Get the corresponding ReplicaSet from the Lister by name
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}

	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	// Get the selector (k8s matches ReplicaSets and Pods via the labels in the selector)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
		return nil
	}

	// List all pods in the namespace
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	// Filter out invalid (inactive) pods
	filteredPods := controller.FilterActivePods(allPods)

	// Filter pods according to the selector
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		// Manage the ReplicaSet; analyzed in detail below
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

	// Update the status
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		return err
	}
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}

// Let's take a look at what happens when the number of pods differs from the number declared in the ReplicaSet
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	// diff = current pod count - desired pod count
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
	// diff less than 0: scale up, i.e. create new Pods
	if diff < 0 {
		// Skipping the concrete implementation for now
	// diff greater than 0: scale down, i.e. delete Pods
	} else if diff > 0 {
	}
	return nil
}
```
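The core of manageReplicas is that diff computation. As a toy illustration of the idea only, not the real implementation (which also handles expectations, slow-start batching, and pod deletion ordering):

```go
package main

import "fmt"

// reconcile returns the change to apply to the pod count:
// a positive value means create that many pods, a negative value means delete that many.
func reconcile(currentPods, desiredReplicas int) int {
	diff := currentPods - desiredReplicas
	if diff < 0 {
		// Fewer pods than declared: scale up by -diff pods.
		return -diff
	}
	if diff > 0 {
		// More pods than declared: scale down by diff pods.
		return -diff
	}
	// Counts match: nothing to do.
	return 0
}

func main() {
	fmt.Println(reconcile(1, 3)) // 2  -> create two pods
	fmt.Println(reconcile(5, 3)) // -2 -> delete two pods
	fmt.Println(reconcile(3, 3)) // 0  -> nothing to do
}
```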
We stand on the shoulders of those who came before us; respect to them!
Summary
- The core idea of kube-controller-manager is to reconcile Kubernetes resources by comparing the desired state with the current state. Taking ReplicaSet as an example, it compares the declared replica count with the number of matching Pods currently in the cluster and scales up or down accordingly.
That concludes this source code analysis of how the Kubernetes controller manager runs. For more on kube-controller-manager, please see my other related articles!