原文出处:
overview
kubernetes 默认调度器在调度 pod 时并不关心特殊资源例如磁盘、gpu 等,因此突发奇想来改造调度器,在翻阅官方调度器框架[1]、调度器配置[2]和参考大佬的文章[3]后,自己也来尝试改写一下。
环境配置
相关软件版本:
- kubernetes 版本:v1.19.0
- docker 版本:v26.1.2
- prometheus 版本:v2.49
- node exporter 版本:v1.7.0
集群内有 1 个 master 和 3 个 node。
实验部分
项目总览
项目结构如下:
.
├── dockerfile
├── deployment.yaml
├── go.mod
├── go.sum
├── main.go
├── pkg
│ ├── cpu
│ │ └── cputraffic.go
│ ├── disk
│ │ └── disktraffic.go
│ ├── diskspace
│ │ └── diskspacetraffic.go
│ ├── memory
│ │ └── memorytraffic.go
│ ├── network
│ │ └── networktraffic.go
│ └── prometheus.go
├── scheduler
├── scheduler.conf
└── scheduler.yaml
插件部分
下面以构建内存插件为例。
定义插件名称、变量和结构体
// MemoryPlugin is the name this plugin is registered under; it must match
// the `name` used in the scheduler configuration (scheduler.yaml).
const MemoryPlugin = "memorytraffic"

// Compile-time check that MemoryTraffic implements framework.ScorePlugin.
var _ framework.ScorePlugin = &MemoryTraffic{}

// MemoryTraffic scores nodes by memory availability queried from Prometheus.
type MemoryTraffic struct {
	// prometheus wraps the Prometheus HTTP API client used for queries.
	prometheus *pkg.PrometheusHandle
	// handle is the framework handle passed in at plugin construction.
	handle framework.FrameworkHandle
}
下面来实现 framework.ScorePlugin 的接口。
先定义插件初始化入口
// New is the plugin factory registered with the scheduler. It decodes the
// plugin arguments (ip, devicename, timerange) from the scheduler
// configuration and builds a MemoryTraffic instance backed by a Prometheus
// client.
func New(plArgs runtime.Object, h framework.FrameworkHandle) (framework.Plugin, error) {
	args := &MemoryTrafficArgs{}
	if err := fruntime.DecodeInto(plArgs, args); err != nil {
		return nil, err
	}
	klog.Infof("[memorytraffic] args received. device: %s; timerange: %d, address: %s", args.DeviceName, args.TimeRange, args.IP)
	return &MemoryTraffic{
		handle: h,
		// TimeRange is given in minutes in the config; convert to a Duration.
		prometheus: pkg.NewProme(args.IP, args.DeviceName, time.Minute*time.Duration(args.TimeRange)),
	}, nil
}
实现 score 接口,score 进行初步打分
func (n *memorytraffic) score(ctx context.context, state *framework.cyclestate, p *corev1.pod, nodename string) (int64, *framework.status) {
nodebandwidth, err := n.prometheus.memorygetgauge(nodename)
if err != nil {
return 0, framework.newstatus(framework.error, fmt.sprintf("error getting node bandwidth measure: %s", err))
}
bandwidth := int64(nodebandwidth.value)
klog.infof("[memorytraffic] node '%s' bandwidth: %v", nodename, bandwidth)
return bandwidth, nil
}
实现 normalizescore,对上一步 score 的打分进行修正
func (n *memorytraffic) normalizescore(ctx context.context, state *framework.cyclestate, pod *corev1.pod, scores framework.nodescorelist) *framework.status {
var higherscore int64
for _, node := range scores {
if higherscore < node.score {
higherscore = node.score
}
}
// 计算公式为,满分 - (当前内存使用 / 总内存 * 100)
// 公式的计算结果为,内存使用率越大的节点,分数越低
for i, node := range scores {
scores[i].score = node.score * 100 / higherscore
klog.infof("[memorytraffic] nodes final score: %v", scores[i].score)
}
klog.infof("[memorytraffic] nodes final score: %v", scores)
return nil
}
配置插件名称和返回 scoreextension
func (n *memorytraffic) name() string {
return memoryplugin
}
// 如果返回framework.scoreextensions 就需要实现framework.scoreextensions
func (n *memorytraffic) scoreextensions() framework.scoreextensions {
return n
}
prometheus 部分
首先来编写查询内存可用率的 promql
// memoryMeasureQueryTemplate computes the memory availability percentage of
// one node, averaged over 30m. Note: node_exporter metric names are
// case-sensitive (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes);
// the join on node_uname_info maps the instance label to a node name.
const memoryMeasureQueryTemplate = `(avg_over_time(node_memory_MemAvailable_bytes[30m]) / avg_over_time(node_memory_MemTotal_bytes[30m])) * 100 * on(instance) group_left(nodename) (node_uname_info{nodename="%s"})`
然后来声明 prometheushandle
type prometheushandle struct {
devicename string
timerange time.duration
ip string
client v1.api
}
另外在插件部分也要声明查询 prometheus 的参数结构体
// MemoryTrafficArgs holds the plugin arguments decoded from the scheduler
// configuration. Fields MUST be exported: DecodeInto uses JSON unmarshaling,
// which silently skips unexported fields.
type MemoryTrafficArgs struct {
	IP         string `json:"ip"`
	DeviceName string `json:"devicename"`
	TimeRange  int    `json:"timerange"`
}
编写初始化 prometheus 插件入口
func newprome(ip, devicename string, timerace time.duration) *prometheushandle {
client, err := api.newclient(api.config{address: ip})
if err != nil {
klog.fatalf("[prometheus plugin] fatalerror creating prometheus client: %s", err.error())
}
return &prometheushandle{
devicename: devicename,
ip: ip,
timerange: timerace,
client: v1.newapi(client),
}
}
编写通用查询接口,可供其他类型资源查询
func (p *prometheushandle) query(promql string) (model.value, error) {
results, warnings, err := p.client.query(context.background(), promql, time.now())
if len(warnings) > 0 {
klog.warningf("[prometheus query plugin] warnings: %v\n", warnings)
}
return results, err
}
获取内存可用率接口
func (p *prometheushandle) memorygetgauge(node string) (*model.sample, error) {
value, err := p.query(fmt.sprintf(memorymeasurequerytemplate, node))
fmt.println(fmt.sprintf(memorymeasurequerytemplate, node))
if err != nil {
return nil, fmt.errorf("[memorytraffic plugin] error querying prometheus: %w", err)
}
nodemeasure := value.(model.vector)
if len(nodemeasure) != 1 {
return nil, fmt.errorf("[memorytraffic plugin] invalid response, expected 1 value, got %d", len(nodemeasure))
}
return nodemeasure[0], nil
}
然后在程序入口里启用插件并执行
func main() {
rand.seed(time.now().unixnano())
command := app.newschedulercommand(
app.withplugin(network.networkplugin, network.new),
app.withplugin(disk.diskplugin, disk.new),
app.withplugin(diskspace.diskspaceplugin, diskspace.new),
app.withplugin(cpu.cpuplugin, cpu.new),
app.withplugin(memory.memoryplugin, memory.new),
)
// 对于外部注册一个plugin
// command := app.newschedulercommand(
// app.withplugin("example-plugin1", exampleplugin1.new))
if err := command.execute(); err != nil {
fmt.fprintf(os.stderr, "%v\n", err)
os.exit(1)
}
}
配置部分
为方便观察,这里使用二进制方式运行,准备运行时的配置文件
apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
clientConnection:
  # Path to the master's scheduler kubeconfig; adjust to your cluster.
  kubeconfig: /etc/kubernetes/scheduler.conf
