• notice
  • Congratulations on the launch of the Sought Tech site

ThreadPoolExecutor thread destruction source code analysis

Thread pool is used very frequently in development, and it is also a high-frequency interview point in interviews.I recently looked at the source code and shared some experience:

The general process of thread pool recycling threads:

1.Call the shutdown() method or shutdownNow() method of the thread pool;

2.The main thread modifies the state of the thread pool:

shutdown() -> SHUTDOWN

    shutdownNow() -> STOP

3.Interrupt all threads in the thread pool; interrupt some blocked threads and let the threads finish execution normally;

4.When each work thread obtains a task, getTask() verifies the current thread pool status.If the thread pool status has been changed to a non-RUNNING state, the time is right, and the work thread exits ;

For details, please see the source code comments below:

1.Thread pool status:

// Initial state of thread pool
private static final int RUNNING = -1 << COUNT_BITS;
// Thread pool state after calling shutdown() method
private static final int SHUTDOWN = 0 << COUNT_BITS;
// Thread pool state after calling shutdownNow() method
private static final int STOP = 1 << COUNT_BITS;
// A transition state before TERMINATED, not very useful
private static final int TIDYING = 2 << COUNT_BITS;
// thread pool termination status
private static final int TERMINATED = 3 << COUNT_BITS;

2.1 shutdown method

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      // Check whether the thread calling shutdown() can operate this method, and check whether the thread (works) of the terminated task can be terminated
      checkShutdownAccess();
      // CAS modifies the running status of the thread pool to: SHUTDOWN
      advanceRunState(SHUTDOWN);
      // Interrupt all worker threads
      interruptIdleWorkers();
      // hook for ScheduledThreadPoolExecutor
      onShutdown();
    } finally {
      mainLock.unlock();
    }
  // try to interrupt the thread pool
    tryTerminate();
}

2.2 shutdownNow

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
     // Verify whether the thread calling shutdownNow() can operate this method, and verify whether the thread (works) of the terminated task can be terminated
      checkShutdownAccess();
     // CAS modifies the running status of the thread pool to: STOP
      advanceRunState(STOP);
      // Interrupt all worker threads
      interruptWorkers();
      // Remove the task in the queue, and return to the removed task
      tasks = drainQueue();
    } finally {
      mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

3.interruptIdleWorkers

private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // Traverse the collection of works worker threads, interrupt each thread that blocks to obtain tasks, let these threads execute the current logic, and exit the run() method
    for (Worker w: workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}

4.tryTerminate()

final void tryTerminate() {
   for (;;) {
     int c = ctl.get();
     // thread pool RUNNING exit
     // thread pool TIDYING exit
     // Thread pool SHUTDOWN and the queue is not empty (there are still unprocessed tasks) exit
     if (isRunning(c) ||
         runStateAtLeast(c, TIDYING) ||
         (runStateOf(c) == SHUTDOWN &&! workQueue.isEmpty()))
       return;
     // There are still active threads, interrupt one
     if (workerCountOf(c) != 0) {// Eligible to terminate
       interruptIdleWorkers(ONLY_ONE);
       return;
     }

     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
       if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
         try {
           terminated();
         } finally {
           ctl.set(ctlOf(TERMINATED, 0));
           termination.signalAll();
         }
         return;
       }
     } finally {
       mainLock.unlock();
     }
     // else retry on failed CAS
   }
 }

5.Work class: worker thread

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
   private static final long serialVersionUID = 6138294804551838833L;

   /** The worker thread in the thread pool, which is passed in when the Work object is created */
   final Thread thread;
   /** The first task executed after the worker thread is created */
   Runnable firstTask;
   /** The number of tasks performed by the worker thread */
   volatile long completedTasks;

   /**
    * Create a Work thread, specify the first task to be executed
    */
   Worker(Runnable firstTask) {
     setState(-1);
     this.firstTask = firstTask;
     this.thread = getThreadFactory().newThread(this);
   }

   /** After the work thread starts, execute the run method until the thread exits */
   public void run() {
     runWorker(this);
   }
}

6.runWorker

