首页>>后端>>java->打通JAVA与内核系列之一ReentrantLock锁的实现原理

打通JAVA与内核系列之一ReentrantLock锁的实现原理

时间:2023-12-02 本站 点击:0

作者 | 蒋冲 来源 | 阿里技术公众号

写JAVA代码的同学都知道,JAVA里的锁有两大类,一类是synchronized锁,一类是concurrent包里的锁(JUC锁)。其中synchronized锁是JAVA语言层面提供的能力,在此不展开,本文主要讨论JUC里的ReentrantLock锁。

一 JDK层

1 AbstractQueuedSynchronizer

ReentrantLock的lock(),unlock()等API其实依赖于内部的Synchronizer(注意,不是synchronized)来实现。Synchronizer又分为FairSync和NonfairSync,顾名思义是指公平和非公平。

当调用ReentrantLock的lock方法时,其实就只是简单地转交给Synchronizer的lock()方法:

代码节选自:java.util.concurrent.locks.ReentrantLock.java/**Synchronizerprovidingallimplementationmechanics*/privatefinalSyncsync;/***Baseofsynchronizationcontrolforthislock.Subclassed*intofairandnonfairversionsbelow.UsesAQSstateto*representthenumberofholdsonthelock.*/abstractstaticclassSyncextendsAbstractQueuedSynchronizer{......}publicvoidlock(){sync.lock();}

那么这个sync又是什么?我们看到Sync 继承自AbstractQueueSynchronizer(AQS),AQS是concurrent包的基石,AQS本身并不实现任何同步接口(比如lock,unlock,countDown等等),但是它定义了一个并发资源控制逻辑的框架(运用了template method 设计模式),它定义了acquire和release方法用于独占地(exclusive)获取和释放资源,以及acquireShared和releaseShared方法用于共享地获取和释放资源。比如acquire/release用于实现ReentrantLock,而acquireShared/releaseShared用于实现CountDownLacth,Semaphore。比如acquire的框架如下:

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}

整体逻辑是,先进行一次tryAcquire,如果成功了,就没啥事了,调用者继续执行自己后面的代码,如果失败,则执行addWaiter和acquireQueued。其中tryAcquire()需要子类根据自己的同步需求进行实现,而acquireQueued() 和addWaiter() 已经由AQS实现。addWaiter的作用是把当前线程加入到AQS内部同步队列的尾部,而acquireQueued的作用是当tryAcquire()失败的时候阻塞当前线程。

addWaiter的代码如下:

/***Createsandenqueuesnodeforcurrentthreadandgivenmode.**@parammodeNode.EXCLUSIVEforexclusive,Node.SHAREDforshared*@returnthenewnode*/privateNodeaddWaiter(Nodemode){//创建节点,设置关联线程和模式(独占或共享)Nodenode=newNode(Thread.currentThread(),mode);//Trythefastpathofenq;backuptofullenqonfailureNodepred=tail;//如果尾节点不为空,说明同步队列已经初始化过if(pred!=null){//新节点的前驱节点设置为尾节点node.prev=pred;//设置新节点为尾节点if(compareAndSetTail(pred,node)){//老的尾节点的后继节点设置为新的尾节点。所以同步队列是一个双向列表。pred.next=node;returnnode;}}//如果尾节点为空,说明队列还未初始化,需要初始化head节点并加入新节点enq(node);returnnode;}

enq(node)的代码如下:

/***Insertsnodeintoqueue,initializingifnecessary.Seepictureabove.*@paramnodethenodetoinsert*@returnnode'spredecessor*/privateNodeenq(finalNodenode){for(;;){Nodet=tail;if(t==null){//Mustinitialize//如果tail为空,则新建一个head节点,并且tail和head都指向这个head节点//队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联if(compareAndSetHead(newNode()))tail=head;}else{//第二次循环进入这个分支,node.prev=t;if(compareAndSetTail(t,node)){t.next=node;returnt;}}}}

addWaiter执行结束后,同步队列的结构如下所示:

acquireQueued的代码如下:

/***Acquiresinexclusiveuninterruptiblemodeforthreadalreadyin*queue.Usedbyconditionwaitmethodsaswellasacquire.**@paramnodethenode*@paramargtheacquireargument*@return{@codetrue}ifinterruptedwhilewaiting*/finalbooleanacquireQueued(finalNodenode,intarg){booleanfailed=true;try{booleaninterrupted=false;for(;;){//获取当前node的前驱nodefinalNodep=node.predecessor();//如果前驱node是headnode,说明自己是第一个排队的线程,则尝试获锁if(p==head&&tryAcquire(arg)){//把获锁成功的当前节点变成headnode(哑节点)。setHead(node);p.next=null;//helpGCfailed=false;returninterrupted;}if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{if(failed)cancelAcquire(node);}}

acquireQueued的逻辑是:

判断自己是不是同步队列中的第一个排队的节点,则尝试进行加锁,如果成功,则把自己变成head node,过程如下所示:

如果自己不是第一个排队的节点或者tryAcquire失败,则调用shouldParkAfterFailedAcquire,其主要逻辑是使用CAS将节点状态由 INITIAL 设置成 SIGNAL,表示当前线程阻塞等待SIGNAL唤醒。如果设置失败,会在 acquireQueued 方法中的死循环中继续重试,直至设置成功,然后调用parkAndCheckInterrupt 方法。parkAndCheckInterrupt的作用是把当前线程阻塞挂起,等待唤醒。parkAndCheckInterrupt的实现需要借助下层的能力,这是本文的重点,在下文中逐层阐述。

2 ReentrantLock

下面就让我们一起看看ReentrantLock是如何基于AbstractQueueSynchronizer实现其语义的。

ReentrantLock内部使用的FairSync和NonfairSync,它们都是AQS的子类,比如FairSync的主要代码如下:

/***Syncobjectforfairlocks*/staticfinalclassFairSyncextendsSync{privatestaticfinallongserialVersionUID=-3000897897090466540L;finalvoidlock(){acquire(1);}/***FairversionoftryAcquire.Don'tgrantaccessunless*recursivecallornowaitersorisfirst.*/protectedfinalbooleantryAcquire(intacquires){finalThreadcurrent=Thread.currentThread();intc=getState();if(c==0){if(!hasQueuedPredecessors()&&compareAndSetState(0,acquires)){setExclusiveOwnerThread(current);returntrue;}}elseif(current==getExclusiveOwnerThread()){intnextc=c+acquires;if(nextc<0)thrownewError("Maximumlockcountexceeded");setState(nextc);returntrue;}returnfalse;}}

AQS中最重要的一个字段就是state,锁和同步器的实现都是围绕着这个字段的修改展开的。AQS可以实现各种不同的锁和同步器的原因之一就是,不同的锁或同步器按照自己的需要可以对同步状态的含义有不同的定义,并重写对应的tryAcquire, tryRelease或tryAcquireshared, tryReleaseShared等方法来操作同步状态。

我们来看看ReentrantLock的FairSync的tryAcquire的逻辑:

如果此时state(private volatile int state)是0,那么就表示这个时候没有人占有锁。但因为是公平锁,所以还要判断自己是不是首节点,然后才尝试把状态设置为1,假如成功的话,就成功的占有了锁。compareAndSetState 也是通过CAS来实现。CAS 是原子操作,而且state的类型是volatile,所以state 的值是线程安全的。

如果此时状态不是0,那么再判断当前线程是不是锁的owner,如果是的话,则state 递增,当state溢出时,会抛错。如果没溢出,则返回true,表示成功获取锁。

上述都不满足,则返回false,获取锁失败。

至此,JAVA层面的实现基本说清楚了,小结一下,整个框架如下所示:

关于unlock的实现,限于篇幅,就不讨论了,下文重点分析lock过程中是如何把当前线程阻塞挂起的,就是上图中的unsafe.park()是如何实现的。

二 JVM层

Unsafe.park和Unsafe.unpark 是sun.misc.Unsafe类的native 方法,

publicnativevoidunpark(Objectvar1);publicnativevoidpark(booleanvar1,longvar2);

这两个方法的实现是在JVM的hotspot/src/share/vm/prims/unsafe.cpp 文件中,

UNSAFE_ENTRY(void,Unsafe_Park(JNIEnv*env,jobjectunsafe,jbooleanisAbsolute,jlongtime))UnsafeWrapper("Unsafe_Park");EventThreadParkevent;#ifndefUSDT2HS_DTRACE_PROBE3(hotspot,thread__park__begin,thread->parker(),(int)isAbsolute,time);#else/*USDT2*/HOTSPOT_THREAD_PARK_BEGIN((uintptr_t)thread->parker(),(int)isAbsolute,time);#endif/*USDT2*/JavaThreadParkedStatejtps(thread,time!=0);thread->parker()->park(isAbsolute!=0,time);#ifndefUSDT2HS_DTRACE_PROBE1(hotspot,thread__park__end,thread->parker());#else/*USDT2*/HOTSPOT_THREAD_PARK_END((uintptr_t)thread->parker());#endif/*USDT2*/if(event.should_commit()){constoopobj=thread->current_park_blocker();if(time==0){post_thread_park_event(&event,obj,min_jlong,min_jlong);}else{if(isAbsolute!=0){post_thread_park_event(&event,obj,min_jlong,time);}else{post_thread_park_event(&event,obj,time,min_jlong);}}}UNSAFE_END

核心是逻辑是thread->parker()->park(isAbsolute != 0, time); 就是获取java线程的parker对象,然后执行它的park方法。每个java线程都有一个Parker实例,Parker类是这样定义的:

classParker:publicos::PlatformParker{private:volatileint_counter;...public:voidpark(boolisAbsolute,jlongtime);voidunpark();...}classPlatformParker:publicCHeapObj<mtInternal>{protected:enum{REL_INDEX=0,ABS_INDEX=1};int_cur_index;//whichcondisinuse:-1,0,1pthread_mutex_t_mutex[1];pthread_cond_t_cond[2];//oneforrelativetimesandoneforabs.public://TODO-FIXME:makedtorprivate~PlatformParker(){guarantee(0,"invariant");}public:PlatformParker(){intstatus;status=pthread_cond_init(&_cond[REL_INDEX],os::Linux::condAttr());assert_status(status==0,status,"cond_initrel");status=pthread_cond_init(&_cond[ABS_INDEX],NULL);assert_status(status==0,status,"cond_initabs");status=pthread_mutex_init(_mutex,NULL);assert_status(status==0,status,"mutex_init");_cur_index=-1;//markasunused}};

park方法:

voidParker::park(boolisAbsolute,jlongtime){//Returnimmediatelyifapermitisavailable.//WedependonAtomic::xchg()havingfullbarriersemantics//sincewearedoingalock-freeupdateto_counter.if(Atomic::xchg(0,&_counter)>0)return;Thread*thread=Thread::current();assert(thread->is_Java_thread(),"MustbeJavaThread");JavaThread*jt=(JavaThread*)thread;if(Thread::is_interrupted(thread,false)){return;}//Next,demultiplex/decodetimeargumentstimespecabsTime;if(time<0||(isAbsolute&&time==0)){//don'twaitatallreturn;}if(time>0){unpackTime(&absTime,isAbsolute,time);}////进入safepointregion,更改线程为阻塞状态ThreadBlockInVMtbivm(jt);//Don'twaitifcannotgetlocksinceinterferencearisesfrom//unblocking.Also.checkinterruptbeforetryingwaitif(Thread::is_interrupted(thread,false)||pthread_mutex_trylock(_mutex)!=0){//如果线程被中断,或者尝试给互斥变量加锁时失败,比如被其它线程锁住了,直接返回return;}//到这里,意味着pthread_mutex_trylock(_mutex)成功intstatus;if(_counter>0){//nowaitneeded_counter=0;status=pthread_mutex_unlock(_mutex);assert(status==0,"invariant");OrderAccess::fence();return;}#ifdefASSERT//Don'tcatchsignalswhileblocked;lettherunningthreadshavethesignals.//(Thisallowsadebuggertobreakintotherunningthread.)sigset_toldsigs;sigset_t*allowdebug_blocked=os::Linux::allowdebug_blocked_signals();pthread_sigmask(SIG_BLOCK,allowdebug_blocked,&oldsigs);#endifOSThreadWaitStateosts(thread->osthread(),false/*notObject.wait()*/);jt->set_suspend_equivalent();//clearedbyhandle_special_suspend_equivalent_condition()orjava_suspend_self()assert(_cur_index==-1,"invariant");if(time==0){_cur_index=REL_INDEX;//arbitrarychoicewhennottimedstatus=pthread_cond_wait(&_cond[_cur_index],_mutex);}else{_cur_index=isAbsolute?ABS_INDEX:REL_INDEX;status=os::Linux::safe_cond_timedwait(&_cond[_cur_index],_mutex,&absTime);if(status!=0&&WorkAroundNPTLTimedWaitHang){pthread_cond_destroy(&_cond[_cur_index]);pthread_cond_init(&_cond[_cur_index],isAbsolute?NULL:os::Linux::condAttr());}}_cur_index=-1;assert_status(status==0||status==EINTR||status==ETIME||status==ETIMEDOUT,status,"cond_timedwait");#ifdefASSERTpthread_sigmask(SIG_SETMASK,&oldsigs,NULL);#endif_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();//Ifexternallysuspendedwhilewaiting,re-suspendif(jt->handle_special_suspend_equivalent_condition()){jt->java_suspend_self();}}

park的思路:parker内部有个关键字段_counter, 这个counter用来记录所谓的“permit”,当_counter大于0时,意味着有permit,然后就可以把_counter设置为0,就算是获得了permit,可以继续运行后面的代码。如果此时_counter不大于0,则等待这个条件满足。

下面我具体来看看park的具体实现:

当调用park时,先尝试能否直接拿到“许可”,即_counter>0时,如果成功,则把_counter设置为0,并返回。

如果不成功,则把线程的状态设置成_thread_in_vm并且_thread_blocked。_thread_in_vm 表示线程当前在JVM中执行,_thread_blocked表示线程当前阻塞了。

拿到mutex之后,再次检查_counter是不是>0,如果是,则把_counter设置为0,unlock mutex并返回

如果_counter还是不大于0,则判断等待的时间是否等于0,然后调用相应的pthread_cond_wait系列函数进行等待,如果等待返回(即有人进行unpark,则pthread_cond_signal来通知),则把_counter设置为0,unlock mutex并返回。

所以本质上来讲,LockSupport.park 是通过pthread库的条件变量pthread_cond_t来实现的。下面我们就来看看pthread_cond_t 是怎么实现的。

三 GLIBC 层

pthread_cond_t 典型的用法如下:

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}0

重点就是:无论是pthread_cond_wait还是pthread_cond_signal 都必须得先pthread_mutex_lock。如果没有这个保护,可能会产生race condition,漏掉信号。pthread_cond_wait()函数一进入wait状态就会自动release mutex。当其他线程通过pthread_cond_signal或pthread_cond_broadcast把该线程唤醒,使pthread_cond_wait()返回时,该线程又自动获得该mutex。

整个过程如下图所示:

1 pthread_mutex_lock

例如,在Linux中,使用了称为Futex(快速用户空间互斥锁的简称)的系统。

在此系统中,对用户空间中的互斥变量执行原子增量和测试操作。

如果操作结果表明锁上没有争用,则对pthread_mutex_lock的调用将返回,而无需将上下文切换到内核中,因此获取互斥量的操作可以非常快。

仅当检测到争用时,系统调用(称为futex)才会发生,并且上下文切换到内核中,这会使调用进程进入睡眠状态,直到释放互斥锁为止。

还有很多更多的细节,尤其是对于可靠和/或优先级继承互斥,但这就是它的本质。

nptl/pthread_mutex_lock.c

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}1

pthread_mutex_t的定义如下:

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}2

其中__kind字段是指锁的类型,取值如下:

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}3

其中: 

PTHREAD_MUTEX_TIMED_NP,这是缺省值,也就是普通锁。 

PTHREAD_MUTEX_RECURSIVE_NP,可重入锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。

PTHREAD_MUTEX_ERRORCHECK_NP,检错锁,如果同一个线程重复请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型相同。

PTHREAD_MUTEX_ADAPTIVE_NP,自适应锁,自旋锁与普通锁的混合。 

mutex默认用的是PTHREAD_MUTEX_TIMED_NP,所以会走到LLL_MUTEX_LOCK_OPTIMIZED,这是个宏:

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}4

由于不是LLL_PRIVATE,所以走lll_lock, lll_lock也是个宏:

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}5

注意这里出现了futex,本文的后续主要就是围绕它展开的。

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}6

其中,atomic_compare_and_exchange_bool_acq是尝试通过原子操作尝试将__futex(就是mutex->__data.__lock)从0变为1,如果成功就直接返回了,如果失败,则调用__lll_lock_wait,代码如下:

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}7

在这里先要说明一下,pthread将futex的锁状态定义为3种:

0,代表当前锁空闲无锁,可以进行快速上锁,不需要进内核。

1,代表有线程持有当前锁,如果这时有其它线程需要上锁,就必须标记futex为“锁竞争”,然后通过futex系统调用进内核把当前线程挂起。

2,代表锁竞争,有其它线程将要或正在内核的futex系统中排队等待锁。

所以上锁失败进入到__lll_lock_wait这里后,先判断futex 是不是等于2,如果是则说明大家都在排队,你也排着吧(直跳转到futex_wait)。如果不等于2,那说明你是第一个来竞争的人,把futex设置成2,告诉后面来的人要排队,然后自己以身作则先排队。 futex_wait 实质上就是调用futex系统调用。在第四节,我们就来仔细分析这个系统调用。

2 pthread_cond_wait

本质也是走到futex系统调用,限于篇幅就不展开了。

四 内核层

为什么要有futex,它解决什么问题?何时加入内核的?

简单来讲,futex的解决思路是:在无竞争的情况下操作完全在user space进行,不需要系统调用,仅在发生竞争的时候进入内核去完成相应的处理(wait 或者 wake up)。所以说,futex是一种user mode和kernel mode混合的同步机制,需要两种模式合作才能完成,futex变量位于user space,而不是内核对象,futex的代码也分为user mode和kernel mode两部分,无竞争的情况下在user mode,发生竞争时则通过sys_futex系统调用进入kernel mode进行处理。

用户态的部分已经在前面讲解了,本节重点讲解futex在内核部分的实现。

futex 设计了三个基本数据结构:futex_hash_bucket,futex_key,futex_q。

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}8

