DefaultThreadPoolFactory.java

/*
 * Copyright (C) 2003-2013 eXo Platform SAS.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package org.exoplatform.social.common.service.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DefaultThreadPoolFactory implements ThreadPoolFactory {

  public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
      return Executors.newCachedThreadPool(threadFactory);
  }
  
  @Override
  public ExecutorService newThreadPool(ThreadPoolConfig config, ThreadFactory factory) {
      return newThreadPool(config.getPoolSize(), 
                           config.getMaxPoolSize(), 
                           config.getKeepAliveTime(),
                           config.getTimeUnit(),
                           config.getMaxQueueSize(), 
                           factory);
  }

  public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
                                       int maxQueueSize, ThreadFactory threadFactory) throws IllegalArgumentException {

      // the core pool size must be higher than 0
      if (corePoolSize < 1) {
          throw new IllegalArgumentException("CorePoolSize must be >= 1, was " + corePoolSize);
      }

      // validate max >= core
      if (maxPoolSize < corePoolSize) {
          throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
      }

      BlockingQueue<Runnable> workQueue;
      if (corePoolSize == 0 && maxQueueSize <= 0) {
          // use a synchronous queue for direct-handover (no tasks stored on the queue)
          workQueue = new SynchronousQueue<Runnable>();
          // and force 1 as pool size to be able to create the thread pool by the JDK
          corePoolSize = 1;
          maxPoolSize = 1;
      } else if (maxQueueSize <= 0) {
          // use a synchronous queue for direct-handover (no tasks stored on the queue)
          workQueue = new SynchronousQueue<Runnable>();
      } else {
          // bounded task queue to store tasks on the queue
          workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
      }

      ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
      answer.setThreadFactory(threadFactory);
      //sets TRUE : allows terminal if no tasks arrive within the keep-alive time
      //sets FALSE: When false, core threads are never terminated due to lack of incoming tasks.
      answer.allowCoreThreadTimeOut(true);
      answer.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
      return answer;
  }
  
  @Override
  public ScheduledExecutorService newScheduledThreadPool(ThreadPoolConfig config, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(config.getPoolSize(), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
  }

}