profiles:
  - schedulerName: custom-scheduler
    plugins:
      score:
        enabled:
          - name: "cputraffic"
            weight: 3
          - name: "memorytraffic"
            weight: 4
          - name: "diskspacetraffic"
            weight: 3
          - name: "networktraffic"
            weight: 2
        disabled:
          - name: "*"
    # In v1beta1, pluginConfig is a per-profile field.
    pluginConfig:
      - name: "networktraffic"
        args:
          ip: "http://172.19.32.140:9090"
          devicename: "eth0"
          timerange: 60
      - name: "cputraffic"
        args:
          ip: "http://172.19.32.140:9090"
          devicename: "eth0"
          timerange: 0
      - name: "memorytraffic"
        args:
          ip: "http://172.19.32.140:9090"
          devicename: "eth0"
          timerange: 0
      - name: "diskspacetraffic"
        args:
          ip: "http://172.19.32.140:9090"
          devicename: "eth0"
          timerange: 0
kubeconfig 处为 master 节点的 scheduler.conf,以实际路径为准,内包含集群的证书哈希;ip 为部署 prometheus 节点的 ip,端口为 prometheus 配置中对外暴露的端口。
将二进制文件和 scheduler.yaml 放至 master 同一目录下运行:
./scheduler --logtostderr=true \
  --address=127.0.0.1 \
  --v=6 \
  --config=`pwd`/scheduler.yaml \
  --kubeconfig="/etc/kubernetes/scheduler.conf"
验证结果
准备一个要部署的 pod,使用指定的调度器名称
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gin
  namespace: default
  labels:
    app: gin
spec:
  replicas: 2
  selector:
    matchLabels:
      app: gin
  template:
    metadata:
      labels:
        app: gin
    spec:
      # Must match the schedulerName of the profile in scheduler.yaml
      # ("custom-scheduler"), otherwise the default scheduler picks this pod up.
      schedulerName: custom-scheduler
      containers:
        - name: gin
          image: jaydenchang/k8s_test:latest
          imagePullPolicy: Always
          command: ["./app"]
          ports:
            - containerPort: 9999
              protocol: TCP
最后可以查看调度器日志,部分日志如下:
i0808 17:32:35.138289 27131 memorytraffic.go:83] [memorytraffic] node 'node1' bandwidth: %!s(int64=2680340)
i0808 17:32:35.138763 27131 memorytraffic.go:70] [memorytraffic] nodes final score: [{node1 2680340} {node2 0}]
i0808 17:32:35.138851 27131 memorytraffic.go:70] [memorytraffic] nodes final score: [{node1 71} {node2 0}]
i0808 17:32:35.138911 27131 memorytraffic.go:73] [memorytraffic] nodes final score: [{node1 71} {node2 0}]
i0808 17:32:35.139565 27131 default_binder.go:51] attempting to bind default/go-deployment-66878c4885-b4b7k to node1
i0808 17:32:35.141114 27131 eventhandlers.go:225] add event for scheduled pod default/go-deployment-66878c4885-b4b7k
i0808 17:32:35.141714 27131 eventhandlers.go:205] delete event for unscheduled pod default/go-deployment-66878c4885-b4b7k
i0808 17:32:35.143504 27131 scheduler.go:609] "successfully bound pod to node" pod="default/go-deployment-66878c4885-b4b7k" node="no
de1" evaluatednodes=2 feasiblenodes=2
i0808 17:32:35.104540 27131 scheduler.go:609] "successfully bound pod to node" pod="default/go-deployment-66878c4885-b4b7k" node="no
de1" evaluatednodes=2 feasiblenodes=2