Skip to content

45_快锁实现篇

kuangyufei edited this page Sep 5, 2022 · 1 revision

本篇关键词:、、、

下载 >> 离线文档.鸿蒙内核源码分析(百篇博客分析.挖透鸿蒙内核).pdf

通讯机制相关篇为:

本篇为快锁下篇,说清楚快锁在内核态的实现,解答以下问题,它们在上篇的末尾被提出来。

  • 鸿蒙内核进程池默认上限是64个,除去两个内核进程外,剩下的都归属用户进程,理论上用户进程可以创建很多快锁,这些快锁可以用于进程间(共享快锁)也可以用于线程间(私有快锁),在快锁的生命周期中该如何保存 ?
  • 无锁时,前面已经有进程在申请锁时,如何处理好新等锁进程和旧等锁进程的关系 ?
  • 释放锁时,需要唤醒已经在等锁的进程,唤醒的顺序由什么条件决定 ?

系列篇多次提过,线程在内核层面叫任务,在内核任务比进程重要得多,调度也好,竞争也罢,都是围绕任务展开的。竞争快锁是任务间的竞争,自然会和任务(task)有紧密的联系,其在内核的表达也出现在了任务表达之中。

typedef struct { // 任务控制块
    ...
    LOS_DL_LIST     pendList;           /**< Task pend node | 如果任务阻塞时就通过它挂到各种阻塞情况的链表上,比如OsTaskWait时 */
    ...
    FutexNode       futex;		///< 指明任务在等待哪把快锁,一次只等一锁,锁和任务的关系是(1:N)关系
} LosTaskCB;    

任务 不清楚的请翻看系列相关篇,一定要搞懂,它是内核最重要的概念,甚至没有之一,搞不懂任务就一定搞不懂内核整体的运行机制。

快锁节点 | 内核表达

FutexNode(快锁节点) 是快锁模块核心结构体,熟悉这块源码的钥匙。

typedef struct {
    UINTPTR      key;           /* private:uvaddr | 私有锁,用虚拟地址         shared:paddr | 共享锁,用物理地址 */
    UINT32       index;         /* hash bucket index | 哈希桶索引 OsFutexKeyToIndex */
    UINT32       pid;           /* private:process id   shared:OS_INVALID(-1) | 私有锁:进程ID     , 共享锁为 -1 */
    LOS_DL_LIST  pendList;      /* point to pendList in TCB struct | 指向 TCB 结构中的 pendList, 通过它找到任务*/
    LOS_DL_LIST  queueList;     /* thread list blocked by this lock | 挂等待这把锁的任务,其实这里挂到是FutexNode.queueList , 通过 queueList 可以找到 pendList ,通过 pendList又可以找到真正的任务*/
    LOS_DL_LIST  futexList;     /* point to the next FutexNode | 下一把快锁节点*/
} FutexNode;

解读

  • 首先要明白 快锁快锁节点 的区别,否则看内核代码一定会懵圈,内核并没有快锁这个结构体,key就是快锁,它们的关系是 1:N 的关系 ,快锁分成了 私有锁共享锁 两种类型。用key表示唯一性。共享锁用物理地址 , 私有锁用虚拟地址。为什么要这么做呢 ?
    • 私有锁的意思是进程私有,作用于同一个进程的不同任务间, 因为任务是共享进程空间的, 所以可以用虚拟地址来表示进程内的唯一性 。 但两个不同的进程会出现两个虚拟地址一样的快锁。
    • 共享锁的意思是进程共享,作用于不同进程的不同任务间,因为不同的进程都会有相同的虚拟地址范围, 所以不能用虚拟地址来表示唯一性 ,只能用物理地址。虚拟地址 : 物理地址 = N: 1,不清楚的请查看系列篇之内存映射相关篇。
  • index 内核使用哈希桶来检索快锁 , indexkey的关系通过哈希算法(FNV-1a)来映射。注意会有同一个哈希桶中两个key一样的锁,虽然它会以极低概率出现。快锁的内核实现代码部分,个人觉得可以优化的空间很大,应好好测试下这块 ,说不定会有意想不到的 bug
  • pid 指快锁节点进程归属,作用于私有锁。
  • pendList 指向 LosTaskCB.pendList, 通过它去唤醒和挂起任务,但并没有在源码中看到指向动作,如有看到的请告知。
  • queueList 具有相同key值的节点被queue_list串联起来表示被同一把锁阻塞的任务队列,意思就是queueList上面挂的都是等值为相同key的快锁,并按快锁背后的任务优先级排好序。任务优先级高的可以先获取快锁使用权。
  • futexList 指向下一把快锁, 虽然挂的也是 FutexNode ,但是意义不一样 ! 是指queueList链表上的首个快锁节点,即不同key的快锁。能理解吗 ? 好吧 ,我承认这里面有点绕 。

哈希桶 | 管理快锁

当用户态产生锁的竞争或释放需要进行相关线程的调度操作时,会触发Futex系统调用进入内核,此时会将用户态锁的地址传入内核,并在内核的Futex中以锁地址来区分用户态的每一把锁,因为用户态可用虚拟地址空间为1GiB,为了便于查找、管理,内核Futex采用哈希桶来存放用户态传入的锁。

哈希桶共有80个,0~63 号桶用于存放私有锁(以虚拟地址进行哈希),64~79号桶用于存放共享锁(以物理地址进行哈希),所有相同的 key都掉进了同一个桶里。私有/共享属性通过用户态锁的初始化以及Futex系统调用入参确定。

#define FUTEX_INDEX_PRIVATE_MAX     64	///< 0~63号桶用于存放私有锁(以虚拟地址进行哈希),同一进程不同线程共享futex变量,表明变量在进程地址空间中的位置
///< 它告诉内核,这个futex是进程专有的,不可以与其他进程共享。它仅仅用作同一进程的线程间同步。
#define FUTEX_INDEX_SHARED_MAX      16	///< 64~79号桶用于存放共享锁(以物理地址进行哈希),不同进程间通过文件共享futex变量,表明该变量在文件中的位置
#define FUTEX_INDEX_MAX             (FUTEX_INDEX_PRIVATE_MAX + FUTEX_INDEX_SHARED_MAX) ///< 80个哈希桶
#define FUTEX_INDEX_SHARED_POS      FUTEX_INDEX_PRIVATE_MAX ///< 共享锁开始位置
FutexHash g_futexHash[FUTEX_INDEX_MAX];///< 默认80个哈希桶

typedef struct {
    LosMux      listLock;///< 内核操作lockList的互斥锁
    LOS_DL_LIST lockList;///< 用于挂载 FutexNode (Fast userspace mutex,用户态快速互斥锁)
} FutexHash;

结构体很简单,没什么可说的,一把互斥锁确保一个链表的操作。 下图来源于官方文档,基本能准确的描述管理方式,暂且使用此图(后续可能重画) , 有了这张图理解上面FutexNode会更轻松

任务调度

  • 无锁时就需要将当前任务挂起,可详细跟踪函数OsFutexWaitTask,无非就是根据任务的优先级调整queueList futexList queueList 这些链表上的位置
    /// 将当前任务挂入等待链表中
    STATIC INT32 OsFutexWaitTask(const UINT32 *userVaddr, const UINT32 flags, const UINT32 val, const UINT32 timeOut)
    {
        INT32 futexRet;
        UINT32 intSave, lockVal;
        LosTaskCB *taskCB = NULL;
        FutexNode *node = NULL;
        UINTPTR futexKey = OsFutexFlagsToKey(userVaddr, flags);//通过地址和flags 找到 key
        UINT32 index = OsFutexKeyToIndex(futexKey, flags);//通过key找到哈希桶
        FutexHash *hashNode = &g_futexHash[index];
    
        if (OsFutexLock(&hashNode->listLock)) {//操作快锁节点链表前先上互斥锁
            return LOS_EINVAL;
        }
        //userVaddr必须是用户空间虚拟地址
        if (LOS_ArchCopyFromUser(&lockVal, userVaddr, sizeof(UINT32))) {//将值拷贝到内核空间
            PRINT_ERR("Futex wait param check failed! copy from user failed!\n");
            futexRet = LOS_EINVAL;
            goto EXIT_ERR;
        }
    
        if (lockVal != val) {//对参数内部逻辑检查
            futexRet = LOS_EBADF;
            goto EXIT_ERR;
        }
        //注意第二个参数 FutexNode *node = NULL 
        if (OsFutexInsertTaskToHash(&taskCB, &node, futexKey, flags)) {// node = taskCB->futex
            futexRet = LOS_NOK;
            goto EXIT_ERR;
        }
    
        SCHEDULER_LOCK(intSave);
        OsTaskWaitSetPendMask(OS_TASK_WAIT_FUTEX, futexKey, timeOut);
        OsSchedTaskWait(&(node->pendList), timeOut, FALSE);
        OsSchedLock();
        LOS_SpinUnlock(&g_taskSpin);
    
        futexRet = OsFutexUnlock(&hashNode->listLock);
        if (futexRet) {
            OsSchedUnlock();
            LOS_IntRestore(intSave);
            goto EXIT_UNLOCK_ERR;
        }
    
        LOS_SpinLock(&g_taskSpin);
        OsSchedUnlock();
    
        /*
        * it will immediately do the scheduling, so there's no need to release the
        * task spinlock. when this task's been rescheduled, it will be holding the spinlock.
        */
        OsSchedResched();
    
        if (taskCB->taskStatus & OS_TASK_STATUS_TIMEOUT) {
            taskCB->taskStatus &= ~OS_TASK_STATUS_TIMEOUT;
            SCHEDULER_UNLOCK(intSave);
            return OsFutexDeleteTimeoutTaskNode(hashNode, node);
        }
    
        SCHEDULER_UNLOCK(intSave);
        return LOS_OK;
    
    EXIT_ERR:
        (VOID)OsFutexUnlock(&hashNode->listLock);
    EXIT_UNLOCK_ERR:
        return futexRet;
    }
    针对本篇开始的问题二,等锁新任务来临后由任务优先级决定在queueList中的位置,OsFutexInsertTasktoPendList
    ///< 将快锁挂到任务的阻塞链表上
    STATIC INT32 OsFutexInsertTasktoPendList(FutexNode **firstNode, FutexNode *node, const LosTaskCB *run)
    {
        LosTaskCB *taskHead = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&((*firstNode)->pendList)));//获取阻塞链表首个任务
        LOS_DL_LIST *queueList = &((*firstNode)->queueList);
        FutexNode *tailNode = NULL;
        LosTaskCB *taskTail = NULL;
    
        if (run->priority < taskHead->priority) {//任务的优先级比较
            /* The one with the highest priority is inserted at the top of the queue */
            LOS_ListTailInsert(queueList, &(node->queueList));//查到queueList的尾部
            OsFutexReplaceQueueListHeadNode(*firstNode, node);//同时交换futexList链表上的位置
            *firstNode = node;
            return LOS_OK;
        }
        //如果等锁链表上没有任务或者当前任务大于链表首个任务
        if (LOS_ListEmpty(queueList) && (run->priority >= taskHead->priority)) {
            /* Insert the next position in the queue with equal priority */
            LOS_ListHeadInsert(queueList, &(node->queueList));//从头部插入当前任务,当前任务是要被挂起的
            return LOS_OK;
        }
        
        tailNode = OS_FUTEX_FROM_QUEUELIST(LOS_DL_LIST_LAST(queueList));//获取尾部节点
        taskTail = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&(tailNode->pendList)));//获取阻塞任务的最后一个
        if ((run->priority >= taskTail->priority) ||//当前任务优先级比最后一个更高,或者 ... 没看懂, 为啥要这样 ? @notethinking
            ((run->priority - taskHead->priority) > (taskTail->priority - run->priority))) {//跟最后一个比较优先级
            return OsFutexInsertFindFormBackToFront(queueList, run, node);//从后往前插入
        }
    
        return OsFutexInsertFindFromFrontToBack(queueList, run, node);//否则从前往后插入
    }
  • 释放锁时就需要将queueList上挂起任务唤醒,可详细跟踪函数OsFutexWaitTask,如果没有任务再等锁了就DeleteKey
    STATIC INT32 OsFutexWakeTask(UINTPTR futexKey, UINT32 flags, INT32 wakeNumber, FutexNode **newHeadNode, BOOL *wakeAny)
     {
         UINT32 intSave;
         FutexNode *node = NULL;
         FutexNode *headNode = NULL;
         UINT32 index = OsFutexKeyToIndex(futexKey, flags);
         FutexHash *hashNode = &g_futexHash[index];
         FutexNode tempNode = { //先组成一个临时快锁节点,目的是为了找到哈希桶中是否有这个节点
             .key = futexKey,
             .index = index,
             .pid = (flags & FUTEX_PRIVATE) ? LOS_GetCurrProcessID() : OS_INVALID,
         };
    
         node = OsFindFutexNode(&tempNode);//找快锁节点
         if (node == NULL) {
             return LOS_EBADF;
         }
    
         headNode = node;
    
         SCHEDULER_LOCK(intSave);
         OsFutexCheckAndWakePendTask(headNode, wakeNumber, hashNode, newHeadNode, wakeAny);//再找到等这把锁的唤醒指向数量的任务
         if ((*newHeadNode) != NULL) {
             OsFutexReplaceQueueListHeadNode(headNode, *newHeadNode);
             OsFutexDeinitFutexNode(headNode);
         } else if (headNode->index < FUTEX_INDEX_MAX) {
             OsFutexDeleteKeyFromFutexList(headNode);
             OsFutexDeinitFutexNode(headNode);
         }
         SCHEDULER_UNLOCK(intSave);
    
         return LOS_OK;
     }

