汇编代码解析synchronized偏向锁
自成溪 人气:0前言
我们都知道java之所以跨平台能力强,是因为java在编译期没有被编译成机器码,而是被编译成字节码。早期的jvm会将编译好的字节码翻译成机器码解释执行,我们在jvm的源码中还可以看到早期的解释器——bytecodeInterpreter.cpp(虽然已经不再使用)。对于字节码这种总数固定,解释逻辑固定的命令,现代jvm将其执行进行了优化,在jvm初始化的时候,直接将每个字节码指令将要执行的汇编代码加载到内存中,在运行时执行某段字节码时直接调用内存中对应的汇编代码即可,这样的解释器就时模板解释器——templateTable.cpp。而synchronized修饰代码块时,其编译成字节码后就是monitorenter和monitorexit(关于如何查看编译后的字节码可以查看笔者往期的博客)。
所以要想看现代jvm的synchronized实现还要从模板解释器(templateTable)的monitorenter方法看起(网上许多文章都是从bytecodeInterpreter开始分析,虽然大致逻辑一样,更有甚者将偏向锁撤销逻辑硬是理解成偏向锁加锁逻辑,非常混乱),本文笔者就从模板解释器汇编源码开始分析还原最真实的偏向锁实现,解释monitorenter字节码命令的方法开始,从汇编代码开始全面解析synchronized。
一.TemplateTable::monitorenter()
关于这个monitorenter()方法,主要包括在方法栈帧中获取lockRecord以及若lockRecord不够则扩容的逻辑,由于这部分代码是将字节码直接解释成机器码,所以以方法名的形式将机器码封装成了对应的汇编命令,我们碰到的汇编方法将其当成对应的汇编命令即可(值得注意的是里面又很多jmp,jcc,jccb等跳转指令,由于篇幅有限本文就不过多介绍,有兴趣的读者可以自行了解,本文就将其当成跳转指令),其他汇编命令也比较简单,这里就不过多介绍,读者如果碰到相关不熟悉的命令可以自行搜索下相关概念,好了话不多说我们直接看源码:
void TemplateTable::monitorenter() { transition(atos, vtos); // 检查对象是否为null,此时对象存在rax寄存器中 __ null_check(rax); // rbp是堆栈寄存器,通常指向栈底 // 栈帧中存在一个monitor数组用于保存锁相关信息,又叫lockRecord(后面都统称为lockRecord) // frame::interpreter_frame_monitor_block_top_offset和frame::interpreter_frame_initial_sp_offset // 表示monitor top 和monitor bot偏移量 // Address(x, j)表示距离x地址j偏移量的地址 // 所以这里声明的两个变量我们可以简单理解为栈帧中的monitor top 和monitor bot地址 const Address monitor_block_top( rbp, frame::interpreter_frame_monitor_block_top_offset * wordSize); const Address monitor_block_bot( rbp, frame::interpreter_frame_initial_sp_offset * wordSize); const int entry_size = frame::interpreter_frame_monitor_size() * wordSize; Label allocated; // 初始化c_rarg1寄存器中的值(这里本质是一个异或运算) __ xorl(c_rarg1, c_rarg1); // points to free slot or NULL // 这部分代码逻辑是循环从lockRecord数组中找到一个空的槽位,并将其放入c_rarg1寄存器中 { Label entry, loop, exit; __ movptr(c_rarg3, monitor_block_top); __ lea(c_rarg2, monitor_block_bot); // 直接跳到entry标签位 __ jmpb(entry); // 绑定loop标签开始循环 __ bind(loop); // 检查当前LockRecord是否被使用 __ cmpptr(Address(c_rarg3, BasicObjectLock::obj_offset_in_bytes()), (int32_t) NULL_WORD); // 没有被使用则将其放到c_rarg1 __ cmov(Assembler::equal, c_rarg1, c_rarg3); // 检查和当前对象是否一样 __ cmpptr(rax, Address(c_rarg3, BasicObjectLock::obj_offset_in_bytes())); // 如果一样则表示重入,跳出循环 __ jccb(Assembler::equal, exit); // 否则则跳到下一个entry __ addptr(c_rarg3, entry_size); // 绑定entry标签 __ bind(entry); // 比较c_rarg3与c_rarg2寄存器中的值,即是否相等 __ cmpptr(c_rarg3, c_rarg2); // 若不等则跳到loop继续循环 __ jcc(Assembler::notEqual, loop); __ bind(exit); } //检测一个空槽位是否被找到(如果是重入则不会跳转会去新申请) __ testptr(c_rarg1, c_rarg1); //找到则跳到 allocated标签 __ jcc(Assembler::notZero, allocated); // 如果没有空的slot则申请一个,这里还包括了申请后调整位置的逻辑 { Label entry, loop; // 将lockrecord底部的指针放到c_rarg1寄存器中 __ movptr(c_rarg1, monitor_block_bot); // 计算并移动栈顶和栈底到新位置,均移动entry_size(rsp寄存器指向栈顶) __ subptr(rsp, entry_size); __ subptr(c_rarg1, entry_size); // 设置新的栈顶位置和栈底位置分别到c_rarg3寄存器和monitor_block_bot地址上 __ mov(c_rarg3, rsp); __ movptr(monitor_block_bot, c_rarg1); // 跳到entry标签——为了先比较下然后开始循环 // c_rarg1则是新的空slot __ jmp(entry); __ bind(loop); // 这两行是将老栈顶位置的值存到新栈顶位置 __ movptr(c_rarg2, Address(c_rarg3, entry_size)); __ movptr(Address(c_rarg3, 0), c_rarg2); // 推进到下一个位置 __ addptr(c_rarg3, wordSize); __ bind(entry); __ cmpptr(c_rarg3, c_rarg1); __ jcc(Assembler::notEqual, loop); } // 绑定allocated标签 __ bind(allocated); __ increment(r13); // 保存对象到lockRecord中,locrRecord对象有两个属性分别是对象指针和锁 // BasicObjectLock::obj_offset_in_bytes()也表示偏移量 __ movptr(Address(c_rarg1, BasicObjectLock::obj_offset_in_bytes()), rax); // 加锁方法 __ lock_object(c_rarg1); // 检查以确保该监视器在锁定后不会导致堆栈溢出 __ save_bcp(); __ generate_stack_overflow_check(0); // 调用下一个指令 __ dispatch_next(vtos); }
我们看到下一个方法是_lock_object()方法,这个方法我们等下在分析,在这之前笔者先介绍下我们源码中看到的lockRecord,其实时basicLock.cpp中的BasicObjectLock类:
//只有两个属性 class BasicObjectLock VALUE_OBJ_CLASS_SPEC { private: //锁对象 BasicLock _lock; //表示持有锁的对象 oop _obj; } //再来看看锁对象——只有一个属性 class BasicLock VALUE_OBJ_CLASS_SPEC { private: //markword一般保存的是持有锁对象的markword volatile markOop _displaced_header; }
可以看到lockRecord是用于关联对象和锁的关系的,如果在当前方法中有加锁的对象,就会在解释栈帧中添加一个lockRecord用于记录相应的对象和锁的关系,不仅如此lockRecord还会隐式的锁重入的计数器,当发生重入时,就会为同一个对象创建多个lockRecord。从源码中我们也可以看到在解释的方法执行期间,lockRecord的数组会根据持有的锁数量增长或缩小。
二.lock_object():
接下来我们来一起看看lock_object()方法:
//在interp_masm_x86_64.cpp文件中 void InterpreterMacroAssembler::lock_object(Register lock_reg) { assert(lock_reg == c_rarg1, "The argument is only for looks. It must be c_rarg1"); //判断是否强制使用重锁,默认是false if (UseHeavyMonitors) { call_VM(noreg, CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter), lock_reg); } else { //定义完成标签 Label done; const Register swap_reg = rax; const Register obj_reg = c_rarg3; //声明一些偏移量 const int obj_offset = BasicObjectLock::obj_offset_in_bytes(); const int lock_offset = BasicObjectLock::lock_offset_in_bytes (); const int mark_offset = lock_offset + BasicLock::displaced_header_offset_in_bytes(); Label slow_case; // 传入的basicObjectLock中的对象地址存到obj_reg中,即c_rarg3寄存器中 movptr(obj_reg, Address(lock_reg, obj_offset)); //使用偏向锁 if (UseBiasedLocking) { //偏向锁加锁方法 biased_locking_enter(lock_reg, obj_reg, swap_reg, rscratch1, false, done, &slow_case); } //后面的方法是关于偏向锁撤销和升级的,不是本文重点,本文先略过 movl(swap_reg, 1); orptr(swap_reg, Address(obj_reg, 0)); movptr(Address(lock_reg, mark_offset), swap_reg); assert(lock_offset == 0, "displached header must be first word in BasicObjectLock"); if (os::is_MP()) lock(); cmpxchgptr(lock_reg, Address(obj_reg, 0)); if (PrintBiasedLockingStatistics) { cond_inc32(Assembler::zero, ExternalAddress((address) BiasedLocking::fast_path_entry_count_addr())); } jcc(Assembler::zero, done); subptr(swap_reg, rsp); andptr(swap_reg, 7 - os::vm_page_size()); movptr(Address(lock_reg, mark_offset), swap_reg); if (PrintBiasedLockingStatistics) { cond_inc32(Assembler::zero, ExternalAddress((address) BiasedLocking::fast_path_entry_count_addr())); } jcc(Assembler::zero, done); bind(slow_case); call_VM(noreg, CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter), lock_reg); bind(done); } }
三.biased_locking_enter()
1).参数
lock_object()调用的biased_locking_enter()方法中才是真正偏向锁逻辑,我们这里介绍下传入的几个参数(便于我们后续分析):
lock_reg
:lock_object方法传入的空的lockRecord,其内部已经保存了当前要加锁的对象
obj_reg
:持有对象的寄存器,其内部保存了将要加锁的对象
swap_reg
:目前是一个空的寄存器,会用于保存中间值
rscratch1
: 临时寄存器,用于保存中间值
done
:done标签方便直接跳到方法结束
&slow_case
:slow_case标签方便直接跳到锁升级逻辑
比较重要的是前两个参数,分别保存我们要判断的lockRecord和对象,后面两个参数其实是方便我们直接跳转到对应逻辑的标签。
2).概念
这个方法中还会涉及到一些概念,网上也有一些介绍,笔者先简单介绍下,方便大家阅读:
markword
:一般用二进制表示,对象头中的markword,主要用来表示对象的线程锁状态,另外还可以用来配合GC、存放该对象的hashCode
这张图就表示markword的几个状态。
klassword
:一个指向方法区中Class信息的指针一般用二进制表示,通过指针可以获取其相应的klass对象(即方法区中表示class信息的对象)
偏向模式:表示对象是否当前是偏向状态,即markword最后三位是否是101,这里需要注意的是不光普通对象具有偏向模式,klass对象也有偏向模式,具体可以在systemDictionary.cpp 的update_dictionary方法中可以看到,所有创建的klass锁状态起始是001,然后会被更新为101。创建普通对象时会将klass中的markword填充到oop对象中。Klass对象除了再刚开始创建时锁状态时001,再进行批量偏向锁撤销的时候也会恢复成001(这部分不是本文重点,具体细节就先不分析)。所以当一个对象是偏向模式时,其不一定是持有偏向锁的,因为对象刚创建出来其markword后三位即101,需要通过线程ID,epoch来判断其是否持有偏向锁。
3).源码
让我们继续看biased_locking_enter()方法:
//调用的是macroAssembler_x86.cpp中的方法 int MacroAssembler::biased_locking_enter(Register lock_reg, Register obj_reg, Register swap_reg, Register tmp_reg, bool swap_reg_contains_mark, Label& done, Label* slow_case, BiasedLockingCounters* counters) { ...... bool need_tmp_reg = false; //noreg是一个宏,表示空的寄存器 if (tmp_reg == noreg) { need_tmp_reg = true; tmp_reg = lock_reg; } else { assert_different_registers(lock_reg, obj_reg, swap_reg, tmp_reg); } //定义一些地址,分别是markword,klass和lockRecord中的锁对象地址 Address mark_addr (obj_reg, oopDesc::mark_offset_in_bytes()); Address klass_addr (obj_reg, oopDesc::klass_offset_in_bytes()); Address saved_mark_addr(lock_reg, 0); // 偏向锁逻辑开始 // 分支1:查看当前对象是否开启偏向模式 Label cas_label; int null_check_offset = -1; //swap_reg_contains_mark传入的是false,表示swap_reg不包括markword地址 if (!swap_reg_contains_mark) { null_check_offset = offset(); //将对象的markword放入swap_reg movl(swap_reg, mark_addr); } if (need_tmp_reg) { push(tmp_reg); } //将对象的markword放入tmp_reg movl(tmp_reg, swap_reg); //取其锁标记位(与指令) //markOopDesc::biased_lock_mask_in_place=111 这里是取markword的后三位到tmp_reg寄存器中 andl(tmp_reg, markOopDesc::biased_lock_mask_in_place); //判断是否有锁(比较指令) //markOopDesc::biased_lock_pattern=101 cmpl(tmp_reg, markOopDesc::biased_lock_pattern); if (need_tmp_reg) { pop(tmp_reg); } //如果不相等则表示没有开启对象偏向模式(即已经是轻量级锁)则跳到cas_label标签到方法末尾 jcc(Assembler::notEqual, cas_label); // 分支2:相等则表示对象markword后三位是101即现在对象是偏向锁模式(但不一定持有偏向锁) // 这部分的逻辑是将线程id和epoch信息做比对,判断是否已经持有偏向锁 movl(saved_mark_addr, swap_reg); if (need_tmp_reg) { push(tmp_reg); } //获取线程id get_thread(tmp_reg); //对象的markword与线程id异或,若线程id部分一样则线程id部分会变成0 xorl(swap_reg, tmp_reg); if (swap_reg_contains_mark) { null_check_offset = offset(); } //将klass放入tmp_reg寄存器 movl(tmp_reg, klass_addr); //与klass的markword异或,若两者同位部分一样则同位会变成0,这里是为了判断epoch和锁标志位是否与klass一样 xorl(swap_reg, Address(tmp_reg, Klass::prototype_header_offset())); //设置分代年龄掩码即年龄为0 andl(swap_reg, ~((int) markOopDesc::age_mask_in_place)); if (need_tmp_reg) { pop(tmp_reg); } if (counters != NULL) { cond_inc32(Assembler::zero, ExternalAddress((address)counters->biased_lock_entry_count_addr())); } //前面已经处理过markword,将其关键信息已经存入swap_reg中,后面只使用swap_reg进行判断 //如果swap等于0,则表明线程id是本线程id,且epoch和锁标志位都与klass中的一样,即已经偏向本线程,跳到加锁结束 jcc(Assembler::equal, done); //定义撤销偏向锁标签 Label try_revoke_bias; //定义重偏向锁标签 Label try_rebias; //若不等则证明线程id,epoch和锁标志位有不一样的 //分支3:先判断锁标志位,即判断类的偏向模式是否是关闭 //test可以理解为与运算 //因为之前已经判断过对象的是偏向模式,而klass与对象的锁标记位不等,则证明klass对象不是偏向模式 //如果类偏向模式是关闭,表明正在进行批量撤销偏向锁的行为,即正在进行锁升级 //所以需要cas替换修复对象的markword,修复成类的markword,跳到撤销标签 testl(swap_reg, markOopDesc::biased_lock_mask_in_place); jcc(Assembler::notZero, try_revoke_bias); //分支4:再判断是否是epoch过期,过期则跳到重偏向标签 testl(swap_reg, markOopDesc::epoch_mask_in_place); jcc(Assembler::notZero, try_rebias); //分支5:到这里只剩线程id并不是本线程,进行一次cas替换尝试加偏向锁 //将对象的markword读到swap_reg中 movl(swap_reg, saved_mark_addr); //进行与运算,获取对象markword的锁标志位和age,epoch用来构造一个新的带锁的markword andl(swap_reg, markOopDesc::biased_lock_mask_in_place | markOopDesc::age_mask_in_place | markOopDesc::epoch_mask_in_place); if (need_tmp_reg) { push(tmp_reg); } get_thread(tmp_reg); //将线程id也加入到构造的markword中 orl(tmp_reg, swap_reg); //判断是否是多核cpu如果是则加锁——执行Lock命令 if (os::is_MP()) { lock(); } //cas替换对象的对象的markword为刚刚构造的持有锁信息的markword //Address(obj_reg, 0)表示对象的markword位置 cmpxchgptr(tmp_reg, Address(obj_reg, 0)); if (need_tmp_reg) { pop(tmp_reg); } if (counters != NULL) { cond_inc32(Assembler::zero, ExternalAddress((address)counters->anonymously_biased_lock_entry_count_addr())); } //cas不为0则证明偏向我们失败,意味着有另一个线程成功偏向,有竞争 //则进入slow逻辑,跳转到slow_case标签,执行撤销升级逻辑 if (slow_case != NULL) { jcc(Assembler::notZero, *slow_case); } //成功证明已经偏向成功,跳转到done标签 jmp(done); //epoch过期,重新偏向标签 bind(try_rebias); if (need_tmp_reg) { push(tmp_reg); } //获取当前线程ID get_thread(tmp_reg); movl(swap_reg, klass_addr); //或运算,以klass的markword为基础和线程id组合构成新的markword orl(tmp_reg, Address(swap_reg, Klass::prototype_header_offset())); movl(swap_reg, saved_mark_addr); if (os::is_MP()) { lock(); } //将新构造的markword cas替换 对象的markword cmpxchgptr(tmp_reg, Address(obj_reg, 0)); if (need_tmp_reg) { pop(tmp_reg); } if (counters != NULL) { cond_inc32(Assembler::zero, ExternalAddress((address)counters->rebiased_lock_entry_count_addr())); } //偏向失败则证明有另外的线程偏向成功,需要撤销偏向 if (slow_case != NULL) { jcc(Assembler::notZero, *slow_case); } //跳到结束 jmp(done); //撤销偏向,将对象markword重置为klass(类)的markword //这里只有判断类的markword不是偏向标记才会进入,所以会将对象的markword重置为非偏向标记 bind(try_revoke_bias); movl(swap_reg, saved_mark_addr); if (need_tmp_reg) { push(tmp_reg); } //获取对象klass的markword movl(tmp_reg, klass_addr); movl(tmp_reg, Address(tmp_reg, Klass::prototype_header_offset())); if (os::is_MP()) { lock(); } //用对象klass的markword cas替换对象的markword cmpxchgptr(tmp_reg, Address(obj_reg, 0)); if (need_tmp_reg) { pop(tmp_reg); } //无论cas的结果成功与否,都证明有线程撤销成功,所以继续执行 if (counters != NULL) { cond_inc32(Assembler::zero, ExternalAddress((address)counters->revoked_lock_entry_count_addr())); } bind(cas_label); return null_check_offset; }
看完了源码我们可以这样理解偏向锁,添加偏向锁的过程即是在对象处于可偏向模式时,在对象的markword中cas替换对应的线程id标记位,即表示当前线程持有了对象的偏向锁。完整的偏向锁处理逻辑已经分析完了,这里面分支比较多,我们来画图帮助理解下:
从图中我们可以看到若对象持有偏向锁且锁不是偏向本线程,则会最少会进行一次cas替换,若cas替换失败则会进入偏向锁的撤销升级逻辑。因为偏向锁cas替换后会进入撤销升级的逻辑,所以从效率上看偏向锁更适合一个线程不断的获取锁的场景,而事实上偏向锁正是设计用于应对一个线程获取锁的场景。
当然synchronized的执行逻辑还没有结束,本篇博客我们只着重分析偏向锁相关逻辑。笔者后续还会继续分析synchronized的轻量级锁和重量级锁的逻辑,尽量还原最原汁原味的synchronized。
加载全部内容