GoLang协程库libtask学习笔记
Onemorelight95 人气:0协程解决了什么问题
我们先从一次网络IO请求过程中的read操作为例,请求数据会先拷贝到系统内核空间中,再从操作系统的内核空间拷贝到应用程序的用户空间中。从内核空间将数据拷贝到用户空间过程中,会经历两个阶段:
- 等待数据准备
- 拷贝数据
因为有这两个阶段,所以就有了各种网络IO的模型:
同步编程:应用程序等待IO结果(比如等待打开一个大的文件,或者等待远端服务器的响应),阻塞当前线程。
- 优点:逻辑简单。
- 缺点:效率太低,其他与IO无关的业务也要等待IO的响应。
异步多线程/进程:将IO操作频繁的逻辑、或者单纯的IO操作独立到一/多个线程中,业务线程与IO线程间靠通信/全局变量来共享数据。
- 优点:充分利用CPU资源,防止阻塞资源。
- 缺点:线程切换代价相对较高,异步逻辑代码复杂。
异步消息+回调函数:设计一个消息循环处理器,接收外部消息(包括系统通知和网络报文等),收到消息时调用注册的回调函数。
- 优点:充分利用CPU资源,防止阻塞资源。
- 缺点:代码逻辑复杂。
而协程,就是用同步的语义去解决异步问题,即业务逻辑看起来是同步的,但实际上并不阻塞当前线程(一般是靠事件循环处理来分发消息)。所以协程实际上是在单线程的环境下实现的应用程序级别的并发,就是把本来由操作系统控制的切换+保存状态在应用程序里面实现了。
由于协程在应用程序级别来处理任务,所以协程更像是一个函数,只是比普通的函数多了两个动作:yield()
和resume()
,即让出和恢复。让出的时候我们需要将寄存器的协程上下文保存起来,恢复的时候再将上下文重新压入寄存器,继续执行。
简介
Libtask 是一个简单的协程库,它展示了最简单的一种协程实现方式。操作系统只能看见一个内核线程,无法感知到客户端协程的存在。
Libtask中的协程是协作式的,也就是说,使用的不是时间片轮转算法,调度器根据先来先服务的策略来执行就绪队列中的协程,只有当每个协程主动退出时调度器才会把CPU分配给下一个协程。
对协程的抽象
在libtask中,协程被抽象成一个Task结构体,结构体中的字段用于描述协程的相关信息:
// 一个Task可以看成是一个需要异步执行的任务,coroutine的抽象描述 struct Task { char name[256]; char state[256]; // 前后指针 Task *next; Task *prev; Task *allnext; Task *allprev; // 执行上下文 Context context; // 睡眠时间 uvlong alarmtime; uint id; // 协程栈指针 uchar *stk; // 协程栈大小 uint stksize; // 协程是否退出了 int exiting; // 在在alltask的中的索引下标 int alltaskslot; // 是否是系统协程 int system; // 是否在就绪状态 int ready; // Task需要执行的函数 void (*startfn)(void*); // startfn的参数 void *startarg; // 自定义数据 void *udata; };
创建协程
int taskcreate(void (*fn)(void*), void *arg, uint stack) { int id; Task *t; // 分配task和stack的空间 t = taskalloc(fn, arg, stack); // 协程的数量+1 taskcount++; id = t->id; if(nalltask%64 == 0){ alltask = realloc(alltask, (nalltask+64)*sizeof(alltask[0])); if(alltask == nil){ fprint(2, "out of memory\n"); abort(); } } // 记录位置 t->alltaskslot = nalltask; // 保存到alltask中 alltask[nalltask++] = t; // 修改状态为就绪,可以被调度,并且加入到就绪队列 taskready(t); return id; }
我们可以使用taskcreate
函数来创建协程,在taskcreate
函数中,首先会调用taskalloc
函数为Task和执行栈分配内存,然后初始化协程的上下文信息,在此之后,一个协程就被创建成功了。
/* * taskalloc函数的主要逻辑是申请Task结构体所需的内存和执行时栈的内存, * 然后初始化各个字段。在此之后,一个协程就被创建成功了,接着执行taskready * 把协程加入就绪队列中。 * */ static Task* taskalloc(void (*fn)(void*), void *arg, uint stack) { Task *t; sigset_t zero; uint x, y; ulong z; /* allocate the task and stack together */ // 结构体本身的大小和栈大小 // 协程栈大小是256*1024 t = malloc(sizeof *t+stack); if(t == nil){ fprint(2, "taskalloc malloc: %r\n"); abort(); } memset(t, 0, sizeof *t); // 栈的内存位置 t->stk = (uchar*)(t+1); // 栈大小 t->stksize = stack; // 协程id t->id = ++taskidgen; // 协程工作函数和参数 t->startfn = fn; t->startarg = arg; /* do a reasonable initialization */ memset(&t->context.uc, 0, sizeof t->context.uc); sigemptyset(&zero); // 初始化uc_sigmask字段为空,即不阻塞信号 sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask); /* must initialize with current context */ // 初始化uc字段 if(getcontext(&t->context.uc) < 0){ fprint(2, "getcontext: %r\n"); abort(); } /* call makecontext to do the real work. */ /* leave a few words open on both ends */ // 设置协程执行时的栈位置和大小 t->context.uc.uc_stack.ss_sp = t->stk+8; t->context.uc.uc_stack.ss_size = t->stksize-64; #if defined(__sun__) && !defined(__MAKECONTEXT_V2_SOURCE) /* sigh */ #warning "doing sun thing" /* can avoid this with __MAKECONTEXT_V2_SOURCE but only on SunOS 5.9 */ t->context.uc.uc_stack.ss_sp = (char*)t->context.uc.uc_stack.ss_sp +t->context.uc.uc_stack.ss_size; #endif /* * All this magic is because you have to pass makecontext a * function that takes some number of word-sized variables, * and on 64-bit machines pointers are bigger than words. */ //print("make %p\n", t); z = (ulong)t; y = z; z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */ x = z>>16; // 保存信息到uc字段 makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x); return t; }
创建好一个协程之后,taskcreate
函数会调用taskready()
函数把协程的状态修改为就绪态,并加入到就绪队列中。
/* * 修改协程的状态并加入就绪队列 * */ void taskready(Task *t) { t->ready = 1; addtask(&taskrunqueue, t); }
如何保存上下文信息
我们可以发现,在调用taskalloc
函数初始化协程的时候,我们还会对协程的上下文进行初始化,以下代码的流程是将当前CPU寄存器的上下文信息保存到当前Task的上下文中,同时将当前Task的栈位置和大小保存进上下文中,最后将协程的工作函数保存到上下文信息中。
// 将上下文置为零值 memset(&t->context.uc, 0, sizeof t->context.uc); // 将信号集zero清空 sigemptyset(&zero); // 初始化uc_sigmask字段为空,即不阻塞信号 sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask); // 将当前上下文信息保存到t-context.uc结构体中 if(getcontext(&t->context.uc) < 0){ fprint(2, "getcontext: %r\n"); abort(); } // 设置协程执行时的栈位置和大小 t->context.uc.uc_stack.ss_sp = t->stk+8; t->context.uc.uc_stack.ss_size = t->stksize-64; z = (ulong)t; y = z; z >>= 16; x = z>>16; // 设置协程的工作函数到上下文信息中 makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);
ucontext族函数
其实对协程上下文的初始化以及保存是通过linux下的ucontext族函数来实现的。
ucontext_t结构体
我们发现Task结构体中有一个Context字段,这个字段其实就是对ucontext_t
结构体的封装,ucontext_t
结构体用于保存当前的上下文信息,它的结构是这样的:
typedef struct ucontext { unsigned long int uc_flags; struct ucontext *uc_link;//后序上下文 __sigset_t uc_sigmask;// 信号屏蔽字掩码 stack_t uc_stack;// 上下文所使用的栈 // 保存的上下文的寄存器信息 // 比如pc、sp、bp // pc程序计数器:记录下一条指令的地址 // sp堆栈指针:指向函数调用栈栈顶的指针,所以新数据入栈将存入sp+1的地址 // bp基址指针:指向函数调用栈的首地址 mcontext_t uc_mcontext; long int uc_filler[5]; } ucontext_t; //其中mcontext_t 定义如下 typedef struct { gregset_t __ctx(gregs);//所装载寄存器 fpregset_t __ctx(fpregs);//寄存器的类型 } mcontext_t; //其中gregset_t 定义如下 typedef greg_t gregset_t[NGREG];//包括了所有的寄存器的信息
getcontext()函数
函数原型:
int getcontext(ucontext_t* ucp)
getcontext()
函数的底层是通过汇编来实现的,其主要的功能是将当前运行到的寄存器信息保存到参数ucp中。
setcontext()函数
函数原型:
int setcontext(const ucontext_t *ucp)
setcontext()
函数的作用是将ucontext_t
结构体变量ucp中的上下文信息重新恢复到cpu中并执行。
makecontext()函数
函数原型:
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...)
makecontext()
函数的主要功能是设置协程的工作函数到上下文(ucontext_t)中,同时在用户设置的栈上保存一些信息,并且设置栈顶指针的值到上下文信息中。
argc是入口函数的参数个数,后面的...
是具体的入口函数参数,该参数必须是整形值。
swapcontext()函数
函数原型:
int swapcontext(ucontext_t *oucp, ucontext_t *ucp)
该函数可以将当前cpu中的上下文信息保存到oucp
结构体变量中,然后将ucp
结构体的上下文信息恢复到cpu中。
这里可以理解为调用了两个函数,第一次是调用了getcontext(oucp)
然后再调用setcontext(ucp)
。
协程的调度
在使用taskcreate
创建协程的时候,这个函数内部会调用taskready
函数修改新建协程的状态并加入就绪队列taskrunqueue
中,taskrunqueue
中的协程需要一个调度器来调度执行。
tasklib库中实现了一个协程调度中心的函数。调度中心会不断的从就绪队列中取出协程来执行,它的核心逻辑是这样的:
- 从就绪队列中拿出一个协程t,并把t移出就绪队列。
- 通过
contextswitch
函数将协程t的上下文信息切换到taskschedcontext
中执行。 - 将协程t切换回调度中心,如果t已经退出,修改数据结构,然后回收他的内存,然后继续调度其它的协程执行。这里的调度机制比较简单,是非抢占式的协作式调度,没有时间片的概念,一个协程的执行时间由自己决定,放弃执行的权力也是自己控制的,当协程不想执行了可以调用
taskyield()
函数让出cpu。
static void taskscheduler(void) { int i; Task *t; taskdebug("scheduler enter"); for(;;){ // 如果没有就绪态协程了,就退出 if(taskcount == 0) exit(taskexitval); // 从就绪队列中拿出一个协程 t = taskrunqueue.head; if(t == nil){ fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount); exit(1); } // 从就绪队列中删除这个协程 deltask(&taskrunqueue, t); // 将协程状态改为非就绪态 t->ready = 0; // 保存正在执行的协程 taskrunning = t; // 切换次数+1 tasknswitch++; taskdebug("run %d (%s)", t->id, t->name); // 切换到t执行,将当前cpu中的上下文信息保存到taskschedcontext中 // 然后将t->context中的上下文信息恢复到cpu中执行 contextswitch(&taskschedcontext, &t->context); // 执行结束 taskrunning = nil; // 刚才执行的协程t退出了 if(t->exiting){ // 如果不是系统协程,协程个数减一 if(!t->system) taskcount--; // 保存当前协程在alltask的索引 i = t->alltaskslot; // 将最后一个协程切换到当前协程的位置,因为当前协程要退出了 alltask[i] = alltask[--nalltask]; // 更新被置换协程的索引 alltask[i]->alltaskslot = i; // 释放堆内存 free(t); } } }
从上述调度器执行的代码中可以发现,我们使用contextswitch
函数实现了协程间的上下文切换,这个函数的内部调用了swapcontext(&from->uc, &to->uc)
函数,这个函数将当前cpu中的上下文信息保存到from结构体变量中,然后将to结构体的上下文信息恢复到cpu中执行。
执行结束之后,会重新将上下文切换回调度中心。
static void contextswitch(Context *from, Context *to) { if(swapcontext(&from->uc, &to->uc) < 0){ fprint(2, "swapcontext failed: %r\n"); assert(0); } }
其实我们还可以通过调用taskyield
函数来控制协程在没有执行完就主动让出,那么当前正在执行的task会被 插入就绪队列的尾部,等待后续的调度,然后调度器会从就绪队列的头部重新取出一个task来执行。
/* * 协程主动让出CPU * 1.将主动让出的协程重新加入到就绪队列 * 2.将当前协程的状态标记为让出 * 3.执行协程切换的逻辑 * */ int taskyield(void) { int n; // 协程的让出次数 n = tasknswitch; // 将当前主动让出的协程放进等待队列 taskready(taskrunning); // 标记当前协程的状态为“让出” taskstate("yield"); // 切换协程 taskswitch(); // 等于0说明当前只有自己一个协程,调度的时候taskswitch加1,所以这里要减1 return tasknswitch - n - 1; }
我们可以发现,切换流程的时候实际上是调用了taskswitch()
函数,这个函数内部会调用contextswitch
函数来切换上下文。
/* * 切换协程 * */ void taskswitch(void) { needstack(0); // 将当前CPU中的上下文信息保存到taskrunning->context结构体中 // 然后将调度中心上下文恢复到CPU中执行 contextswitch(&taskrunning->context, &taskschedcontext); }
总结
所以整个调度流程是这样的:
每一个协程对应一个Task结构体。然后调度中心不断地按照先进先出的方式去调度协程的执行就可以。因为没有抢占机制,所以调度中心是依赖协程本身去驱动的,协程需要主动让出cpu,把上下文切换回调度中心,调度中心才能进行下一轮的调度。
当然我们也可以调用taskyield
函数主动让出CPU,他会将当前正在执行的task插入就绪队列的尾部,等待后续的调度,然后调度器会从就绪队列的头部重新取出一个task来执行。
加载全部内容