LifeCycleCompletionService.java

/*
 * Copyright (C) 2003-2013 eXo Platform SAS.
 *
 * This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Affero General Public License
* as published by the Free Software Foundation; either version 3
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, see<http://www.gnu.org/licenses/>.
 */
package org.exoplatform.forum.common.lifecycle;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.picocontainer.Startable;

import org.exoplatform.container.xml.InitParams;
import org.exoplatform.forum.common.InitParamsValue;

public class LifeCycleCompletionService implements Startable {
  private final String                 THREAD_NUMBER_KEY       = "thread-number";

  private final String                 ASYNC_EXECUTION_KEY     = "async-execution";

  private final String                 PRIORITY_KEY            = "thread-priority";
  
  private final String KEEP_ALIVE_TIME = "keepAliveTime";

  private Executor                     executor;

  private ExecutorCompletionService<?> ecs;
  
  private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();

  private final int                   DEFAULT_THREAD_NUMBER   = 1;

  private final boolean               DEFAULT_ASYNC_EXECUTION = true;

  private final int                   DEFAULT_KEEP_ALIVE_TIME = 10;

  private final int                   DEFAULT_THREAD_PRIORITY = 10;

  private final boolean configAsyncExecution;


  public LifeCycleCompletionService(InitParams params) {

    //
    final int configThreadNumber = InitParamsValue.getInteger(params, THREAD_NUMBER_KEY, DEFAULT_THREAD_NUMBER);
    final int keepAliveTime = InitParamsValue.getInteger(params, KEEP_ALIVE_TIME, DEFAULT_KEEP_ALIVE_TIME);
    final int threadPriority = InitParamsValue.getInteger(params, PRIORITY_KEY, DEFAULT_THREAD_PRIORITY);
    this.configAsyncExecution = InitParamsValue.getBoolean(params, ASYNC_EXECUTION_KEY, DEFAULT_ASYNC_EXECUTION);

    int threadNumber_ = configThreadNumber <= 0 ? configThreadNumber : Runtime.getRuntime().availableProcessors();
    //
    ThreadFactory threadFactory = new ThreadFactory() {
      public Thread newThread(Runnable runable) {
        Thread t = new Thread(runable, "Forum-Thread");
        t.setPriority(threadPriority);
        return t;
      }
    };
    //
    if (configAsyncExecution) {
      executor = new ThreadPoolExecutor(threadNumber_, threadNumber_, keepAliveTime, 
                                              TimeUnit.SECONDS, workQueue, threadFactory);
      ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
    } else {
      executor = new DirectExecutor();
    }

    //
    this.ecs = new ExecutorCompletionService(executor);

  }

  public void addTask(Callable callable) {
    ecs.submit(callable);
  }

  public void waitCompletionFinished() {
    try {
      if (executor instanceof ExecutorService) {
        ((ExecutorService) executor).awaitTermination(1, TimeUnit.SECONDS);
      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  public boolean shutdownNow() {
    boolean isDone = true;
    Future<?> f;
    while (ecs != null && (f = ecs.poll()) != null) {
      isDone &= f.cancel(true);
    }
    return isDone;
  }

  public boolean isAsync() {
    return this.configAsyncExecution;
  }

  private class DirectExecutor implements Executor {

    public void execute(final Runnable runnable) {
      if (Thread.interrupted())
        throw new RuntimeException();

      runnable.run();
    }
  }

  @Override
  public void start() {
  }

  @Override
  public void stop() {
    if (executor instanceof ExecutorService) {
      ((ExecutorService) executor).shutdown();
    }
  }
}