workqueue源码分析

本文主要分析client-go中使用的workqueue,从而来分析k8s是如何基于任务队列做并发控制的。其中代码参考:

1. 概述

k8s的控制器大多是基于任务队列的方式进行并发控制,甚至包括基于controller-manager开发的自定义operator控制器。以下我们以deployment controller为例展示workqueue在k8s中的使用。

2. Deployment中的workqueue

2.1. 初始化workqueue

Deployment 的构建函数NewDeploymentController中初始化了任务队列和对于event事件处理相对应的workqueue的操作。

  1. 初始化workqueue的限速任务队列。
  2. 添加event handler使得deployment对象入队,等待处理。
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	dc := &DeploymentController{
    // 初始化一个限速的任务队列
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
		// 添加 add, update, delete的event handler, 其中handler的操作都添加对象到任务队列中。
  	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addDeployment(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateDeployment(logger, oldObj, newObj)
		},
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: func(obj interface{}) {
			dc.deleteDeployment(logger, obj)
		},
	})
  // 定义队列处理函数
  dc.enqueueDeployment = dc.enqueue

以下显示event handler具体逻辑,可见,无论是add,update,delete event handler都是调用enqueueDeployment的函数,而enqueueDeployment实际上就是初始化函数中的dc.enqueue。

func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) {
	d := obj.(*apps.Deployment)
	logger.V(4).Info("Adding deployment", "deployment", klog.KObj(d))
	dc.enqueueDeployment(d)
}

func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) {
	oldD := old.(*apps.Deployment)
	curD := cur.(*apps.Deployment)
	logger.V(4).Info("Updating deployment", "deployment", klog.KObj(oldD))
	dc.enqueueDeployment(curD)
}

func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) {
	d, ok := obj.(*apps.Deployment)
	logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d))
	dc.enqueueDeployment(d)
}

2.2. 添加任务到队列

以下是dc.enqueue的具体实现,可见最终的调用workqueue的Add方法,添加对象到任务队列中。

func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
   key, err := controller.KeyFunc(deployment)
		// 添加到任务队列中。
   dc.queue.Add(key)
}

其中key是namespace/name的拼接字符串,通过MetaNamespaceKeyFunc函数获取对象的key。

// 通过该函数获取k8s对象中处理的key。
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	objName, err := ObjectToName(obj)
	if err != nil {
		return "", err
	}
	return objName.String(), nil
}
// 拼接namespace和name
func (objName ObjectName) String() string {
	if len(objName.Namespace) > 0 {
		return objName.Namespace + "/" + objName.Name
	}
	return objName.Name
}

2.3. 读取任务队列

deployment controller会调用dc.queue.Get()来读取任务队列中的对象。该goroutine是一个常驻的逻辑实时获取。

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
  // 读取任务队列
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
  // 返回将key设置为done。
	defer dc.queue.Done(key)
	// 处理任务
	err := dc.syncHandler(ctx, key.(string))
	dc.handleErr(ctx, err, key)

	return true
}

任务的错误处理:

  1. 如果错误为空,则调用Forget移出队列
  2. 如果小于最大重试次数,则加入延迟队列
  3. 如果大于最大重试次数,则移出队列。
func (dc *DeploymentController) handleErr(ctx context.Context, err error, key interface{}) {
	// 如果错误为空,则调用Forget移出队列
	if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
		dc.queue.Forget(key)
		return
	}
	ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
	// 如果小于最大重试次数,则加入延迟队列
	if dc.queue.NumRequeues(key) < maxRetries {
		dc.queue.AddRateLimited(key)
		return
	}
	// 如果大于最大重试次数,则移出队列。
	utilruntime.HandleError(err)
	logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err)
	dc.queue.Forget(key)
}

syncDeployment获取deployment对象,基于ns和name重新获取deployment。

func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
  // 拆分key获取ns和name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	// 基于ns和name获取deployment对象
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}

	// deepcopy deployment对象
	d := deployment.DeepCopy()

待完善


最后修改 June 11, 2025: update k8s (3e78c6a)