controller-runtime Source Code Analysis

This article analyzes the controller-runtime source code, based on version v0.16.3.

1. Overview

controller-runtime source repository: https://github.com/kubernetes-sigs/controller-runtime

controller-runtime is a toolkit for quickly building k8s operators. Both kubebuilder and operator-sdk are tools that rely on controller-runtime to write k8s operators quickly.

This article uses the code scaffold generated by kubebuilder as the example for analyzing the controller-runtime logic. For code generated by kubebuilder, see: https://github.com/huweihuang/venus

2. controller-runtime Architecture Diagram

Code directory layout:

pkg
├── builder
├── cache
├── client  # client used to operate on k8s objects
├── cluster
├── config
├── controller  # controller logic
├── envtest
├── event
├── finalizer
├── handler
├── internal  # core code: the concrete controller implementation
├── leaderelection
├── manager   # core code
├── predicate
├── ratelimiter
├── reconcile
├── recorder
├── scheme
├── source
└── webhook

3. Operator Framework Logic

Code reference: https://github.com/huweihuang/venus/blob/main/cmd/app/operator.go#L71

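The main logic of the operator code framework consists of the following parts.

  • manager: manages multiple controllers; it builds, registers, and runs them.

  • controller: the controller that wraps a reconciler.

  • reconciler: the function that executes the actual business logic.

The manager workflow consists of the following steps.

  • mgr := ctrl.NewManager: builds a manager object.

  • Reconciler.SetupWithManager(mgr): registers the controller with the manager.

  • mgr.Start(ctrl.SetupSignalHandler()): runs the manager, which in turn runs the controllers.

The code is as follows: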

	// Build the manager object. The main initialization parameters are:
	// - kubeconfig
	// - controller options
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        Metrics:                metricsserver.Options{BindAddress: opt.MetricsAddr},
        HealthProbeBindAddress: opt.ProbeAddr,
        LeaderElection:         opt.EnableLeaderElection,
        LeaderElectionID:       "52609143.huweihuang.com",
        Controller: config.Controller{
            MaxConcurrentReconciles: opt.MaxConcurrentReconciles,
        },
    })

    // Register the controller with the manager and initialize the controller object.
    if err = (&venuscontroller.RedisReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "Redis")
        return err
    }

    // Run the manager, which in turn runs the reconcile logic of the controllers.
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        return err
    }

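4. NewManager

NewManager initializes a manager that is used to create and manage controller objects. One manager can be associated with multiple controllers. Manager is an interface; the struct that ultimately implements it is controllerManager.

4.1. The Manager Interface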

type Manager interface {
	// Cluster holds a variety of methods to interact with a cluster.
	cluster.Cluster

	// Add will set requested dependencies on the component, and cause the component to be
	// started when Start is called.
	// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
	// non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
  
	// Register a concrete controller with the manager through the Runnable interface.
	Add(Runnable) error

	// Start starts all registered Controllers and blocks until the context is cancelled.
	// Returns an error if there is an error starting any controller.
	//
	// If LeaderElection is used, the binary must be exited immediately after this returns,
	// otherwise components that need leader election might continue to run after the leader
	// lock was lost.
  
	// Run the registered components.
	Start(ctx context.Context) error
}
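
To make the Add/Start contract concrete, here is a minimal sketch using only the standard library. It is not the controllerManager implementation: toyManager, RunnableFunc, and runDemo are hypothetical names, and the sketch assumes that Start simply runs every registered Runnable in its own goroutine and returns once all of them have returned.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Runnable has the same shape as the interface accepted by Manager.Add.
type Runnable interface {
	Start(ctx context.Context) error
}

// RunnableFunc (hypothetical helper) adapts a plain function to Runnable.
type RunnableFunc func(ctx context.Context) error

func (f RunnableFunc) Start(ctx context.Context) error { return f(ctx) }

// toyManager is a hypothetical, heavily simplified stand-in for controllerManager.
type toyManager struct {
	mu        sync.Mutex
	runnables []Runnable
}

// Add only records the component; nothing runs until Start is called.
func (m *toyManager) Add(r Runnable) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.runnables = append(m.runnables, r)
	return nil
}

// Start runs every registered Runnable in its own goroutine, waits for all of
// them to return, and reports one of the errors (or nil if there were none).
func (m *toyManager) Start(ctx context.Context) error {
	m.mu.Lock()
	runnables := append([]Runnable(nil), m.runnables...)
	m.mu.Unlock()

	errCh := make(chan error, len(runnables))
	var wg sync.WaitGroup
	for _, r := range runnables {
		wg.Add(1)
		go func(r Runnable) {
			defer wg.Done()
			if err := r.Start(ctx); err != nil {
				errCh <- err
			}
		}(r)
	}
	wg.Wait()
	close(errCh)
	return <-errCh // a closed, empty channel yields nil
}

// runDemo registers three runnables and returns how many of them ran.
func runDemo() (int, error) {
	m := &toyManager{}
	var mu sync.Mutex
	count := 0
	for i := 0; i < 3; i++ {
		_ = m.Add(RunnableFunc(func(ctx context.Context) error {
			mu.Lock()
			count++
			mu.Unlock()
			return nil
		}))
	}
	err := m.Start(context.Background())
	return count, err
}

func main() {
	n, err := runDemo()
	fmt.Println(n, err)
}
```

In the real manager, runnables normally block until the context is cancelled, and they are split into groups (caches, leader-election runnables, and others) rather than kept in a single slice.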

4.2. NewControllerManager

New builds a concrete controllerManager object.

func New(config *rest.Config, options Options) (Manager, error) {
    // Set default values for options fields
    options = setOptionsDefaults(options)
    ... 

    errChan := make(chan error)
    runnables := newRunnables(options.BaseContext, errChan)
    return &controllerManager{
        stopProcedureEngaged:          pointer.Int64(0),
        cluster:                       cluster,
        runnables:                     runnables,
        errChan:                       errChan,
        recorderProvider:              recorderProvider,
        resourceLock:                  resourceLock,
        metricsServer:                 metricsServer,
        controllerConfig:              options.Controller,
        logger:                        options.Logger,
        elected:                       make(chan struct{}),
        webhookServer:                 options.WebhookServer,
        leaderElectionID:              options.LeaderElectionID,
        leaseDuration:                 *options.LeaseDuration,
        renewDeadline:                 *options.RenewDeadline,
        retryPeriod:                   *options.RetryPeriod,
        healthProbeListener:           healthProbeListener,
        readinessEndpointName:         options.ReadinessEndpointName,
        livenessEndpointName:          options.LivenessEndpointName,
        pprofListener:                 pprofListener,
        gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,
        internalProceduresStop:        make(chan struct{}),
        leaderElectionStopped:         make(chan struct{}),
        leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
    }, nil
}

5. SetupWithManager

SetupWithManager registers a concrete controller with the manager. The controller is initialized by the call to Complete.

// SetupWithManager sets up the controller with the Manager.
func (r *RedisReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&venusv1.Redis{}).
        Complete(r)
}

SetupWithManager builds a Builder object through NewControllerManagedBy, which is an alias for builder.ControllerManagedBy.

// Builder builds a Controller.
type Builder struct {
    forInput         ForInput
    ownsInput        []OwnsInput
    watchesInput     []WatchesInput
    mgr              manager.Manager
    globalPredicates []predicate.Predicate
    ctrl             controller.Controller
    ctrlOptions      controller.Options
    name             string
}

// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
func ControllerManagedBy(m manager.Manager) *Builder {
    return &Builder{mgr: m}
}
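
The fluent For(...).Complete(r) chain can be modeled with the standard library alone. The sketch below is an illustration under stated assumptions, not the library's Builder: toyBuilder, toyManager, toyController, ReconcilerFunc, and setup are hypothetical names, the watched kind is reduced to a string, and the controller name is assumed to default to the lowercased kind.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Reconciler is a reduced stand-in for reconcile.Reconciler.
type Reconciler interface {
	Reconcile(key string) error
}

// ReconcilerFunc (hypothetical helper) adapts a function to Reconciler.
type ReconcilerFunc func(key string) error

func (f ReconcilerFunc) Reconcile(key string) error { return f(key) }

// toyController holds what the builder hands over: a name, a kind, a reconciler.
type toyController struct {
	name, kind string
	do         Reconciler
}

// toyManager only keeps track of the controllers registered with it.
type toyManager struct{ controllers []*toyController }

func (m *toyManager) Add(c *toyController) error {
	m.controllers = append(m.controllers, c)
	return nil
}

// toyBuilder collects configuration and creates the controller in Complete.
type toyBuilder struct {
	mgr  *toyManager
	kind string
}

// ControllerManagedBy starts a builder bound to the given manager.
func ControllerManagedBy(m *toyManager) *toyBuilder { return &toyBuilder{mgr: m} }

// For records the kind to reconcile and returns the builder for chaining.
func (b *toyBuilder) For(kind string) *toyBuilder {
	b.kind = kind
	return b
}

// Complete validates the input, builds the controller, and registers it.
func (b *toyBuilder) Complete(r Reconciler) error {
	if r == nil {
		return errors.New("toy builder: reconciler must not be nil")
	}
	if b.kind == "" {
		return errors.New("toy builder: For must be called before Complete")
	}
	c := &toyController{name: strings.ToLower(b.kind), kind: b.kind, do: r}
	return b.mgr.Add(c)
}

// setup builds one controller and returns the controller names on the manager.
func setup(kind string) ([]string, error) {
	mgr := &toyManager{}
	noop := ReconcilerFunc(func(string) error { return nil })
	b := ControllerManagedBy(mgr)
	if kind != "" {
		b = b.For(kind)
	}
	if err := b.Complete(noop); err != nil {
		return nil, err
	}
	var names []string
	for _, c := range mgr.controllers {
		names = append(names, c.name)
	}
	return names, nil
}

func main() {
	names, err := setup("Redis")
	fmt.Println(names, err)
}
```

The point of the pattern is that nothing is created until Complete runs, so validation and registration happen in one place.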

The Builder object then completes the initialization of the controller.

5.1. Controller Initialization

// Build builds the Application Controller and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {

    // Set the ControllerManagedBy
    if err := blder.doController(r); err != nil {  // initialize the controller
        return nil, err
    }

    // Set the Watch
    if err := blder.doWatch(); err != nil {   // add the event handlers
        return nil, err
    }

    return blder.ctrl, nil
}

doController eventually calls NewUnmanaged (through controller.New) to build a controller object, passing in the user-defined reconciler.

func New(name string, mgr manager.Manager, options Options) (Controller, error) {
    c, err := NewUnmanaged(name, mgr, options)
    ...
    // Add the controller as a Manager components
    return c, mgr.Add(c)
}

// NewUnmanaged returns a new controller without adding it to the manager. The
// caller is responsible for starting the returned controller.
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
    if options.Reconciler == nil {
        return nil, fmt.Errorf("must specify Reconciler")
    }
    ...
    // Create controller with dependencies set
    return &controller.Controller{
        Do: options.Reconciler,   // hand the user's reconciler to the controller
        MakeQueue: func() workqueue.RateLimitingInterface {
            return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{
                Name: name,
            })
        }, // factory that creates the work queue
        MaxConcurrentReconciles: options.MaxConcurrentReconciles, // number of concurrent reconciles for this controller
        CacheSyncTimeout:        options.CacheSyncTimeout,
        Name:                    name,
        LogConstructor:          options.LogConstructor,
        RecoverPanic:            options.RecoverPanic,
        LeaderElected:           options.NeedLeaderElection,
    }, nil
}

5.2. Adding Event Handlers

doWatch calls Controller.Watch, which eventually starts the source (for example Informer.Start) to register the event handler.

// Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Controller hasn't started yet, store the watches locally and return.
    //
    // These watches are going to be held on the controller struct until the manager or user calls Start(...).
    if !c.Started {
        c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
        return nil
    }

    c.LogConstructor(nil).Info("Starting EventSource", "source", src)
    return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
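
The "store now, start later" behavior of Watch can be shown with a small standard-library model. This is a sketch under assumptions, not the library code: toyController, source, sliceSource, and demo are hypothetical names, and a source here just pushes a fixed list of keys when started.

```go
package main

import (
	"fmt"
	"sync"
)

// source is a reduced stand-in for source.Source: Start pushes keys to enqueue.
type source interface {
	Start(enqueue func(key string)) error
}

// sliceSource (hypothetical) emits a fixed list of keys when started.
type sliceSource struct{ keys []string }

func (s sliceSource) Start(enqueue func(key string)) error {
	for _, k := range s.keys {
		enqueue(k)
	}
	return nil
}

// toyController keeps watches registered before Start in startWatches.
type toyController struct {
	mu           sync.Mutex
	started      bool
	startWatches []source
	queue        []string
}

// enqueue is only called while mu is held by Watch or Start.
func (c *toyController) enqueue(key string) { c.queue = append(c.queue, key) }

// Watch defers the source until Start if the controller is not running yet;
// otherwise it starts the source immediately.
func (c *toyController) Watch(src source) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.started {
		c.startWatches = append(c.startWatches, src)
		return nil
	}
	return src.Start(c.enqueue)
}

// Start launches every deferred source and marks the controller as started.
func (c *toyController) Start() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, src := range c.startWatches {
		if err := src.Start(c.enqueue); err != nil {
			return err
		}
	}
	c.startWatches = nil
	c.started = true
	return nil
}

// demo reports the queue length before Start, after Start, and after a late Watch.
func demo() (before, after, late int) {
	c := &toyController{}
	_ = c.Watch(sliceSource{keys: []string{"default/a", "default/b"}})
	before = len(c.queue)
	_ = c.Start()
	after = len(c.queue)
	_ = c.Watch(sliceSource{keys: []string{"default/c"}})
	late = len(c.queue)
	return before, after, late
}

func main() {
	fmt.Println(demo())
}
```

Deferring the watches this way lets SetupWithManager run before any cache or informer exists; the sources only begin producing events once the manager starts the controller.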

Adding the event handler:

func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
    prct ...predicate.Predicate) error {
    // Informer should have been specified by the user.
    if is.Informer == nil {
        return fmt.Errorf("must specify Informer.Informer")
    }

    _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
    if err != nil {
        return err
    }
    return nil
}
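
What the registered event handler does with an event can be sketched as follows: every predicate must accept the event, and only then is the object's namespace/name key put on the queue. The types below (event, predicate, toyEventHandler) and the helper filterAndEnqueue are hypothetical simplifications, not the internal.EventHandler API.

```go
package main

import "fmt"

// event is a reduced stand-in for the create/update/delete events of an informer.
type event struct {
	Namespace, Name string
	Generation      int64
}

// predicate decides whether an event should reach the queue.
type predicate func(e event) bool

// toyEventHandler filters events with predicates and enqueues object keys.
type toyEventHandler struct {
	predicates []predicate
	queue      *[]string
}

// OnEvent drops the event as soon as one predicate rejects it;
// otherwise it enqueues the "namespace/name" key.
func (h toyEventHandler) OnEvent(e event) {
	for _, p := range h.predicates {
		if !p(e) {
			return
		}
	}
	*h.queue = append(*h.queue, e.Namespace+"/"+e.Name)
}

// filterAndEnqueue feeds all events through one handler and returns the queue.
func filterAndEnqueue(events []event, preds ...predicate) []string {
	var queue []string
	h := toyEventHandler{predicates: preds, queue: &queue}
	for _, e := range events {
		h.OnEvent(e)
	}
	return queue
}

func main() {
	inDefault := func(e event) bool { return e.Namespace == "default" }
	hasGeneration := func(e event) bool { return e.Generation > 0 }
	events := []event{
		{"default", "redis-a", 1},
		{"kube-system", "redis-b", 1},
		{"default", "redis-c", 0},
	}
	fmt.Println(filterAndEnqueue(events, inDefault, hasGeneration))
}
```

Note that the queue carries only a key, not the object itself; the reconciler is expected to read the current state of the object when it runs.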

6. mgr.Start

controllerManager starts the runnables that were registered earlier, which include the controllers.

func (cm *controllerManager) Start(ctx context.Context) (err error) {
	// Start and wait for caches.
	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
		...
	}

	// Start the non-leaderelection Runnables after the cache has synced.
	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
		...
	}

	// Start the leader election and all required runnables.
	{
		ctx, cancel := context.WithCancel(context.Background())
		cm.leaderElectionCancel = cancel
		go func() {
			if cm.resourceLock != nil {
				if err := cm.startLeaderElection(ctx); err != nil {
					cm.errChan <- err
				}
			} else {
				// Treat not having leader election enabled the same as being elected.
				if err := cm.startLeaderElectionRunnables(); err != nil {
					cm.errChan <- err
				}
				close(cm.elected)
			}
		}()
	}
	...
}
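
The branch on cm.resourceLock treats "leader election disabled" the same as "already elected". The following sketch models only that idea with the standard library; toyManager, acquireLock, startLeaderPhase, and demo are hypothetical names, and the runnables are called synchronously here for simplicity, whereas the real manager starts them in goroutines.

```go
package main

import (
	"errors"
	"fmt"
)

// toyManager models only the leader-election gate of the manager.
type toyManager struct {
	acquireLock     func() error // nil means leader election is disabled
	leaderRunnables []func()
	elected         chan struct{} // closed once this instance may act as leader
}

func newToyManager(acquire func() error) *toyManager {
	return &toyManager{acquireLock: acquire, elected: make(chan struct{})}
}

// startLeaderPhase waits for the lock when election is enabled, then runs the
// leader-only runnables and closes elected. Without a lock it proceeds at once.
func (m *toyManager) startLeaderPhase() error {
	if m.acquireLock != nil {
		if err := m.acquireLock(); err != nil {
			return err
		}
	}
	for _, run := range m.leaderRunnables {
		run()
	}
	close(m.elected)
	return nil
}

// isElected reports whether the elected channel has been closed.
func (m *toyManager) isElected() bool {
	select {
	case <-m.elected:
		return true
	default:
		return false
	}
}

// demo runs one leader-only runnable under the given election settings.
func demo(electionEnabled, lockFails bool) (ran int, elected bool, err error) {
	var acquire func() error
	if electionEnabled {
		acquire = func() error {
			if lockFails {
				return errors.New("could not acquire lock")
			}
			return nil
		}
	}
	m := newToyManager(acquire)
	m.leaderRunnables = append(m.leaderRunnables, func() { ran++ })
	err = m.startLeaderPhase()
	return ran, m.isElected(), err
}

func main() {
	fmt.Println(demo(false, false))
	fmt.Println(demo(true, true))
}
```

Exposing the result as a closed channel lets any component block on "am I the leader yet" without knowing whether leader election is configured.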

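6.1. Controller.Start

Start mainly consists of two parts:

  • Syncing the caches: WaitForSync
  • Starting the configured number of concurrent workers: processNextWorkItem

This part of the logic is similar to that of the individual controllers in the k8s controller-manager.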

func (c *Controller) Start(ctx context.Context) error {
  ...
	err := func() error {
		...
		for _, watch := range c.startWatches {
			syncingSource, ok := watch.src.(source.SyncingSource)
			// Wait for the list-watch cache of this source to sync.
			if err := func() error {
				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
          ...
				}
			}
		}

		// Start MaxConcurrentReconciles goroutines, each looping on processNextWorkItem.
		// Launch workers to process resources
		c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
		wg.Add(c.MaxConcurrentReconciles)
		for i := 0; i < c.MaxConcurrentReconciles; i++ {
			go func() {
				defer wg.Done()
				// Run a worker thread that just dequeues items, processes them, and marks them done.
				// It enforces that the reconcileHandler is never invoked concurrently with the same object.
				for c.processNextWorkItem(ctx) {
				}
			}()
		}
	...
}

6.2. processNextWorkItem

This is the classic processNextWorkItem function; it ultimately calls reconcileHandler to handle the actual logic.

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	obj, shutdown := c.Queue.Get()
	if shutdown {
		// Stop working
		return false
	}

	defer c.Queue.Done(obj)

	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
	defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

	c.reconcileHandler(ctx, obj)
	return true
}
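
The worker loop pattern (Get, process, repeat until shutdown) can be reproduced with a small standard-library queue. This is a sketch, not the client-go workqueue: toyQueue, runWorkers, and demo are hypothetical names, and the toy queue has no deduplication, rate limiting, or Done tracking.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// toyQueue is a minimal blocking FIFO with shutdown support.
type toyQueue struct {
	mu           sync.Mutex
	cond         *sync.Cond
	items        []string
	shuttingDown bool
}

func newToyQueue() *toyQueue {
	q := &toyQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends an item unless the queue is shutting down.
func (q *toyQueue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.shuttingDown {
		return
	}
	q.items = append(q.items, item)
	q.cond.Signal()
}

// Get blocks until an item is available. It reports shutdown=true only when
// the queue is shutting down and has been fully drained.
func (q *toyQueue) Get() (item string, shutdown bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", true
	}
	item = q.items[0]
	q.items = q.items[1:]
	return item, false
}

// ShutDown wakes all blocked workers so they can drain the queue and exit.
func (q *toyQueue) ShutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast()
}

// processNextWorkItem handles one item and returns false once the queue is shut down.
func processNextWorkItem(q *toyQueue, reconcile func(key string)) bool {
	key, shutdown := q.Get()
	if shutdown {
		return false
	}
	reconcile(key)
	return true
}

// runWorkers starts the given number of worker goroutines and waits for them to exit.
func runWorkers(q *toyQueue, workers int, reconcile func(key string)) {
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for processNextWorkItem(q, reconcile) {
			}
		}()
	}
	wg.Wait()
}

// demo enqueues the items, shuts the queue down, and returns the processed keys sorted.
func demo(items []string, workers int) []string {
	q := newToyQueue()
	for _, it := range items {
		q.Add(it)
	}
	q.ShutDown()
	var mu sync.Mutex
	var processed []string
	runWorkers(q, workers, func(key string) {
		mu.Lock()
		processed = append(processed, key)
		mu.Unlock()
	})
	sort.Strings(processed)
	return processed
}

func main() {
	fmt.Println(demo([]string{"default/b", "default/a", "default/c"}, 2))
}
```

Unlike this toy, the real work queue also guarantees that the same key is never processed by two workers at once, which is why the controller calls Done after each item.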

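7. reconcileHandler

reconcileHandler is the core of the whole reconcile flow: the user-defined reconciler function is ultimately invoked from reconcileHandler, and this function defines how each kind of result is handled on the work queue.

  • err != nil: the request is re-enqueued through the rate limiter (AddRateLimited), unless the error is a terminal error, in which case it is only counted in a metric and not retried.
  • result.RequeueAfter > 0: the request is re-enqueued after the specified delay (Forget, then AddAfter).
  • result.Requeue: the request is re-enqueued through the rate limiter (AddRateLimited), so the retry backs off rather than running immediately.
  • Otherwise (err == nil and no requeue requested): the reconcile succeeded; Forget is called so the item's rate-limit history is cleared and it is not queued again until another change occurs.

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {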
	// Type-assert the queue item to a reconcile.Request (the namespace/name of the object).
	req, ok := obj.(reconcile.Request)
	if !ok {
		// As the item in the workqueue is actually invalid, we call
		// Forget here else we'd go into a loop of attempting to
		// process a work item that is invalid.
		c.Queue.Forget(obj)
		c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
		// Return true, don't take a break
		return
	}

	// Call the user-defined Reconcile function and handle its result.
	result, err := c.Reconcile(ctx, req)
	switch {
	case err != nil:
		// On error, re-enqueue with rate limiting; terminal errors are only counted, not retried.
		if errors.Is(err, reconcile.TerminalError(nil)) {
			ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
		} else {
			c.Queue.AddRateLimited(req)
		}
	case result.RequeueAfter > 0:
		// A delay was requested: re-enqueue after RequeueAfter.
		c.Queue.Forget(obj)
		c.Queue.AddAfter(req, result.RequeueAfter)
	case result.Requeue:
		// Requeue was requested: re-enqueue through the rate limiter.
		c.Queue.AddRateLimited(req)
	default:
		// No error and no requeue requested: the reconcile succeeded, so forget the item.
		log.V(5).Info("Reconcile successful")
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.Queue.Forget(obj)
	}
}
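
The order of the switch cases matters: an error takes precedence over RequeueAfter, which takes precedence over Requeue. The sketch below restates that decision as a pure function so the precedence can be checked in isolation. Result mirrors the two fields of reconcile.Result used above; errTerminal, errTransient, action, and decide are hypothetical names, and the returned strings merely name the queue calls seen in reconcileHandler.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Result mirrors the two fields of reconcile.Result used by reconcileHandler.
type Result struct {
	Requeue      bool
	RequeueAfter time.Duration
}

// errTerminal is a hypothetical sentinel standing in for a terminal error.
var errTerminal = errors.New("terminal error")

// errTransient is a hypothetical ordinary, retryable error.
var errTransient = errors.New("transient error")

// action names the queue operation that would be performed.
type action string

// decide applies the same case order as the switch in reconcileHandler.
func decide(res Result, err error) action {
	switch {
	case err != nil:
		if errors.Is(err, errTerminal) {
			return "no retry (terminal error)"
		}
		return "AddRateLimited"
	case res.RequeueAfter > 0:
		return "Forget + AddAfter"
	case res.Requeue:
		return "AddRateLimited"
	default:
		return "Forget"
	}
}

func main() {
	wrapped := fmt.Errorf("reconcile failed: %w", errTerminal)
	fmt.Println(decide(Result{}, nil))
	fmt.Println(decide(Result{Requeue: true}, nil))
	fmt.Println(decide(Result{Requeue: true, RequeueAfter: 30 * time.Second}, nil))
	fmt.Println(decide(Result{RequeueAfter: time.Minute}, errTransient))
	fmt.Println(decide(Result{}, wrapped))
}
```

A practical consequence is that returning both an error and a RequeueAfter value results in a rate-limited retry; the requested delay is ignored.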

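8. Summary

  1. controller-runtime wraps the main logic of the controllers in the k8s controller-manager: creating list-watch objects, waiting for cache sync (WaitForSync), creating the work queue, and abstracting the goroutine that processes queue items into a reconcile function. This makes operators easier to write.
  2. kubebuilder is a command-line code generation tool built on the controller-runtime framework. It can quickly generate and deploy CRD objects and generate the basic controller-runtime scaffolding.
  3. The core code that a user of the controller-runtime framework needs to write is the reconcile function. The framework defines four ways of handling its result (error handling and requeue/retry); choose the one that fits the business requirement.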

References: