亲宝软件园·资讯

展开

GoLang协程库libtask学习笔记

Onemorelight95 人气:0

协程解决了什么问题

我们先从一次网络IO请求过程中的read操作为例,请求数据会先拷贝到系统内核空间中,再从操作系统的内核空间拷贝到应用程序的用户空间中。从内核空间将数据拷贝到用户空间过程中,会经历两个阶段:

因为有这两个阶段,所以就有了各种网络IO的模型:

同步编程:应用程序等待IO结果(比如等待打开一个大的文件,或者等待远端服务器的响应),阻塞当前线程。

异步多线程/进程:将IO操作频繁的逻辑、或者单纯的IO操作独立到一/多个线程中,业务线程与IO线程间靠通信/全局变量来共享数据。

异步消息+回调函数:设计一个消息循环处理器,接收外部消息(包括系统通知和网络报文等),收到消息时调用注册的回调函数。

而协程,就是用同步的语义去解决异步问题,即业务逻辑看起来是同步的,但实际上并不阻塞当前线程(一般是靠事件循环处理来分发消息)。所以协程实际上是在单线程的环境下实现的应用程序级别的并发,就是把本来由操作系统控制的切换+保存状态在应用程序里面实现了。

由于协程在应用程序级别来处理任务,所以协程更像是一个函数,只是比普通的函数多了两个动作:yield()resume(),即让出和恢复。让出的时候我们需要将寄存器的协程上下文保存起来,恢复的时候再将上下文重新压入寄存器,继续执行。

简介

Libtask 是一个简单的协程库,它展示了最简单的一种协程实现方式。操作系统只能看见一个内核线程,无法感知到客户端协程的存在。

Libtask中的协程是协作式的,也就是说,使用的不是时间片轮转算法,调度器根据先来先服务的策略来执行就绪队列中的协程,只有当每个协程主动退出时调度器才会把CPU分配给下一个协程。

对协程的抽象

在libtask中,协程被抽象成一个Task结构体,结构体中的字段用于描述协程的相关信息:

// 一个Task可以看成是一个需要异步执行的任务,coroutine的抽象描述
struct Task
{
	char	name[256];
	char	state[256];
    // 前后指针
	Task	*next;
	Task	*prev;
	Task	*allnext;
	Task	*allprev;
    // 执行上下文
	Context	context;
    // 睡眠时间
	uvlong	alarmtime;
	uint	id;
    // 协程栈指针
	uchar	*stk;
    // 协程栈大小
	uint	stksize;
    // 协程是否退出了
	int	exiting;
    // 在在alltask的中的索引下标
	int	alltaskslot;
    // 是否是系统协程
	int	system;
    // 是否在就绪状态
	int	ready;
    // Task需要执行的函数
	void	(*startfn)(void*);
    // startfn的参数
	void	*startarg;
    // 自定义数据
	void	*udata;
};

创建协程

int taskcreate(void (*fn)(void*), void *arg, uint stack)
{
	int id;
	Task *t;
    // 分配task和stack的空间
	t = taskalloc(fn, arg, stack);
	// 协程的数量+1
	taskcount++;
	id = t->id;
	if(nalltask%64 == 0){
		alltask = realloc(alltask, (nalltask+64)*sizeof(alltask[0]));
		if(alltask == nil){
			fprint(2, "out of memory\n");
			abort();
		}
	}
    // 记录位置
	t->alltaskslot = nalltask;
    // 保存到alltask中
	alltask[nalltask++] = t;
    // 修改状态为就绪,可以被调度,并且加入到就绪队列
	taskready(t);
	return id;
}

我们可以使用taskcreate函数来创建协程,在taskcreate函数中,首先会调用taskalloc函数为Task和执行栈分配内存,然后初始化协程的上下文信息,在此之后,一个协程就被创建成功了。

/*
 * taskalloc函数的主要逻辑是申请Task结构体所需的内存和执行时栈的内存,
 * 然后初始化各个字段。在此之后,一个协程就被创建成功了,接着执行taskready
 * 把协程加入就绪队列中。
 * */
static Task*
taskalloc(void (*fn)(void*), void *arg, uint stack)
{
	Task *t;
	sigset_t zero;
	uint x, y;
	ulong z;
	/* allocate the task and stack together */
	// 结构体本身的大小和栈大小
	// 协程栈大小是256*1024
	t = malloc(sizeof *t+stack);
	if(t == nil){
		fprint(2, "taskalloc malloc: %r\n");
		abort();
	}
	memset(t, 0, sizeof *t);
	// 栈的内存位置
	t->stk = (uchar*)(t+1);
	// 栈大小
	t->stksize = stack;
	// 协程id
	t->id = ++taskidgen;
	// 协程工作函数和参数
	t->startfn = fn;
	t->startarg = arg;
	/* do a reasonable initialization */
	memset(&t->context.uc, 0, sizeof t->context.uc);
	sigemptyset(&zero);
	// 初始化uc_sigmask字段为空,即不阻塞信号
	sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);
	/* must initialize with current context */
	// 初始化uc字段
	if(getcontext(&t->context.uc) < 0){
		fprint(2, "getcontext: %r\n");
		abort();
	}
	/* call makecontext to do the real work. */
	/* leave a few words open on both ends */
	// 设置协程执行时的栈位置和大小
	t->context.uc.uc_stack.ss_sp = t->stk+8;
	t->context.uc.uc_stack.ss_size = t->stksize-64;
