kube-controller-manager源码分析(四)之 ReplicaSetController

本文主要分析replicaset-controller的源码逻辑,replicas对象创建主要是由deployment-controller中封装。而replicas是pod的维护控制器。可以把replicas理解为deployment中的版本控制器,该控制器封装每次版本的pod对象。

1. startReplicaSetController

startReplicaSetController函数是ReplicaSetController的入口函数。基本的操作即new controller对象,然后起一个goroutine运行run函数。

func startReplicaSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	go replicaset.NewReplicaSetController(
		klog.FromContext(ctx),
		controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs))
	return nil, true, nil
}

2. NewReplicaSetController

NewReplicaSetController初始化controller对象,最终通过NewBaseController实现具体的初始化操作。

// NewReplicaSetController configures a replica set controller with the specified event recorder
func NewReplicaSetController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
	eventBroadcaster := record.NewBroadcaster()
	if err := metrics.Register(legacyregistry.Register); err != nil {
		logger.Error(err, "unable to register metrics")
	}
	return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas,
		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
		"replicaset_controller",
		"replicaset",
		controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
		},
		eventBroadcaster,
	)
}

2.1. NewBaseController

NewBaseController是一个常见的k8s controller构建函数,主要包括以下几个部分:

  • 初始化常用client,包括kube client
  • 添加event handler对象。
  • 添加informer索引。
  • 添加informer syncCache的函数,在处理controller逻辑前先同步一下etcd的数据到本地cache。
  • 赋值syncHandler函数,是具体实现controller逻辑的函数。
func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {

  // 初始化常用配置
	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		eventBroadcaster: eventBroadcaster,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

  // 添加event handler对象
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			rsc.addRS(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			rsc.updateRS(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			rsc.deleteRS(logger, obj)
		},
	})
  
  // 添加informer索引
	rsInformer.Informer().AddIndexers(cache.Indexers{
		controllerUIDIndex: func(obj interface{}) ([]string, error) {
			rs, ok := obj.(*apps.ReplicaSet)
			if !ok {
				return []string{}, nil
			}
			controllerRef := metav1.GetControllerOf(rs)
			if controllerRef == nil {
				return []string{}, nil
			}
			return []string{string(controllerRef.UID)}, nil
		},
	})
	rsc.rsIndexer = rsInformer.Informer().GetIndexer()
	rsc.rsLister = rsInformer.Lister()
  
  // 初始化informer的sync函数
	rsc.rsListerSynced = rsInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			rsc.addPod(logger, obj)
		},
		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
		// local storage, so it should be ok.
		UpdateFunc: func(oldObj, newObj interface{}) {
			rsc.updatePod(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			rsc.deletePod(logger, obj)
		},
	})
	rsc.podLister = podInformer.Lister()
	rsc.podListerSynced = podInformer.Informer().HasSynced

  // 初始化syncHandler函数,该函数为具体实现controller业务逻辑的函数。
	rsc.syncHandler = rsc.syncReplicaSet

	return rsc
}

3. Run

Run函数仍然是k8s controller的代码风格。主要包含了以下几个部分。

  • 同步本地cache内容
  • 运行多个不退出的goroutine处理控制器逻辑。
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
	// 已删除非核心逻辑代码。
	defer rsc.queue.ShutDown()

  // 处理前同步下cache的内容。
	if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}

  // 运行指定个数的goroutine来处理controller逻辑。
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, rsc.worker, time.Second)
	}

	<-ctx.Done()
}

3.1. processNextWorkItem

processNextWorkItemz主要运行syncHandler函数,和对返回的错误进行处理。

  • 如果错误为空,则不再入队。
  • 如果错误不为空,则入队重新处理。
func (rsc *ReplicaSetController) worker(ctx context.Context) {
	for rsc.processNextWorkItem(ctx) {
	}
}

func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

  // 具体的逻辑代码由syncHandler实现。
	err := rsc.syncHandler(ctx, key.(string))
	if err == nil {
    // 如果错误为空,则不再入队
		rsc.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
  // 如果错误不为空,则重新入队
	rsc.queue.AddRateLimited(key)

	return true
}

4. syncReplicaSet

syncReplicaSet是syncHandler的具体实现,常见的syncHandler的实现包含以下几个部分

  • 获取集群中的controller对象,例如 rs。
  • 获取该controller对象及其子对象的当前状态。
  • 对比当前状态与预期状态是否一致。
  • 更新当前状态,以上循环直到当前状态达到期望状态。
func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
	// 已删除非核心代码
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
  // 获取集群中的rs对象
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if apierrors.IsNotFound(err) {
		logger.V(4).Info("deleted", "kind", rsc.Kind, "key", key)
		rsc.expectations.DeleteExpectations(logger, key)
		return nil
	}

	rsNeedsSync := rsc.expectations.SatisfiedExpectations(logger, key)
  // 获取指定selector下pod
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	filteredPods := controller.FilterActivePods(logger, allPods)
	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)

	// 处理replica逻辑
	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

	// 更新状态
	updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)

	// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}

5. manageReplicas

manageReplicas主要实现pod的创建和删除,从而保证当前rs下的pod跟预期的一致。

func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
  // 计算当前的pod数量和预期的pod是否一致。
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	// 如果少于预期
	if diff < 0 {
		// 则批量创建pod
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
		})
	// 如果pod数量多于预期
	} else if diff > 0 {

		relatedPods, err := rsc.getIndirectlyRelatedPods(logger, rs)
		utilruntime.HandleError(err)

		// Choose which Pods to delete, preferring those in earlier phases of startup.
		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)


		errCh := make(chan error, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
        // 批量删除pod
				if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
				}
			}(pod)
		}
		wg.Wait()

    // 处理错误
		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}
	}

	return nil
}

总结

replicaset-controller的代码逻辑相对简单,基本的代码风格是k8s控制器通用的代码逻辑,由于k8s的代码风格高度一致,因此如果读清楚一类controller的控制逻辑。其他的控制器的代码逻辑大同小异。

参考: