当前位置:首页 > Web开发 > 正文

# IT明星不是梦 # 图解kubernetes资源扩展机制实现(上)

2024-03-31 Web开发

标签:

k8s目前主要撑持CPU和内存两种资源,为了撑持用户需要按需分配的其他硬件类型的资源的调理分配,k8s实现了设备插件框架(device plugin framework)来用于其他硬件类型的资源集成,好比此刻机器学习要使用GPU等资源,今天来看下其内部的关键实现

1. 根本观点

1.1 集成方法 1.1.1 DaemonSet与处事

当我们要集成本地硬件的资源的时候,我们可以在当前节点上通过DaemonSet来运行一个GRPC处事,通过这个处事来进行本地硬件资源的上报与分配

1.1.2 处事注册设计

当供给硬件处事需要与kubelet进行通信的时候,则首先需要进行注册,注册的方法,则是通过最原始的底层的socket文件,并且通过Linux文件系统的inotify机制,来实现处事的注册

1.2 插件处事感知

1.2.1 Watcher

Watcher主要是卖力感知当前节点上注册的处事,当发明新的要注册的插件处事,则会孕育产生对应的事件,注册到当前的kubelet中

1.2.2 期望状态与实际状态

这里的状态主要是指的是否需要注册,因为kubelet与对应的插件处事是通过网络进行通信的,当网络呈现问题、或者对应的插件办变乱障,则可能会导致处事注册掉败,但此时对应的处事的socket还依旧存在,即对应的插件处事依旧存在

此时就会有两种状态:期望状态与实际状态, 因为socket存在所以处事的期望状态其实是需要注册这个插件处事,但是实际上因为某些原因,这个插件处事并没有完成注册,后续会不停的通过期望状态,调解实际状态,从而到达一致

1.2.3 协调器

协调器则就是完成上述两种状态之间操纵的核心,其通过挪用对应插件的回调函数,其实就是挪用对应的grpc接口,来完成期望状态与实际状态的一致性

1.2.4 插件控制器

针对每种类型的插件,城市有对应的控制器,其实也就是实现对应设备注册和反注册并且完成底层资源的分配(Allocate)和收集(ListWatch)操纵

2. 插件处事发明

2.1 核心数据布局 type Watcher struct { // 感知插件处事注册的socket的路径 path string fs utilfs.Filesystem // inotify监测插件处事socket变革 fsWatcher *fsnotify.Watcher stopped chan struct{} // 存储期望状态 desiredStateOfWorld cache.DesiredStateOfWorld } 2.2 初始化

初始化其实就是创建对应的目录

func (w *Watcher) init() error { klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path) if err := w.fs.MkdirAll(w.path, 0755); err != nil { return fmt.Errorf("error (re-)creating root %s: %v", w.path, err) } return nil } 2.3 插件处事发明核心 go func(fsWatcher *fsnotify.Watcher) { defer close(w.stopped) for { select { case event := <-fsWatcher.Events: //如果发明对应目录的文件的变革,,则会触发对应的事件 if event.Op&fsnotify.Create == fsnotify.Create { err := w.handleCreateEvent(event) if err != nil { klog.Errorf("error %v when handling create event: %s", err, event) } } else if event.Op&fsnotify.Remove == fsnotify.Remove { w.handleDeleteEvent(event) } continue case err := <-fsWatcher.Errors: if err != nil { klog.Errorf("fsWatcher received error: %v", err) } continue case <-stopCh: // In case of plugin watcher being stopped by plugin manager, stop // probing the creation/deletion of plugin sockets. // Also give all pending go routines a chance to complete select { case <-w.stopped: case <-time.After(11 * time.Second): klog.Errorf("timeout on stopping watcher") } w.fsWatcher.Close() return } } }(fsWatcher) 2.4 赔偿机制

其实赔偿机制主要是在从头启动kubelet的时候,需要将之前已经存在的socket从头注册到当前的kubelet中

func (w *Watcher) traversePluginDir(dir string) error { return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { if path == dir { return fmt.Errorf("error accessing path: %s error: %v", path, err) } klog.Errorf("error accessing path: %s error: %v", path, err) return nil } switch mode := info.Mode(); { case mode.IsDir(): if err := w.fsWatcher.Add(path); err != nil { return fmt.Errorf("failed to watch %s, err: %v", path, err) } case mode&os.ModeSocket != 0: event := fsnotify.Event{ Name: path, Op: fsnotify.Create, } //TODO: Handle errors by taking corrective measures if err := w.handleCreateEvent(event); err != nil { klog.Errorf("error %v when handling create event: %s", err, event) } default: klog.V(5).Infof("Ignoring file %s with mode %v", path, mode) } return nil }) } 2.5 注册事件回调

温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/web/30012.html