The following programme has developed as part of my assignments at ST, Thanks to Terry for your continuous patronage.. actually greatly thankful for your patience and understanding. Thanks for you.
Thread Pool Manager (Rough implementation) - contains a manager which maintains minimum number of threads always and executes the tasks as and when they are added.
Again this is a rough implementation with main features. You can add the other features like Singleton property and any other methods you like that a thread pool manager should have.
Testcases cover the methods that are developed.
Note: For any assistance or help, you are always welcome to write a comment or drop a mail @complanboy2@gmail.com. https://www.facebook.com/complanboy2
Thread Pool Manager:
/**
*
*/
package Threads;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
/**
* @author veerasekhar
*
*/
public final class ThreadPoolManager
{
int minNumberOfThreads, maxNumberOfThreads;
long idleTime;
private volatile boolean isAlive;
private volatile boolean isShutDown;
protected ArrayList<Runnable> jobs;
protected Set<PooledThread> threads;
public ThreadPoolManager()
{
this(2,5,1000);
}
public ThreadPoolManager(final int minNumberOfThreads, final int maxNumberOfThreads, final int idleTime)
{
this.minNumberOfThreads = minNumberOfThreads;
this.maxNumberOfThreads = maxNumberOfThreads;
this.idleTime = idleTime;
isAlive = true;
jobs = new ArrayList<Runnable>();
threads = new HashSet<PooledThread>();
// Create the min number of threads
for(int i=0;i<minNumberOfThreads;i++)
{
PooledThread thread = new PooledThread();
threads.add(thread);
thread.setName("Thread " + i);
thread.start();
}
}
public final void execute(final Runnable task)
{
// Execute if threadPool is Alive only.
if (!isAlive)
{
throw new IllegalStateException("");
}
// Add the jobs to the queue and new threads if jobs added are > Curren Number of Threads.
synchronized(jobs)
{
jobs.add(task);
setMaxThreads(jobs.size());
jobs.notify();
}
}
private final void setMaxThreads(final int jobSize)
{
int threadPoolSize = threads.size();
if(jobSize > threadPoolSize && threadPoolSize < maxNumberOfThreads)
{
PooledThread thread = new PooledThread();
threads.add(thread);
thread.start();
}
}
private final class PooledThread extends Thread
{
@Override
public void run()
{
// Run while the thread is Alive
while(isAlive)
{
Runnable task = null;
synchronized (jobs)
{
// If jobs are empty, wait for specified idle time
// Or till some body get communicate (notify, notifyall, interrupt) with the waiting thread.
if(jobs.isEmpty())
{
try
{
jobs.wait(idleTime);
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
// Allow only if the queue is not empty and threads are get notified
// when shutDownNow is called, thenthreads will be notified
// and dont allow the thread to remove the job from the jobs queue
if( !(jobs.isEmpty() || isShutDown) )
{
task = (Runnable) jobs.remove(0);
}
// If idle for the idle time
else
{
// Check for excessive threads and remove it from thread pool
if(threads.size() > minNumberOfThreads)
{
threads.remove(this);
break;
}
}
}
if(task != null)
{
task.run();
}
}
}
}
/* shutsdown threadpool immediately and completes current started jobs only. */
public final int shutdownNow()
{
isAlive = false;
isShutDown = true;
// Interrupt the threads(min number always) in the waiting state to get completed
for (PooledThread thread : threads)
{
thread.interrupt();
}
return jobs.size();
}
// shutsdown only al started jobs and awaiting jobs in the queue get completed
// Notify all the waiting threads.. so that they can be brought to end
public final int shutdown()
{
isAlive = false;
// Interrupt the threads(min number always) in the waiting state to get completed
for (PooledThread thread : threads)
{
thread.interrupt();
}
return jobs.size();
}
}
Testcases:
package Threads;
import static org.junit.Assert.*;
import org.junit.Test;
/**
* @author veerasekhar
*
*/
public class ThreadPoolTest
{
ThreadPoolManager tp;
int numberOfThreads;
@Test(expected=IllegalStateException.class)
public void test1() throws InterruptedException
{
int numberOfTasks = 20;
ThreadPoolManager threadPool = new ThreadPoolManager(2, 5, 1000);
for (int i = 0; i < numberOfTasks; i++)
{
threadPool.execute(createTask(i));
}
System.out.println(threadPool.shutdownNow());
//check it is not accepting any more tasks
threadPool.execute(createTask(-1));
//check all threads are dead
assertEquals(threadPool.threads.size(), 0);
}
@Test(expected=IllegalStateException.class)
public void test2() throws InterruptedException
{
int numberOfTasks = 20;
ThreadPoolManager threadPool = new ThreadPoolManager(2, 5, 1000);
for (int i = 0; i < numberOfTasks; i++)
{
threadPool.execute(createTask(i));
}
int jobsSize = threadPool.shutdown();
//check it is not accepting any more tasks
threadPool.execute(createTask(-1));
//check all threads are dead
Thread.sleep(jobsSize * 10);
assertEquals(threadPool.threads.size(), 0);
assertEquals(threadPool.jobs.size(), 0);
}
private static Runnable createTask(final int taskID)
{
return new Runnable()
{
public void run()
{
System.out.println("Task " + taskID + ": start");
try
{
Thread.sleep(10);
}
catch(Exception e)
{
e.printStackTrace();
}
System.out.println("Task " + taskID + ": end");
}
};
}
}
Thread Pool Manager (Rough implementation) - contains a manager which maintains minimum number of threads always and executes the tasks as and when they are added.
Again this is a rough implementation with main features. You can add the other features like Singleton property and any other methods you like that a thread pool manager should have.
Testcases cover the methods that are developed.
Note: For any assistance or help, you are always welcome to write a comment or drop a mail @complanboy2@gmail.com. https://www.facebook.com/complanboy2
Thread Pool Manager:
/**
*
*/
package Threads;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
/**
* @author veerasekhar
*
*/
public final class ThreadPoolManager
{
int minNumberOfThreads, maxNumberOfThreads;
long idleTime;
private volatile boolean isAlive;
private volatile boolean isShutDown;
protected ArrayList<Runnable> jobs;
protected Set<PooledThread> threads;
public ThreadPoolManager()
{
this(2,5,1000);
}
public ThreadPoolManager(final int minNumberOfThreads, final int maxNumberOfThreads, final int idleTime)
{
this.minNumberOfThreads = minNumberOfThreads;
this.maxNumberOfThreads = maxNumberOfThreads;
this.idleTime = idleTime;
isAlive = true;
jobs = new ArrayList<Runnable>();
threads = new HashSet<PooledThread>();
// Create the min number of threads
for(int i=0;i<minNumberOfThreads;i++)
{
PooledThread thread = new PooledThread();
threads.add(thread);
thread.setName("Thread " + i);
thread.start();
}
}
public final void execute(final Runnable task)
{
// Execute if threadPool is Alive only.
if (!isAlive)
{
throw new IllegalStateException("");
}
// Add the jobs to the queue and new threads if jobs added are > Curren Number of Threads.
synchronized(jobs)
{
jobs.add(task);
setMaxThreads(jobs.size());
jobs.notify();
}
}
private final void setMaxThreads(final int jobSize)
{
int threadPoolSize = threads.size();
if(jobSize > threadPoolSize && threadPoolSize < maxNumberOfThreads)
{
PooledThread thread = new PooledThread();
threads.add(thread);
thread.start();
}
}
private final class PooledThread extends Thread
{
@Override
public void run()
{
// Run while the thread is Alive
while(isAlive)
{
Runnable task = null;
synchronized (jobs)
{
// If jobs are empty, wait for specified idle time
// Or till some body get communicate (notify, notifyall, interrupt) with the waiting thread.
if(jobs.isEmpty())
{
try
{
jobs.wait(idleTime);
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
// Allow only if the queue is not empty and threads are get notified
// when shutDownNow is called, thenthreads will be notified
// and dont allow the thread to remove the job from the jobs queue
if( !(jobs.isEmpty() || isShutDown) )
{
task = (Runnable) jobs.remove(0);
}
// If idle for the idle time
else
{
// Check for excessive threads and remove it from thread pool
if(threads.size() > minNumberOfThreads)
{
threads.remove(this);
break;
}
}
}
if(task != null)
{
task.run();
}
}
}
}
/* shutsdown threadpool immediately and completes current started jobs only. */
public final int shutdownNow()
{
isAlive = false;
isShutDown = true;
// Interrupt the threads(min number always) in the waiting state to get completed
for (PooledThread thread : threads)
{
thread.interrupt();
}
return jobs.size();
}
// shutsdown only al started jobs and awaiting jobs in the queue get completed
// Notify all the waiting threads.. so that they can be brought to end
public final int shutdown()
{
isAlive = false;
// Interrupt the threads(min number always) in the waiting state to get completed
for (PooledThread thread : threads)
{
thread.interrupt();
}
return jobs.size();
}
}
Testcases:
package Threads;
import static org.junit.Assert.*;
import org.junit.Test;
/**
* @author veerasekhar
*
*/
public class ThreadPoolTest
{
ThreadPoolManager tp;
int numberOfThreads;
@Test(expected=IllegalStateException.class)
public void test1() throws InterruptedException
{
int numberOfTasks = 20;
ThreadPoolManager threadPool = new ThreadPoolManager(2, 5, 1000);
for (int i = 0; i < numberOfTasks; i++)
{
threadPool.execute(createTask(i));
}
System.out.println(threadPool.shutdownNow());
//check it is not accepting any more tasks
threadPool.execute(createTask(-1));
//check all threads are dead
assertEquals(threadPool.threads.size(), 0);
}
@Test(expected=IllegalStateException.class)
public void test2() throws InterruptedException
{
int numberOfTasks = 20;
ThreadPoolManager threadPool = new ThreadPoolManager(2, 5, 1000);
for (int i = 0; i < numberOfTasks; i++)
{
threadPool.execute(createTask(i));
}
int jobsSize = threadPool.shutdown();
//check it is not accepting any more tasks
threadPool.execute(createTask(-1));
//check all threads are dead
Thread.sleep(jobsSize * 10);
assertEquals(threadPool.threads.size(), 0);
assertEquals(threadPool.jobs.size(), 0);
}
private static Runnable createTask(final int taskID)
{
return new Runnable()
{
public void run()
{
System.out.println("Task " + taskID + ": start");
try
{
Thread.sleep(10);
}
catch(Exception e)
{
e.printStackTrace();
}
System.out.println("Task " + taskID + ": end");
}
};
}
}