
Source code analysis of Kubernetes controller manager running mechanism

Run

Set goals

Understand the running mechanism of kube-controller-manager

Starting from the main function, we find the Run function. The code is long, so it has been simplified here.

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	// configz module, already covered in the kube-scheduler analysis
	if cfgz, err := configz.New(ConfigzName); err == nil {
		cfgz.Set(c.ComponentConfig)
	} else {
		klog.Errorf("unable to register configz: %v", err)
	}
	// Health checks and the HTTP server, skipped here
	var checks []healthz.HealthChecker
	var electionChecker *leaderelection.HealthzAdaptor
	var unsecuredMux *mux.PathRecorderMux
	run := func(ctx context.Context) {
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: c.Kubeconfig,
		}
		// Client-authentication related
		var clientBuilder controller.ControllerClientBuilder
		// Create the controller context
		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}
		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}
		// The InformerFactory here is essentially the same as the SharedInformerFactory we saw in kube-scheduler
		controllerContext.InformerFactory.Start(controllerContext.Stop)
		controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)
		select {}
	}
	// Whether leader election is enabled
	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
		run(context.TODO())
		panic("unreachable")
	}
	// Build a globally unique id
	id, err := os.Hostname()
	if err != nil {
		return err
	}
	id = id + "_" + string(uuid.NewUUID())
	rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
		c.ComponentConfig.Generic.LeaderElection.ResourceName,
		c.LeaderElectionClient.CoreV1(),
		c.LeaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: c.EventRecorder,
		})
	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
	}
	// Under normal circumstances the process blocks inside RunOrDie, continuously doing the election work
	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			// When this instance becomes the leader, call the run function
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
		WatchDog: electionChecker,
		Name:     "kube-controller-manager",
	})
	panic("unreachable")
}
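
To see the election mechanism in isolation, here is a minimal, self-contained sketch of the same client-go leader-election pattern, not the kube-controller-manager code itself: every replica competes for a Lease lock, and only the winner runs its work function. The lease name, namespace, and durations below are placeholder assumptions.

package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	id, _ := os.Hostname()

	// A Lease object in kube-system is the lock all replicas compete for.
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "my-demo-controller", Namespace: "kube-system"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	// RunOrDie blocks: it keeps renewing the lease while we hold it and calls
	// OnStartedLeading exactly once when this instance wins the election.
	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { klog.Info("became leader, starting work") },
			OnStoppedLeading: func() { klog.Fatal("leader election lost") },
		},
	})
}

kube-controller-manager does the same thing with run as the OnStartedLeading callback, which is why a standby replica sits idle until the current leader loses its lease.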

StartControllers

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
	// The critical loop: start each controller; the key is the controller name and the value is its init function
	for controllerName, initFn := range controllers {
		// Check whether this controller is enabled
		if !ctx.IsControllerEnabled(controllerName) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}
		time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
		klog.V(1).Infof("Starting %q", controllerName)
		// Call the init function to start the controller
		debugHandler, started, err := initFn(ctx)
		if err != nil {
			klog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
		// Register the controller's handler under the debug URL
		if debugHandler != nil && unsecuredMux != nil {
			basePath := "/debug/controllers/" + controllerName
			unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
			unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
		}
		klog.Infof("Started %q", controllerName)
	}
	return nil
}
// Now let's look at the function that builds this controller map and see which controllers it contains. Many familiar concepts appear here; we won't explain each one in detail.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["endpointslice"] = startEndpointSliceController
	controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["nodeipam"] = startNodeIpamController
	controllers["nodelifecycle"] = startNodeLifecycleController
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
	}
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
	controllers["ttl-after-finished"] = startTTLAfterFinishedController
	controllers["root-ca-cert-publisher"] = startRootCACertPublisher
	controllers["ephemeral-volume"] = startEphemeralVolumeController
	return controllers
}

ReplicaSet

Since our running example only creates a single nginx Pod, very little of kube-controller-manager is actually involved.

However, to deepen our understanding of kube-controller-manager, let's introduce a new concept, the ReplicaSet. The following is the official description:

A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.


Simply put, a ReplicaSet is used to keep a specified number of Pods running.
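
To make the declared state concrete, here is a small illustrative sketch using the Go API types (the equivalent is usually written as a YAML manifest); the name nginx-rs and the image are placeholders, not part of the article's example.

package example

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// nginxReplicaSet declares the desired state: three identical nginx Pods.
func nginxReplicaSet() *appsv1.ReplicaSet {
	replicas := int32(3)
	labels := map[string]string{"app": "nginx"}
	return &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{Name: "nginx-rs", Namespace: "default"},
		Spec: appsv1.ReplicaSetSpec{
			Replicas: &replicas, // the controller keeps the live Pod count at this number
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}},
				},
			},
		},
	}
}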

The code is in pkg/controller/replicaset/replica_set.go

ReplicaSetController

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
		return nil, false, nil
	}
	// Runs asynchronously in a goroutine and is given two Informers, one for ReplicaSets and one for Pods.
	// This is easy to understand: to reconcile the replica count declared by a ReplicaSet with the number
	// of Pods actually running, both resources have to be watched at the same time.
	go replicaset.NewReplicaSetController(
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
	return nil, true, nil
}
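
Before following Run, it may help to see in a standalone sketch (not the controller's own code; the client, queue name, and resync period are placeholder assumptions) how a SharedInformerFactory event handler turns watched objects into namespace/name keys on a work queue, which is the same wiring the ReplicaSet and Pod informers above rely on.

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	cfg, _ := rest.InClusterConfig()
	client := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset-demo")

	// Every ReplicaSet add event becomes a "namespace/name" key on the queue;
	// a real controller registers Update/Delete handlers and Pod handlers as well.
	factory.Apps().V1().ReplicaSets().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				queue.Add(key)
			}
		},
	})

	stopCh := make(chan struct{})
	factory.Start(stopCh)            // starts the reflectors that watch the API server
	factory.WaitForCacheSync(stopCh) // same role as the cache sync wait in Run below
	<-stopCh
}
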
// The Run function
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()
	controllerName := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", controllerName)
	defer klog.Infof("Shutting down %v controller", controllerName)
	if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}
	for i := 0; i < workers; i++ {
		// The worker function
		go wait.Until(rsc.worker, time.Second, stopCh)
	}
	<-stopCh
}
func (rsc *ReplicaSetController) worker() {
	// Keep following the implementation
	for rsc.processNextWorkItem() {
	}
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {
	// There is a queue concept here too, comparable to the one in kube-scheduler.
	// The difference is that this queue is rate limited; we won't look at its concrete implementation today.
	// Get an item from the queue
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)
	// Process the item
	err := rsc.syncHandler(key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}
	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)
	return true
}
// Trace back to see where syncHandler is set
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	// ... construction of rsc elided ...
	rsc.syncHandler = rsc.syncReplicaSet
	return rsc
}
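
For reference, here is a minimal standalone sketch of the same Get / Done / Forget / AddRateLimited cycle that processNextWorkItem runs, with a placeholder sync function standing in for syncReplicaSet.

package main

import (
	"fmt"
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

// runWorker drains the queue: on success the item's rate-limit history is
// forgotten, on failure it is re-queued with exponential back-off.
func runWorker(queue workqueue.RateLimitingInterface, sync func(key string) error) {
	for {
		key, quit := queue.Get()
		if quit {
			return
		}
		if err := sync(key.(string)); err != nil {
			utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
			queue.AddRateLimited(key)
		} else {
			queue.Forget(key)
		}
		queue.Done(key) // mark the item as no longer being processed
	}
}

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	queue.Add("default/nginx-rs") // in a real controller, informer event handlers add keys

	go func() {
		time.Sleep(time.Second)
		queue.ShutDown() // makes Get return quit=true so the worker exits
	}()
	runWorker(queue, func(key string) error {
		fmt.Println("syncing", key)
		return nil
	})
}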

syncReplicaSet

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()
	// Split the namespace and name out of the key
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Get the corresponding ReplicaSet from the Lister by name
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	// Get the selector (Kubernetes matches ReplicaSets to Pods by the labels in the selector)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
		return nil
	}
	// Get all Pods in the namespace
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	// Filter out inactive Pods
	filteredPods := controller.FilterActivePods(allPods)
	// Filter the Pods that match the selector
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}
	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		// Reconcile the ReplicaSet; detailed analysis below
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
	// Update the status
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		return err
	}
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}
// Let's look at what happens when the number of Pods differs from what the ReplicaSet declares.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	// diff = current number of Pods - expected number of Pods
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
	// diff less than 0 means we need to scale up, i.e. create new Pods
	if diff < 0 {
		// The concrete implementation (which uses rsKey, expectations and slowStartBatch) is skipped for now
	// diff greater than 0 means we need to scale down, i.e. delete surplus Pods
	} else if diff > 0 {
	}
	return nil
}
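
Since the article skips the real implementation, here is a heavily simplified, hypothetical sketch of just the scaling idea: create the missing Pods when diff < 0 and delete the surplus when diff > 0. The real manageReplicas additionally records expectations, batches creations with slowStartBatch, and ranks which Pods to delete; podControl below is a stand-in interface, not the real controller.PodControlInterface.

package example

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
)

// podControl is a hypothetical stand-in for the Pod create/delete helper.
type podControl interface {
	CreatePod(namespace string, template *corev1.PodTemplateSpec, rs *appsv1.ReplicaSet) error
	DeletePod(namespace, name string, rs *appsv1.ReplicaSet) error
}

func reconcileReplicas(filteredPods []*corev1.Pod, rs *appsv1.ReplicaSet, pc podControl) error {
	diff := len(filteredPods) - int(*rs.Spec.Replicas)
	switch {
	case diff < 0: // too few Pods: create -diff new ones from the Pod template
		for i := 0; i < -diff; i++ {
			if err := pc.CreatePod(rs.Namespace, &rs.Spec.Template, rs); err != nil {
				return err
			}
		}
	case diff > 0: // too many Pods: delete diff of them
		for _, pod := range filteredPods[:diff] {
			if err := pc.DeletePod(pod.Namespace, pod.Name, rs); err != nil {
				return err
			}
		}
	}
	return nil
}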

We stand on the shoulders of our predecessors; respect to them!

Summary

  • The core idea of kube-controller-manager is to manage Kubernetes resources by comparing the desired state with the current state. Taking ReplicaSet as an example, it compares the number of Pods declared in the spec with the number of matching Pods currently in the cluster, and scales up or down accordingly.

The above is the detailed content of the source code analysis of the Kubernetes controller manager's running mechanism. For more information about the Kubernetes controller manager, please follow my other related articles!