DisableUserUpgradePlugin.java
/*
* Copyright (C) 2003-2014 eXo Platform SAS.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.exoplatform.platform.upgrade.plugins;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.exoplatform.commons.upgrade.UpgradeProductPlugin;
import org.exoplatform.commons.utils.ListAccess;
import org.exoplatform.commons.version.util.VersionComparator;
import org.exoplatform.container.ExoContainer;
import org.exoplatform.container.ExoContainerContext;
import org.exoplatform.container.component.RequestLifeCycle;
import org.exoplatform.container.xml.InitParams;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.exoplatform.services.organization.OrganizationService;
import org.exoplatform.services.organization.User;
import org.exoplatform.services.organization.UserHandler;
import org.exoplatform.services.organization.UserStatus;
import org.exoplatform.services.organization.idm.PicketLinkIDMOrganizationServiceImpl;
import org.exoplatform.services.organization.idm.PicketLinkIDMService;
import org.exoplatform.services.organization.idm.UserDAOImpl;
import org.exoplatform.services.organization.impl.UserImpl;
import org.picketlink.idm.api.Attribute;
import org.picketlink.idm.api.AttributesManager;
import org.picketlink.idm.api.IdentitySession;
import org.picketlink.idm.impl.api.SimpleAttribute;
public class DisableUserUpgradePlugin extends UpgradeProductPlugin {
private static final Log LOG = ExoLogger.getLogger(DisableUserUpgradePlugin.class.getName());
private OrganizationService service;
private int threadNum = 20;
private int batchSize = 100;
public static final String THREAD_CONFIG = "numberOfThreads";
public static final String BATCH_CONFIG = "batchSize";
private static final String VERSION = "4.3";
public DisableUserUpgradePlugin(OrganizationService service, InitParams initParams) {
super(initParams);
if (initParams.containsKey(THREAD_CONFIG)) {
int tn = threadNum;
try {
tn = Integer.parseInt(initParams.getValueParam(THREAD_CONFIG).getValue());
} catch (Exception ex) {
}
if (tn > 0) {
threadNum = tn;
}
}
if (initParams.containsKey(BATCH_CONFIG)) {
int b = threadNum;
try {
b = Integer.parseInt(initParams.getValueParam(BATCH_CONFIG).getValue());
} catch (Exception ex) {
}
if (b > 0) {
batchSize = b;
}
}
this.service = service;
}
@Override
public void processUpgrade(String oldVersion, String newVersion) {
LOG.info("Start {} .............", this.getClass().getName());
if (service instanceof PicketLinkIDMOrganizationServiceImpl) {
final PicketLinkIDMOrganizationServiceImpl impl = (PicketLinkIDMOrganizationServiceImpl)service;
if (!impl.getConfiguration().isDisableUserActived()) {
LOG.info("Ignore upgrade user due to disable-user feature is not activated");
return;
}
final UserHandler handler = service.getUserHandler();
ListAccess<User> users = null;
int size = 0;
try {
RequestLifeCycle.begin(impl);
users = handler.findAllUsers(UserStatus.ANY);
size = users.getSize();
} catch (Exception ex) {
LOG.error(ex);
} finally {
RequestLifeCycle.end();
}
batchSize = batchSize > size ? size : batchSize;
int tSize = size / threadNum;
tSize = tSize > 0 ? tSize : size;
final int odd = tSize == size ? 0 : size % threadNum;
LOG.info("start upgrading {} users by {} threads, batchSize: {}", size, threadNum, batchSize);
//
final AtomicLong count = new AtomicLong();
List<Future<Boolean>> results = new LinkedList<Future<Boolean>>();
final ExoContainer container = ExoContainerContext.getCurrentContainer();
final PicketLinkIDMService idmService = container.getComponentInstanceOfType(PicketLinkIDMService.class);
final int threadSize = tSize;
final ListAccess<User> usrs = users;
final int totalSize = size;
ExecutorService execService = Executors.newFixedThreadPool(threadNum);
try {
//
for (int i = 0; i < threadNum; i++) {
final int idx = i;
//
results.add(execService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
int start = idx * threadSize;
int end = start + threadSize;
if (idx == threadNum -1) {
end += odd;
}
ExoContainerContext.setCurrentContainer(container);
int startBatch = start;
int endBatch = startBatch + batchSize;
endBatch = endBatch > end ? end : endBatch;
while (endBatch <= totalSize && endBatch <= end && startBatch < endBatch) {
LOG.info("{} start: {}, end: {}", Thread.currentThread(), startBatch, endBatch);
RequestLifeCycle.begin(impl);
IdentitySession session = idmService.getIdentitySession();
User[] tmp = usrs.load(startBatch, endBatch - startBatch);
try {
for (User u : tmp) {
enableUser(u, session);
}
count.addAndGet(endBatch - startBatch);
startBatch = endBatch;
endBatch = startBatch + batchSize;
endBatch = endBatch > end ? end : endBatch;
LOG.info("{} finished successfully!", Thread.currentThread());
} catch (Exception e) {
LOG.error("An unexpected error occurs when migrating pages:", e);
return false;
} finally {
RequestLifeCycle.end();
}
}
return true;
}
}));
}
} finally {
execService.shutdown();
}
try {
for (Future<Boolean> r : results) {
if (!r.get()) {
LOG.error("Disable user migration completed with errors");
}
}
LOG.info("Finish upgrading {} users", count.get());
} catch (Exception ex) {
LOG.error(ex);
}
} else {
LOG.info("Ignore upgrade user due to not using PicketLinkIDMOrganizationServiceImpl");
}
}
@Override
public boolean shouldProceedToUpgrade(String newVersion, String previousVersion) {
return VersionComparator.isBefore(previousVersion, VERSION);
}
private void enableUser(User u, IdentitySession session) {
if (u instanceof UserImpl) {
((UserImpl) u).setEnabled(true);
Attribute[] attrs = new Attribute[] { new SimpleAttribute(UserDAOImpl.USER_ENABLED, String.valueOf(true)) };
AttributesManager am = session.getAttributesManager();
try {
am.updateAttributes(u.getUserName(), attrs);
} catch (Exception e) {
LOG.error(e);
}
}
}
}