#if defined(__sun__) && !defined(__MAKECONTEXT_V2_SOURCE)		/* sigh */
#warning "doing sun thing"
	/* can avoid this with __MAKECONTEXT_V2_SOURCE but only on SunOS 5.9 */
	t->context.uc.uc_stack.ss_sp = 
		(char*)t->context.uc.uc_stack.ss_sp
		+t->context.uc.uc_stack.ss_size;
#endif
	/*
	 * All this magic is because you have to pass makecontext a
	 * function that takes some number of word-sized variables,
	 * and on 64-bit machines pointers are bigger than words.
	 */
//print("make %p\n", t);
	z = (ulong)t;
	y = z;
	z >>= 16;	/* hide undefined 32-bit shift from 32-bit compilers */
	x = z>>16;
	// 保存信息到uc字段
	makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);
	return t;
}

创建好一个协程之后,taskcreate函数会调用taskready()函数把协程的状态修改为就绪态,并加入到就绪队列中。

/*
 * 修改协程的状态并加入就绪队列
 * */
void taskready(Task *t)
{
	t->ready = 1;
	addtask(&taskrunqueue, t);
}

如何保存上下文信息

我们可以发现,在调用taskalloc函数初始化协程的时候,我们还会对协程的上下文进行初始化,以下代码的流程是将当前CPU寄存器的上下文信息保存到当前Task的上下文中,同时将当前Task的栈位置和大小保存进上下文中,最后将协程的工作函数保存到上下文信息中。

	// 将上下文置为零值
	memset(&t->context.uc, 0, sizeof t->context.uc);
	// 将信号集zero清空
	sigemptyset(&zero);
	// 初始化uc_sigmask字段为空,即不阻塞信号
	sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);
	// 将当前上下文信息保存到t-context.uc结构体中
	if(getcontext(&t->context.uc) < 0){
		fprint(2, "getcontext: %r\n");
		abort();
	}
	// 设置协程执行时的栈位置和大小
	t->context.uc.uc_stack.ss_sp = t->stk+8;
	t->context.uc.uc_stack.ss_size = t->stksize-64;
	z = (ulong)t;
	y = z;
	z >>= 16;
	x = z>>16;
	// 设置协程的工作函数到上下文信息中
	makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);

ucontext族函数

其实对协程上下文的初始化以及保存是通过linux下的ucontext族函数来实现的。

ucontext_t结构体

我们发现Task结构体中有一个Context字段,这个字段其实就是对ucontext_t结构体的封装,ucontext_t结构体用于保存当前的上下文信息,它的结构是这样的:

typedef struct ucontext
  {
    unsigned long int uc_flags;
    struct ucontext *uc_link;//后序上下文
    __sigset_t uc_sigmask;// 信号屏蔽字掩码
    stack_t uc_stack;// 上下文所使用的栈
    // 保存的上下文的寄存器信息
    // 比如pc、sp、bp
    // pc程序计数器:记录下一条指令的地址
    // sp堆栈指针:指向函数调用栈栈顶的指针,所以新数据入栈将存入sp+1的地址
    // bp基址指针:指向函数调用栈的首地址
    mcontext_t uc_mcontext;
    long int uc_filler[5];
  } ucontext_t;
//其中mcontext_t 定义如下
typedef struct
  {
    gregset_t __ctx(gregs);//所装载寄存器
    fpregset_t __ctx(fpregs);//寄存器的类型
} mcontext_t;
//其中gregset_t 定义如下
typedef greg_t gregset_t[NGREG];//包括了所有的寄存器的信息

getcontext()函数

函数原型:

int getcontext(ucontext_t* ucp)

getcontext()函数的底层是通过汇编来实现的,其主要的功能是将当前运行到的寄存器信息保存到参数ucp中。

setcontext()函数

函数原型:

int setcontext(const ucontext_t *ucp)

setcontext()函数的作用是将ucontext_t结构体变量ucp中的上下文信息重新恢复到cpu中并执行。

makecontext()函数

函数原型:

void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...)

makecontext()函数的主要功能是设置协程的工作函数到上下文(ucontext_t)中,同时在用户设置的栈上保存一些信息,并且设置栈顶指针的值到上下文信息中。