百文说内核 | 抓住主脉络

  • 百文相当于摸出内核的肌肉和器官系统,让人开始丰满有立体感,因是直接从注释源码起步,在加注释过程中,每每有心得处就整理,慢慢形成了以下文章。内容立足源码,常以生活场景打比方尽可能多的将内核知识点置入某种场景,具有画面感,容易理解记忆。说别人能听得懂的话很重要! 百篇博客绝不是百度教条式的在说一堆诘屈聱牙的概念,那没什么意思。更希望让内核变得栩栩如生,倍感亲切。
  • 与代码需不断debug一样,文章内容会存在不少错漏之处,请多包涵,但会反复修正,持续更新,v**.xx 代表文章序号和修改的次数,精雕细琢,言简意赅,力求打造精品内容。
  • 百文在 < 鸿蒙研究站 | 开源中国 | 博客园 | 51cto | csdn | 知乎 | 掘金 > 站点发布,百篇博客系列目录如下。

按功能模块:

基础知识 进程管理 任务管理 内存管理
双向链表 内核概念 源码结构 地址空间 计时单位 优雅的宏 钩子框架 位图管理 POSIX main函数 调度故事 进程控制块 进程空间 线性区 红黑树 进程管理 Fork进程 进程回收 Shell编辑 Shell解析 任务控制块 并发并行 就绪队列 调度机制 任务管理 用栈方式 软件定时器 控制台 远程登录 协议栈 内存规则 物理内存 内存概念 虚实映射 页表管理 静态分配 TLFS算法 内存池管理 原子操作 圆整对齐
通讯机制 文件系统 硬件架构 内核汇编
通讯总览 自旋锁 互斥锁 快锁使用 快锁实现 读写锁 信号量 事件机制 信号生产 信号消费 消息队列 消息封装 消息映射 共享内存 文件概念 文件故事 索引节点 VFS 文件句柄 根文件系统 挂载机制 管道文件 文件映射 写时拷贝 芯片模式 ARM架构 指令集 协处理器 工作模式 寄存器 多核管理 中断概念 中断管理 编码方式 汇编基础 汇编传参 链接脚本 内核启动 进程切换 任务切换 中断切换 异常接管 缺页中断
编译运行 调测工具
编译过程 编译构建 GN语法 忍者无敌 ELF格式 ELF解析 静态链接 重定位 动态链接 进程映像 应用启动 系统调用 VDSO 模块监控 日志跟踪 系统安全 测试用例

百万注源码 | 处处扣细节

  • 百万汉字注解内核目的是要看清楚其毛细血管,细胞结构,等于在拿放大镜看内核。内核并不神秘,带着问题去源码中找答案是很容易上瘾的,你会发现很多文章对一些问题的解读是错误的,或者说不深刻难以自圆其说,你会慢慢形成自己新的解读,而新的解读又会碰到新的问题,如此层层递进,滚滚向前,拿着放大镜根本不愿意放手。

  • < gitee | github | coding | gitcode > 四大码仓推送 | 同步官方源码。

关注不迷路 | 代码即人生

期间不断得到小伙伴的支持,有学生,有职场新人,也有老江湖,在此一并感谢,大家的支持是前进的动力。尤其每次收到学生的赞助很感慨,后生可敬。 >> 查看捐助名单

据说喜欢 点赞 + 分享 的,后来都成了大神。:)

Clone this wiki locally