ForumTaskManager.java

/*
 * Copyright (C) 2003-2014 eXo Platform SAS.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation; either version 3
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, see<http://www.gnu.org/licenses/>.
 */
package org.exoplatform.forum.ext.activity;

import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.exoplatform.commons.utils.CommonsUtils;
import org.exoplatform.container.PortalContainer;
import org.exoplatform.container.component.RequestLifeCycle;
import org.exoplatform.container.xml.InitParams;
import org.exoplatform.forum.common.InitParamsValue;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.services.organization.idm.PicketLinkIDMServiceImpl;
import org.exoplatform.services.security.ConversationState;
import org.exoplatform.social.core.activity.model.ExoSocialActivity;
import org.hibernate.TransactionException;
import org.picocontainer.Startable;

public class ForumTaskManager implements Startable {
  private static final Log    LOG                = ExoLogger.getExoLogger(ForumTaskManager.class);
  private static final String PERIOD_TIME_KEY  = "periodTime";
  private static final String MAX_PERSIST_SIZE = "maxPersistSize";
  private static final String PRIORITY_KEY = "thread-priority";
  private static final String ASYNC_EXECUTION_KEY = "async-execution";

  private ScheduledExecutorService scheduler ;
  private Queue<Task<ForumActivityContext>> tasks = null;
  private static long   INTERVAL         = 5000l;
  private static int    MAX_SIZE_PERSIST = 25;
  private static int THREAD_PRIORITY = 1;
  private boolean isDone = true;
  private boolean forceStop = false;
  private boolean isAsync = true;

  public ForumTaskManager(InitParams params) {
    INTERVAL = InitParamsValue.getLong(params, PERIOD_TIME_KEY, INTERVAL);
    MAX_SIZE_PERSIST = InitParamsValue.getInteger(params, MAX_PERSIST_SIZE, MAX_SIZE_PERSIST);
    THREAD_PRIORITY = InitParamsValue.getInteger(params, PRIORITY_KEY, THREAD_PRIORITY);
    THREAD_PRIORITY = InitParamsValue.getInteger(params, PRIORITY_KEY, THREAD_PRIORITY);
    isAsync = InitParamsValue.getBoolean(params, ASYNC_EXECUTION_KEY, true);
  }

  @Override
  public void start() {
    //
    if(isAsync) {
      makeInterval();
    }
  }

  @Override
  public void stop() {
    isDone = false;
    forceStop = true;
    if (scheduler != null) {
      scheduler.shutdownNow();
    }
    scheduler = null;
  }

  private void makeInterval() {
    //
    ThreadFactory threadFactory = new ThreadFactory() {
      public Thread newThread(Runnable runable) {
        Thread t = new Thread(runable, "Forum-task-manager-thread");
        t.setPriority(THREAD_PRIORITY);
        return t;
      }
    };
    scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
    //
    scheduler.scheduleWithFixedDelay(new Runnable() {
      @Override
      public void run() {
        if (isCommit(true)) {
          commit();
        }
      }
    }, 30000, INTERVAL, TimeUnit.MILLISECONDS);
  }
  
  public Boolean commit() {
    try {
      RequestLifeCycle.begin(PortalContainer.getInstance());
      persist();
    } finally {
      PicketLinkIDMServiceImpl idmServiceImpl = CommonsUtils.getService(PicketLinkIDMServiceImpl.class);
      if (idmServiceImpl != null) {
        try {
          if (idmServiceImpl.getIdentitySession().getTransaction().isActive()) {
            idmServiceImpl.getIdentitySession().getTransaction().commit();
          }
        } catch (TransactionException e) {
          LOG.debug("The PoolingConnection is null ", e);
        } catch (Exception e) {
          LOG.debug("End request life cycle unsuccessfully ", e);
        }
      }
      RequestLifeCycle.end();
    }
    return true;
  }

  public void addTask(Task<ForumActivityContext> task) {
    if(tasks == null){
      tasks = new LinkedBlockingQueue<Task<ForumActivityContext>>();
    }
    tasks.add(task);
    if(!isAsync) {
      persist();
    }
    //
    if (isCommit(false)) {
      scheduler.submit(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return commit();
        }
      });
    }
  }

  private boolean isCommit(boolean forceCommit) {
    if (tasks == null || forceStop) {
      return false;
    }
    if (isDone && (forceCommit || tasks.size() >= MAX_SIZE_PERSIST)) {
      return true;
    }
    return false;
  }

  private Queue<Task<ForumActivityContext>> popTasks() {
    Queue<Task<ForumActivityContext>> tmp = tasks;
    tasks = null;
    Queue<Task<ForumActivityContext>> processTasks = new LinkedBlockingQueue<Task<ForumActivityContext>>();
    for (Task<ForumActivityContext> forumTask : tmp) {
      if (!processTasks.contains(forumTask)) {
        processTasks.add(forumTask);
      }
    }
    //
    return processTasks;
  }
  
  private void persist() {
    //
    isDone = false;
    try {
      //
      Queue<Task<ForumActivityContext>> tasks = popTasks();

      Task<ForumActivityContext> task;
      while (!forceStop && (task = tasks.poll()) != null) {
        ConversationState lastState = ConversationState.getCurrent();
        try {
          startProcess(task);
          processTask(task);
        } finally {
          endProcess(task, lastState);
        }
      }
    } catch (Exception e) {
      LOG.warn("Running task of forum activity unsuccessful.", e);
      LOG.debug(e.getMessage(), e);
    } finally {
      isDone = true;
    }
  }

  private void startProcess(Task<ForumActivityContext> task) {
    try{
      ConversationState.setCurrent(task.getState());
    }catch(Exception e){
      LOG.warn("Failed to set state context for forum activity task", e);
    }
  }

  private void processTask(Task<ForumActivityContext> task) {
    ActivityTask<ForumActivityContext> activityTask = task.getTask();
    //
    ExoSocialActivity got = ActivityExecutor.execute(activityTask, task.getContext());
    //
    if (activityTask instanceof PostActivityTask) {
      //
      PostActivityTask task_ = PostActivityTask.ADD_POST;
      if (got != null && activityTask.equals(task_)) {
        //
        ForumActivityUtils.takeCommentBack(task.getContext().getPost(), got);
      }
    } else if (activityTask instanceof TopicActivityTask) {
      //
      TopicActivityTask task_ = TopicActivityTask.ADD_TOPIC;
      if (got != null && activityTask.equals(task_)) {
        ForumActivityUtils.takeActivityBack(task.getContext().getTopic(), got);
      }
    }
  }

  private void endProcess(Task<ForumActivityContext> task, ConversationState lastState) {
    try{
      ConversationState.setCurrent(lastState);
    }catch(Exception e){
      LOG.warn("Failed to reset state context for forum activity task executing", e);
    }
  }

  public static class Task<T> {
    private ForumActivityContext ctx;
    private ActivityTask<T>      task;
    private final ConversationState state;

    public Task(ForumActivityContext ctx, ActivityTask<T> task) {
      this.ctx = ctx;
      this.task = task;
      this.state = ConversationState.getCurrent();
    }

    public ForumActivityContext getContext() {
      return ctx;
    }

    public ActivityTask<T> getTask() {
      return task;
    }
    
    public ConversationState getState(){
      return this.state;
    }
  }
}