argc是入口函数的参数个数,后面的...是具体的入口函数参数,该参数必须是整形值。

swapcontext()函数

函数原型:

int swapcontext(ucontext_t *oucp, ucontext_t *ucp)

该函数可以将当前cpu中的上下文信息保存到oucp结构体变量中,然后将ucp结构体的上下文信息恢复到cpu中。

这里可以理解为调用了两个函数,第一次是调用了getcontext(oucp)然后再调用setcontext(ucp)

协程的调度

在使用taskcreate创建协程的时候,这个函数内部会调用taskready函数修改新建协程的状态并加入就绪队列taskrunqueue中,taskrunqueue中的协程需要一个调度器来调度执行。

tasklib库中实现了一个协程调度中心的函数。调度中心会不断的从就绪队列中取出协程来执行,它的核心逻辑是这样的:

static void taskscheduler(void)
{
	int i;
	Task *t;
	taskdebug("scheduler enter");
	for(;;){
		// 如果没有就绪态协程了,就退出
		if(taskcount == 0)
			exit(taskexitval);
		// 从就绪队列中拿出一个协程
		t = taskrunqueue.head;
		if(t == nil){
			fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount);
			exit(1);
		}
		// 从就绪队列中删除这个协程
		deltask(&taskrunqueue, t);
		 // 将协程状态改为非就绪态
		t->ready = 0;
		// 保存正在执行的协程
		taskrunning = t;
		// 切换次数+1
		tasknswitch++;
		taskdebug("run %d (%s)", t->id, t->name);
		// 切换到t执行,将当前cpu中的上下文信息保存到taskschedcontext中
        // 然后将t->context中的上下文信息恢复到cpu中执行
		contextswitch(&taskschedcontext, &t->context);
		// 执行结束
		taskrunning = nil;
		// 刚才执行的协程t退出了
		if(t->exiting){
			// 如果不是系统协程,协程个数减一
			if(!t->system)
				taskcount--;
			// 保存当前协程在alltask的索引
			i = t->alltaskslot;
			// 将最后一个协程切换到当前协程的位置,因为当前协程要退出了
			alltask[i] = alltask[--nalltask];
			// 更新被置换协程的索引
			alltask[i]->alltaskslot = i;
			// 释放堆内存
			free(t);
		}
	}
}

从上述调度器执行的代码中可以发现,我们使用contextswitch函数实现了协程间的上下文切换,这个函数的内部调用了swapcontext(&from->uc, &to->uc)函数,这个函数将当前cpu中的上下文信息保存到from结构体变量中,然后将to结构体的上下文信息恢复到cpu中执行。

执行结束之后,会重新将上下文切换回调度中心。

static void contextswitch(Context *from, Context *to)
{
	if(swapcontext(&from->uc, &to->uc) < 0){
		fprint(2, "swapcontext failed: %r\n");
		assert(0);
	}
}

其实我们还可以通过调用taskyield函数来控制协程在没有执行完就主动让出,那么当前正在执行的task会被 插入就绪队列的尾部,等待后续的调度,然后调度器会从就绪队列的头部重新取出一个task来执行。

/*
 * 协程主动让出CPU
 * 1.将主动让出的协程重新加入到就绪队列
 * 2.将当前协程的状态标记为让出
 * 3.执行协程切换的逻辑
 * */
int taskyield(void)
{
	int n;
	// 协程的让出次数
	n = tasknswitch;
	// 将当前主动让出的协程放进等待队列
	taskready(taskrunning);
	// 标记当前协程的状态为“让出”
	taskstate("yield");
	// 切换协程
	taskswitch();
	// 等于0说明当前只有自己一个协程,调度的时候taskswitch加1,所以这里要减1
	return tasknswitch - n - 1;
}

我们可以发现,切换流程的时候实际上是调用了taskswitch()函数,这个函数内部会调用contextswitch函数来切换上下文。

/*
 * 切换协程
 * */
void taskswitch(void)
{
	needstack(0);
	// 将当前CPU中的上下文信息保存到taskrunning->context结构体中
	// 然后将调度中心上下文恢复到CPU中执行
	contextswitch(&taskrunning->context, &taskschedcontext);
}

总结

所以整个调度流程是这样的:

每一个协程对应一个Task结构体。然后调度中心不断地按照先进先出的方式去调度协程的执行就可以。因为没有抢占机制,所以调度中心是依赖协程本身去驱动的,协程需要主动让出cpu,把上下文切换回调度中心,调度中心才能进行下一轮的调度。

当然我们也可以调用taskyield函数主动让出CPU,他会将当前正在执行的task插入就绪队列的尾部,等待后续的调度,然后调度器会从就绪队列的头部重新取出一个task来执行。

加载全部内容

相关教程
猜你喜欢
用户评论