kubernetes: kube-controller-manager source code analysis

Date: 2024-03-11

In the Kubernetes architecture, kube-controller-manager is a never-ending control loop component responsible for driving cluster resources toward their desired state. It watches resource state through kube-apiserver, compares the current state with the desired state, and, when the two differ, updates resources through kube-apiserver to bring the current state back in line with the desired state.
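To make the loop concrete, here is a minimal, self-contained Go sketch of the reconcile pattern. Everything in it (the state type, the reconcile function) is a hypothetical illustration of the idea, not kube-controller-manager's actual code.

package main

import (
	"fmt"
	"time"
)

// state is a hypothetical stand-in for a resource, e.g. a ReplicaSet's
// desired and currently observed replica counts.
type state struct {
	desired int
	current int
}

// reconcile compares the current state with the desired state and nudges
// current toward desired, which is what a controller's sync logic does.
func reconcile(s *state) {
	switch {
	case s.current < s.desired:
		s.current++ // e.g. create a missing pod
	case s.current > s.desired:
		s.current-- // e.g. delete an excess pod
	}
}

func main() {
	s := &state{desired: 3, current: 0}
	// The never-ending control loop: observe, compare, act.
	for i := 0; i < 5; i++ {
		reconcile(s)
		fmt.Printf("current=%d desired=%d\n", s.current, s.desired)
		time.Sleep(10 * time.Millisecond)
	}
}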


The sections below analyze how kube-controller-manager works at the source-code level.

kube-controller-manager uses cobra as its command-line application framework. Its initialization is similar to that of kube-scheduler and kube-apiserver, and proceeds as follows:

[Figure: kube-controller-manager initialization flow]

Here is an abbreviated view of the initialization code:

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func NewControllerManagerCommand() *cobra.Command {
    // Create the options
    s, err := options.NewKubeControllerManagerOptions()
    ...
    cmd := &cobra.Command{
        ...
        RunE: func(cmd *cobra.Command, args []string) error {
            ...
            // Build the configuration from the options
            c, err := s.Config(KnownControllers(), ControllersDisabledByDefault(), ControllerAliases())
            if err != nil {
                return err
            }
            ...
            return Run(context.Background(), c.Complete())
        },
        ...
    }
    ...
}

Step into the Run function to see how kube-controller-manager runs.

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func Run(ctx context.Context, c *config.CompletedConfig) error {
    ...
    run := func(ctx context.Context, controllerDescriptors map[string]*ControllerDescriptor) {
        // Build the controller context
        controllerContext, err := CreateControllerContext(logger, c, rootClientBuilder, clientBuilder, ctx.Done())
        if err != nil {
            logger.Error(err, "Error building controller context")
            klog.FlushAndExit(klog.ExitFlushTimeout, 1)
        }
        // Start the controllers; this is the main run logic
        if err := StartControllers(ctx, controllerContext, controllerDescriptors, unsecuredMux, healthzHandler); err != nil {
            logger.Error(err, "Error starting controllers")
            klog.FlushAndExit(klog.ExitFlushTimeout, 1)
        }
        // Start the informers
        controllerContext.InformerFactory.Start(stopCh)
        controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
        close(controllerContext.InformersStarted)
        <-ctx.Done()
    }
    // No leader election, run directly
    if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
        // Build the controller descriptors
        controllerDescriptors := NewControllerDescriptors()
        controllerDescriptors[names.ServiceAccountTokenController] = saTokenControllerDescriptor
        run(ctx, controllerDescriptors)
        return nil
    }
    ...
}

Like kube-scheduler, kube-controller-manager is deployed with multiple replicas of which only one instance is active, so it relies on leader election to choose which replica runs as the leader. Leader election is not covered in detail here.

Now to running the controller manager. First, NewControllerDescriptors registers a descriptor for each resource controller.

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func NewControllerDescriptors() map[string]*ControllerDescriptor {
    register := func(controllerDesc *ControllerDescriptor) {
        ...
        controllers[name] = controllerDesc
    }
    ...
    // register each resource controller's descriptor
    register(newEndpointsControllerDescriptor())
    register(newEndpointSliceControllerDescriptor())
    register(newEndpointSliceMirroringControllerDescriptor())
    register(newReplicationControllerDescriptor())
    register(newPodGarbageCollectorControllerDescriptor())
    register(newResourceQuotaControllerDescriptor())
    ...
    return controllers
}
# kubernetes/cmd/kube-controller-manager/app/apps.go
func newReplicaSetControllerDescriptor() *ControllerDescriptor {
    return &ControllerDescriptor{
        name:     names.ReplicaSetController,
        aliases:  []string{"replicaset"},
        initFunc: startReplicaSetController,
    }
}

Each resource controller descriptor carries an initFunc, the mapping from the controller's name to the function that starts it.
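As a rough sketch of that mapping, inferred from the excerpts above and below (the real source differs in detail), the descriptor and its init function have roughly this shape:

package sketch

import "context"

// Placeholder types standing in for the real ControllerContext and
// controller.Interface; they exist only to make this sketch self-contained.
type ControllerContext struct{}
type Interface interface{}

// InitFunc mirrors the signature seen in startReplicaSetController below:
// it returns the controller (possibly nil), whether it is enabled, and an error.
type InitFunc func(ctx context.Context, controllerCtx ControllerContext,
	controllerName string) (Interface, bool, error)

// ControllerDescriptor ties a controller's canonical name and aliases to
// the initFunc that starts it.
type ControllerDescriptor struct {
	name     string
	aliases  []string
	initFunc InitFunc
}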

Inside run, StartControllers runs the controllers.

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func StartControllers(ctx context.Context, controllerCtx ControllerContext, controllerDescriptors map[string]*ControllerDescriptor,
    unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
    ...
    // Iterate over the resource controller descriptors
    for _, controllerDesc := range controllerDescriptors {
        if controllerDesc.RequiresSpecialHandling() {
            continue
        }
        // Start the controller
        check, err := StartController(ctx, controllerCtx, controllerDesc, unsecuredMux)
        if err != nil {
            return err
        }
        if check != nil {
            // HealthChecker should be present when controller has started
            controllerChecks = append(controllerChecks, check)
        }
    }
    ...
    return nil
}
func StartController(ctx context.Context, controllerCtx ControllerContext, controllerDescriptor *ControllerDescriptor,
    unsecuredMux *mux.PathRecorderMux) (healthz.HealthChecker, error) {
    ...
    // Get the descriptor's start function
    initFunc := controllerDescriptor.GetInitFunc()

    // Start the controller
    ctrl, started, err := initFunc(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx, controllerName)
    if err != nil {
        logger.Error(err, "Error starting controller", "controller", controllerName)
        return nil, err
    }
    ...
}

Kubernetes has many controllers. Here, the ReplicaSet controller serves as the example of how a controller runs.

Step into the ReplicaSet controller's initFunc to see how the controller is started.

# kubernetes/cmd/kube-controller-manager/app/apps.go
func startReplicaSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
    go replicaset.NewReplicaSetController(
        klog.FromContext(ctx),
        controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
        controllerContext.InformerFactory.Core().V1().Pods(),
        controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs))
    return nil, true, nil
}

Running initFunc actually runs startReplicaSetController. startReplicaSetController starts a goroutine that calls replicaset.NewReplicaSetController and then ReplicaSetController.Run. replicaset.NewReplicaSetController creates the informers and event handlers, while ReplicaSetController.Run processes the resources that the event handlers put on the work queue. The diagram below illustrates this:

[Figure: ReplicaSet controller informers, event handlers, and work queue]

First, step into replicaset.NewReplicaSetController to see what the function does.

# kubernetes/pkg/controller/replicaset/replica_set.go
func NewReplicaSetController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
	...
	return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas,
		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
		"replicaset_controller",
		"replicaset",
		controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
		},
		eventBroadcaster,
	)
}
func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {
	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		eventBroadcaster: eventBroadcaster,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			rsc.addRS(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			rsc.updateRS(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			rsc.deleteRS(logger, obj)
		},
	})
	...
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			rsc.addPod(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			rsc.updatePod(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			rsc.deletePod(logger, obj)
		},
	})
	...
	rsc.syncHandler = rsc.syncReplicaSet
	return rsc
}

The function registers event handlers on both the ReplicaSet informer and the Pod informer. They watch kube-apiserver for ReplicaSet and Pod changes and trigger the matching event handler for each kind of change.
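The handler bodies (rsc.addRS, rsc.deletePod, and so on) are elided above; what they ultimately do is turn the changed object into a "namespace/name" key and enqueue it (for pod events, the key of the owning ReplicaSet, as the deletePod excerpt later shows). Below is a hedged sketch of that standard client-go pattern, with the hypothetical helper name enqueue, not the exact ReplicaSet controller code:

package sketch

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// enqueue converts an object into a "namespace/name" key and adds it to the
// work queue; a simplified stand-in for handlers like rsc.addRS.
func enqueue(queue workqueue.RateLimitingInterface, obj interface{}) {
	// DeletionHandlingMetaNamespaceKeyFunc also copes with the tombstone
	// objects informers deliver for deletions.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		return // a real controller would report this error
	}
	queue.Add(key)
}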

Next, step into Run to see what the function does.

# kubernetes/pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
	...
	// Wait for the informer caches to sync with kube-apiserver
	if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}
	for i := 0; i < workers; i++ {
		// Workers process the items on the queue
		go wait.UntilWithContext(ctx, rsc.worker, time.Second)
	}
	<-ctx.Done()
}
func (rsc *ReplicaSetController) worker(ctx context.Context) {
	// The worker loops forever
	for rsc.processNextWorkItem(ctx) {
	}
}
func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
	// Fetch an item from the queue
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)
	// Process the item
	err := rsc.syncHandler(ctx, key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}
	...
	return true
}

As shown, rsc.syncHandler processes the items on the queue; rsc.syncHandler actually executes ReplicaSetController.syncReplicaSet.
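The error branch of processNextWorkItem is elided by the "..." above. The conventional workqueue pattern, shown here as a hedged sketch (handleErr is a hypothetical name) rather than the exact source, retries failed keys with rate-limited backoff:

package sketch

import "k8s.io/client-go/util/workqueue"

// handleErr sketches the retry logic that processNextWorkItem's elided
// branch typically performs.
func handleErr(queue workqueue.RateLimitingInterface, err error, key interface{}) {
	if err == nil {
		// Success: clear the key's backoff history.
		queue.Forget(key)
		return
	}
	// Failure: re-enqueue the key through the rate limiter so it is
	// retried later with exponential backoff.
	queue.AddRateLimited(key)
}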

With the code structure laid out, let's use a pod-deletion example to see how kube-controller-manager operates.

1.1 Example: Deleting a Pod

1.1.1 Setup

A ReplicaSet was created as follows:

# helm list
NAME    NAMESPACE       REVISION        UPDATED                                 STATUS          CHART           APP VERSION
test    default         1               2024-02-29 16:24:43.896757193 +0800 CST deployed        test-0.1.0      1.16.0
# kubectl get replicaset
NAME                       DESIRED   CURRENT   READY   AGE
test-6d47479b6b            1         1         1       10d
# kubectl get pods
NAME                             READY   STATUS    RESTARTS   AGE
test-6d47479b6b-5k6cb            1/1     Running   0          9d

Delete the pod and observe how kube-controller-manager reacts.

1.1.2 Execution Flow

Delete the pod:

# kubectl delete pods test-6d47479b6b-5k6cb

After the pod is deleted, the Pod informer's event handler receives the change and calls ReplicaSetController.deletePod:

func (rsc *ReplicaSetController) deletePod(logger klog.Logger, obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	...
	logger.V(4).Info("Pod deleted", "delete_by", utilruntime.GetCaller(), "deletion_timestamp", pod.DeletionTimestamp, "pod", klog.KObj(pod))
	...
	rsc.queue.Add(rsKey)
}

ReplicaSetController.deletePod adds the key of the deleted pod's owning ReplicaSet to the queue. A worker's ReplicaSetController.processNextWorkItem then takes that key off the queue and hands it to ReplicaSetController.syncReplicaSet for processing.

func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
	...
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	...
	// Get the ReplicaSet named by the key
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	...
	// List all pods in the namespace
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	// Ignore inactive pods.
	filteredPods := controller.FilterActivePods(logger, allPods)
	// Claim the pods belonging to this ReplicaSet;
	// the pod was just deleted, so filteredPods ends up empty here
	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
	if err != nil {
		return err
	}
	// The ReplicaSet's pod has been deleted,
	// so we enter rsc.manageReplicas
	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
	}
	...
}
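The key that deletePod enqueued and syncReplicaSet receives is a plain "namespace/name" string; SplitMetaNamespaceKey takes it back apart. A small illustration of that roundtrip, using this example's ReplicaSet:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// The work queue carries "namespace/name" strings, e.g. the key
	// deletePod would enqueue for this example's ReplicaSet.
	key := "default/test-6d47479b6b"
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(namespace, name) // default test-6d47479b6b
}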

Continue into ReplicaSetController.manageReplicas:

func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	...
	if diff < 0 {
		logger.V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", -diff)
		...
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
			if err != nil {
				if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
					// if the namespace is being terminated, we don't have to do
					// anything because any creation will fail
					return nil
				}
			}
			return err
		})
		...
	}
	...
}
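In this example diff = len(filteredPods) - int(*rs.Spec.Replicas) = 0 - 1 = -1, so one pod must be created (the elided code negates diff before batching). slowStartBatch then invokes the create function in exponentially growing batches, so that a systemic failure (say, quota exhaustion) wastes few API calls. Here is a simplified, sequential, hedged re-implementation of that idea (the real version runs each batch concurrently):

// slowStart calls fn count times in batches that double in size (1, 2, 4, ...)
// as long as every call succeeds, and stops growing on the first failure.
func slowStart(count, initialBatchSize int, fn func() error) (int, error) {
	successes := 0
	remaining := count
	for batch := min(initialBatchSize, remaining); batch > 0; batch = min(2*batch, remaining) {
		for i := 0; i < batch; i++ {
			if err := fn(); err != nil {
				return successes, err // do not grow the batch after a failure
			}
			successes++
		}
		remaining -= batch
	}
	return successes, nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}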

When filteredPods holds fewer pods than the replicas defined in the ReplicaSet's spec, rsc.podControl.CreatePods is invoked to create pods:

func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
	return r.CreatePodsWithGenerateName(ctx, namespace, template, controllerObject, controllerRef, "")
}
func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error {
	...
	return r.createPods(ctx, namespace, pod, controllerObject)
}
func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error {
	...
	newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
	...
	logger.V(4).Info("Controller created pod", "controller", accessor.GetName(), "pod", klog.KObj(newPod))
	...
	return nil
}

Then, control returns to ReplicaSetController.syncReplicaSet:

func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
	...
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
	updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		return err
	}
	...
}

Although the pod is being recreated, filteredPods is empty at this point, so updateReplicaSetStatus sets the ReplicaSet's current status to 0 ready replicas.

Updating the ReplicaSet's status triggers the ReplicaSet event handler again, which re-enters ReplicaSetController.syncReplicaSet. By then, if the pod has been recreated, filteredPods picks up the new pod and updateReplicaSetStatus brings the ReplicaSet's current status up to the desired state.

This article walked through how kube-controller-manager runs and, through a pod-deletion example, showed how it keeps resource state under control.