final void runWorker(Worker w) {
 Thread wt = Thread.currentThread();
 // task defaults to the first task specified when creating the work
 Runnable task = w.firstTask;
 w.firstTask = null;
 w.unlock(); // allow interrupts
 boolean completedAbruptly = true;
 try {
   
   /**
   * 1.The first task is not null, execute the first task first, and then take tasks from the task queue for execution in a loop
   * 2.Only when getTask() returns null will it exit the loop
   */
   while (task != null || (task = getTask()) != null) {
     w.lock();
     /** double check
     * To prevent some threads from failing to respond to interruption after the thread pool has been set to the STOP state,
     * This place will re-check and set the interrupt status of the current thread
     */
     if ((runStateAtLeast(ctl.get(), STOP) ||
          (Thread.interrupted() &&
           runStateAtLeast(ctl.get(), STOP))) &&
         !wt.isInterrupted())
       wt.interrupt();
     try {
       // Not implemented by default, can be extended in subclasses
       beforeExecute(wt, task);
       Throwable thrown = null;
       // perform task
       task.run();
       // Omit part of the try-catch code
     } finally {
       task = null;
       // The cumulative number of tasks executed by the current thread
       w.completedTasks++;
       w.unlock();
     }
   }
   // This step can be reached when getTask() is null
   completedAbruptly = false;
 } finally {
   // getTask() is null, the task is executed, the thread exits
   processWorkerExit(w, completedAbruptly);
 }

7.getTask

private Runnable getTask() {
 // Record whether the task from the task queue was timed out last time
 boolean timedOut = false;
 // Loop through until the task is obtained or interrupted
 for (;;) {
   // Number of threads in the thread pool
   int c = ctl.get();
   // thread pool status
   int rs = runStateOf(c);

   // If the thread pool status: RUNNING(-1)|SHUTDOWN(0)|STOP(1)|TIDYING(2)|TERMINATED(3)
   // Two cases:
   // 1.If it is after STOP (don't care whether there are tasks in the task queue that have not been executed), the total number of threads will be reduced by 1, and null will be returned immediately;
   // 2.After the above situation, if it is the state after SHUTDOWN && The task queue is empty (because you have to wait for the task to be executed), the total number of threads is reduced by 1, and null is returned immediately;
   if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
     decrementWorkerCount();
     return null;
   }
   int wc = workerCountOf(c);
   /**
   * 1, allowCoreThreadTimeOut: Whether to let the core thread idle and recycle, the default is false
   * 2.Is the number of threads in the current thread pool greater than the number of core threads
   */
   boolean timed = allowCoreThreadTimeOut || wc> corePoolSize;

   /**
   * In the case of'there are threads' or'there is no task in the task queue' in the thread pool:
   * 1.If the number of threads in the thread pool is greater than the maximum number of threads set, the current thread must be stopped;
   * 2.Obtaining task timeout in the current thread (indicating that no task can be executed in the queue):
   * (1) If the core thread allows timeout to be recycled, the current thread is recycled (because the core thread and other threads in the thread pool have no special marks,
   * Recycle after recycling.If there are not enough threads in the thread pool next time, create a new thread to play the role of the core thread);
   * (2) The current total number of threads exceeds the number of core threads, and now there is no task to be executed, the current thread is naturally recycled;
   */
   if ((wc> maximumPoolSize || (timed && timedOut))
       && (wc> 1 || workQueue.isEmpty())) {
     if (compareAndDecrementWorkerCount(c))
       return null;
     continue;
   }
   try {
     /**
     * 1.If the core thread is allowed to exit idle or the current number of threads is greater than the number of core threads:
     * poll() sets the blocking time of the acquisition task.After the timeout, it returns null.When the loop is executed to the above two check points, it returns null to exit;
     * 2.Take() blocking acquisition tasks, until the task queue has tasks that can be executed;
     */
     Runnable r = timed?
       workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):
     workQueue.take();
     if (r != null)
       return r;
     // The task is still not obtained after the timeout
     timedOut = true;
   } catch (InterruptedException retry) {
     timedOut = false;
   }
 }
}

8.processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
   decrementWorkerCount();
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
   // Count the number of tasks completed by each thread
   completedTaskCount += w.completedTasks;
   // Remove the current thread from the thread pool (set)
   workers.remove(w);
 } finally {
   mainLock.unlock();
 }
// Modify thread pool interrupt status
 tryTerminate();

 int c = ctl.get();
 /**
 * If the current thread pool status is not STOP, it means that the current status is RUNNING or SHUTDOWN,
 * At this time, it is necessary to ensure that there are necessary threads in the thread pool to perform the tasks of the queue
 */
 if (runStateLessThan(c, STOP)) {
   if (!completedAbruptly) {
     int min = allowCoreThreadTimeOut? 0: corePoolSize;
     // If the current number of threads is 0, but there are tasks in the task queue that need to be executed, the number of threads must be adjusted
     if (min == 0 &&! workQueue.isEmpty())
       min = 1;
     if (workerCountOf(c) >= min)
       return; // replacement not needed
   }
   // Add a thread to the thread pool to ensure that the task is executed
   addWorker(null, false);
 }
}


Tags

Technical otaku

Sought technology together

Related Topic

0 Comments

Leave a Reply

+