其实还有个struct __futex_data, 如下所示,这个

/***Acquiresinexclusivemode,ignoringinterrupts.Implemented*byinvokingatleastonce{@link#tryAcquire},*returningonsuccess.Otherwisethethreadisqueued,possibly*repeatedlyblockingandunblocking,invoking{@link*#tryAcquire}untilsuccess.Thismethodcanbeused*toimplementmethod{@linkLock#lock}.**@paramargtheacquireargument.Thisvalueisconveyedto*{@link#tryAcquire}butisotherwiseuninterpretedand*canrepresentanythingyoulike.*/publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}9

在futex初始化的时候(futex_init),会确定hashsize,比如24核cpu时,hashsize = 8192。然后根据这个hashsize调用alloc_large_system_hash分配数组空间,并初始化数组元素里的相关字段,比如plist_head, lock。

/***Createsandenqueuesnodeforcurrentthreadandgivenmode.**@parammodeNode.EXCLUSIVEforexclusive,Node.SHAREDforshared*@returnthenewnode*/privateNodeaddWaiter(Nodemode){//创建节点,设置关联线程和模式(独占或共享)Nodenode=newNode(Thread.currentThread(),mode);//Trythefastpathofenq;backuptofullenqonfailureNodepred=tail;//如果尾节点不为空,说明同步队列已经初始化过if(pred!=null){//新节点的前驱节点设置为尾节点node.prev=pred;//设置新节点为尾节点if(compareAndSetTail(pred,node)){//老的尾节点的后继节点设置为新的尾节点。所以同步队列是一个双向列表。pred.next=node;returnnode;}}//如果尾节点为空,说明队列还未初始化,需要初始化head节点并加入新节点enq(node);returnnode;}0

这些数据结构之间的关系如下所示:

脑子里有了数据结构,流程就容易理解了。futex_wait的总体流程如下:

/***Createsandenqueuesnodeforcurrentthreadandgivenmode.**@parammodeNode.EXCLUSIVEforexclusive,Node.SHAREDforshared*@returnthenewnode*/privateNodeaddWaiter(Nodemode){//创建节点,设置关联线程和模式(独占或共享)Nodenode=newNode(Thread.currentThread(),mode);//Trythefastpathofenq;backuptofullenqonfailureNodepred=tail;//如果尾节点不为空,说明同步队列已经初始化过if(pred!=null){//新节点的前驱节点设置为尾节点node.prev=pred;//设置新节点为尾节点if(compareAndSetTail(pred,node)){//老的尾节点的后继节点设置为新的尾节点。所以同步队列是一个双向列表。pred.next=node;returnnode;}}//如果尾节点为空,说明队列还未初始化,需要初始化head节点并加入新节点enq(node);returnnode;}1

函数 futex_wait_setup主要做两件事,一是对uaddr进行hash,找到futex_hash_bucket并获取它上面的自旋锁,二是判断*uaddr是否为预期值。如果不相等则会立即返回,由用户态继续trylock。

/***Createsandenqueuesnodeforcurrentthreadandgivenmode.**@parammodeNode.EXCLUSIVEforexclusive,Node.SHAREDforshared*@returnthenewnode*/privateNodeaddWaiter(Nodemode){//创建节点,设置关联线程和模式(独占或共享)Nodenode=newNode(Thread.currentThread(),mode);//Trythefastpathofenq;backuptofullenqonfailureNodepred=tail;//如果尾节点不为空,说明同步队列已经初始化过if(pred!=null){//新节点的前驱节点设置为尾节点node.prev=pred;//设置新节点为尾节点if(compareAndSetTail(pred,node)){//老的尾节点的后继节点设置为新的尾节点。所以同步队列是一个双向列表。pred.next=node;returnnode;}}//如果尾节点为空,说明队列还未初始化,需要初始化head节点并加入新节点enq(node);returnnode;}2

然后调用futex_wait_queue_me 把当前进程挂起:

/***Createsandenqueuesnodeforcurrentthreadandgivenmode.**@parammodeNode.EXCLUSIVEforexclusive,Node.SHAREDforshared*@returnthenewnode*/privateNodeaddWaiter(Nodemode){//创建节点,设置关联线程和模式(独占或共享)Nodenode=newNode(Thread.currentThread(),mode);//Trythefastpathofenq;backuptofullenqonfailureNodepred=tail;//如果尾节点不为空,说明同步队列已经初始化过if(pred!=null){//新节点的前驱节点设置为尾节点node.prev=pred;//设置新节点为尾节点if(compareAndSetTail(pred,node)){//老的尾节点的后继节点设置为新的尾节点。所以同步队列是一个双向列表。pred.next=node;returnnode;}}//如果尾节点为空,说明队列还未初始化,需要初始化head节点并加入新节点enq(node);returnnode;}3

futex_wait_queue_me主要做几件事:

将当前进程插入到等待队列,就是把futex_q 挂到futex_hash_bucket上

启动定时任务

主动触发内核进程调度

五 总结

本文主要是对JAVA中的ReentrantLock.lock流程进行了自上而下的梳理。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/10497.html