private <T> List<List<T>> partition(List<T> list, int batchCount) { if (!(batchCount > 0)) { thrownewIllegalArgumentException("batch count must greater than zero"); }
/** * Head of the wait queue , lazily initialized . Except for * initialization , it is modified only via method setHead . Note : * If head exists , its waitStatus is guaranteed not to be * CANCELLED . */ privatetransientvolatile Node head;
/** * Tail of the wait queue , lazily initialized . Modified only via * method enq to add new wait node . */ privatetransientvolatile Node tail;
/**
 * The synchronization state.
 */
private volatile int state;
/**
 * Status field, taking on only the values:
 *   SIGNAL:     The successor of this node is (or will soon be)
 *               blocked (via park), so the current node must
 *               unpark its successor when it releases or
 *               cancels. To avoid races, acquire methods must
 *               first indicate they need a signal,
 *               then retry the atomic acquire, and then,
 *               on failure, block.
 *   CANCELLED:  This node is cancelled due to timeout or interrupt.
 *               Nodes never leave this state. In particular,
 *               a thread with cancelled node never again blocks.
 *   CONDITION:  This node is currently on a condition queue.
 *               It will not be used as a sync queue node
 *               until transferred, at which time the status
 *               will be set to 0. (Use of this value here has
 *               nothing to do with the other uses of the
 *               field, but simplifies mechanics.)
 *   PROPAGATE:  A releaseShared should be propagated to other
 *               nodes. This is set (for head node only) in
 *               doReleaseShared to ensure propagation
 *               continues, even if other operations have
 *               since intervened.
 *   0:          None of the above
 *
 * The values are arranged numerically to simplify use.
 * Non-negative values mean that a node doesn't need to
 * signal. So, most code doesn't need to check for particular
 * values, just for sign.
 *
 * The field is initialized to 0 for normal sync nodes, and
 * CONDITION for condition nodes. It is modified using CAS
 * (or when possible, unconditional volatile writes).
 */
volatile int waitStatus;
/**
 * Link to predecessor node that current node/thread relies on
 * for checking waitStatus. Assigned during enqueuing, and nulled
 * out (for sake of GC) only upon dequeuing. Also, upon
 * cancellation of a predecessor, we short-circuit while
 * finding a non-cancelled one, which will always exist
 * because the head node is never cancelled: A node becomes
 * head only as a result of successful acquire. A
 * cancelled thread never succeeds in acquiring, and a thread only
 * cancels itself, not any other node.
 */
volatile Node prev;
/**
 * Link to the successor node that the current node/thread
 * unparks upon release. Assigned during enqueuing, adjusted
 * when bypassing cancelled predecessors, and nulled out (for
 * sake of GC) when dequeued. The enq operation does not
 * assign next field of a predecessor until after attachment,
 * so seeing a null next field does not necessarily mean that
 * node is at end of queue. However, if a next field appears
 * to be null, we can scan prev's from the tail to
 * double-check. The next field of cancelled nodes is set to
 * point to the node itself instead of null, to make life
 * easier for isOnSyncQueue.
 */
volatile Node next;
/**
 * The thread that enqueued this node. Initialized on
 * construction and nulled out after use.
 */
volatile Thread thread;
/**
 * Link to next node waiting on condition, or the special
 * value SHARED. Because condition queues are accessed only
 * when holding in exclusive mode, we just need a simple
 * linked queue to hold nodes while they are waiting on
 * conditions. They are then transferred to the queue to
 * re-acquire. And because conditions can only be exclusive,
 * we save a field by using special value to indicate shared
 * mode.
 */
Node nextWaiter;
finalbooleanacquireQueued( final Node node, int arg) { booleaninterrupted=false ; try { for (;;) { finalNodep= node. predecessor (); if (p == head && tryAcquire (arg)) { setHead (node); p. next = null ; // help GC return interrupted; } if ( shouldParkAfterFailedAcquire (p, node)) interrupted |= parkAndCheckInterrupt (); } } catch ( Throwable t) { cancelAcquire (node); if (interrupted) selfInterrupt (); throw t; } }
privatestaticbooleanshouldParkAfterFailedAcquire( Node pred, Node node) { intws= pred. waitStatus ; if (ws == Node . SIGNAL ) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue ; if (ws > 0 ) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node. prev = pred = pred. prev ; } while (pred. waitStatus > 0 ); pred. next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ pred. compareAndSetWaitStatus (ws, Node . SIGNAL ); } returnfalse ; }
privatefinalbooleanparkAndCheckInterrupt() { LockSupport . park ( this ); return Thread . interrupted (); }
privatevoidunparkSuccessor( Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node. waitStatus ; if (ws < 0 ) node. compareAndSetWaitStatus (ws, 0 );
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Nodes= node. next ; if (s == null || s. waitStatus > 0 ) { s = null ; for ( Nodep= tail; p != node && p != null ; p = p. prev ) if (p. waitStatus <= 0 ) s = p; } if (s != null ) LockSupport . unpark (s. thread ); }