workqueue源码分析
本文主要分析client-go中使用的workqueue,从而来分析k8s是如何基于任务队列做并发控制的。其中代码参考:
1. 概述
k8s的控制器大多是基于任务队列的方式进行并发控制,甚至包括基于controller-manager开发的自定义operator控制器。以下我们以deployment controller为例展示workqueue在k8s中的使用。
2. Deployment中的workqueue
2.1. 初始化workqueue
Deployment 的构建函数NewDeploymentController中初始化了任务队列和对于event事件处理相对应的workqueue的操作。
- 初始化workqueue的限速任务队列。
- 添加event handler使得deployment对象入队,等待处理。
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
dc := &DeploymentController{
// 初始化一个限速的任务队列
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
// 添加 add, update, delete的event handler, 其中handler的操作都添加对象到任务队列中。
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dc.addDeployment(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dc.updateDeployment(logger, oldObj, newObj)
},
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: func(obj interface{}) {
dc.deleteDeployment(logger, obj)
},
})
// 定义队列处理函数
dc.enqueueDeployment = dc.enqueue
以下显示event handler具体逻辑,可见,无论是add,update,delete event handler都是调用enqueueDeployment的函数,而enqueueDeployment实际上就是初始化函数中的dc.enqueue。
func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) {
d := obj.(*apps.Deployment)
logger.V(4).Info("Adding deployment", "deployment", klog.KObj(d))
dc.enqueueDeployment(d)
}
func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) {
oldD := old.(*apps.Deployment)
curD := cur.(*apps.Deployment)
logger.V(4).Info("Updating deployment", "deployment", klog.KObj(oldD))
dc.enqueueDeployment(curD)
}
func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) {
d, ok := obj.(*apps.Deployment)
logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d))
dc.enqueueDeployment(d)
}
2.2. 添加任务到队列
以下是dc.enqueue的具体实现,可见最终的调用workqueue的Add方法,添加对象到任务队列中。
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
key, err := controller.KeyFunc(deployment)
// 添加到任务队列中。
dc.queue.Add(key)
}
其中key是namespace/name
的拼接字符串,通过MetaNamespaceKeyFunc函数获取对象的key。
// 通过该函数获取k8s对象中处理的key。
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
objName, err := ObjectToName(obj)
if err != nil {
return "", err
}
return objName.String(), nil
}
// 拼接namespace和name
func (objName ObjectName) String() string {
if len(objName.Namespace) > 0 {
return objName.Namespace + "/" + objName.Name
}
return objName.Name
}
2.3. 读取任务队列
deployment controller会调用dc.queue.Get()
来读取任务队列中的对象。该goroutine是一个常驻的逻辑实时获取。
func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
// 读取任务队列
key, quit := dc.queue.Get()
if quit {
return false
}
// 返回将key设置为done。
defer dc.queue.Done(key)
// 处理任务
err := dc.syncHandler(ctx, key.(string))
dc.handleErr(ctx, err, key)
return true
}
任务的错误处理:
- 如果错误为空,则调用Forget移出队列
- 如果小于最大重试次数,则加入延迟队列
- 如果大于最大重试次数,则移出队列。
func (dc *DeploymentController) handleErr(ctx context.Context, err error, key interface{}) {
// 如果错误为空,则调用Forget移出队列
if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
dc.queue.Forget(key)
return
}
ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
// 如果小于最大重试次数,则加入延迟队列
if dc.queue.NumRequeues(key) < maxRetries {
dc.queue.AddRateLimited(key)
return
}
// 如果大于最大重试次数,则移出队列。
utilruntime.HandleError(err)
logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err)
dc.queue.Forget(key)
}
syncDeployment获取deployment对象,基于ns和name重新获取deployment。
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
// 拆分key获取ns和name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
// 基于ns和name获取deployment对象
deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
return nil
}
// deepcopy deployment对象
d := deployment.DeepCopy()
待完善
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.