Thread Pool Manager and its Test Cases

 The following programme was developed as part of my assignments at ST. Thanks to Terry for your continuous patronage; I am truly grateful for your patience and understanding. Thank you.

Thread Pool Manager (rough implementation) - contains a manager which always maintains a minimum number of threads and executes tasks as and when they are added.


Again, this is a rough implementation with only the main features. You can add other features, such as the Singleton property, and any other methods that you think a thread pool manager should have.

The test cases cover the methods that have been developed.

Note: For any assistance or help, you are always welcome to write a comment or drop a mail to complanboy2@gmail.com. https://www.facebook.com/complanboy2


Thread Pool Manager:

/**
 *
 */
package Threads;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

/**
 * A small thread pool that keeps between {@code minNumberOfThreads} and
 * {@code maxNumberOfThreads} worker threads alive and runs submitted tasks in
 * FIFO order.
 *
 * <p>Thread-safety: the {@link #jobs} list doubles as the pool's monitor. Every
 * read or write of {@link #jobs} and {@link #threads} made by this class happens
 * while holding {@code synchronized (jobs)}; code in the same package that
 * inspects those collections should take the same lock.
 *
 * @author veerasekhar
 */
public final class ThreadPoolManager
{
    int minNumberOfThreads, maxNumberOfThreads;
    long idleTime;

    // false once shutdown() or shutdownNow() has been called: no new tasks are accepted.
    private volatile boolean isAlive;
    // true once shutdownNow() has been called: queued tasks must not be started any more.
    private volatile boolean isShutDown;
    protected ArrayList<Runnable> jobs;
    protected Set<PooledThread> threads;

    // Sequence used to give every worker a distinct name; guarded by the jobs monitor.
    private int nextThreadNumber;

    /**
     * Creates a pool with 2 core threads, at most 5 threads and an idle time of 1000 ms.
     */
    public ThreadPoolManager()
    {
        this(2,5,1000);
    }

    /**
     * Creates the pool and immediately starts the core worker threads.
     *
     * @param minNumberOfThreads number of workers that are kept alive even when idle; must be &gt;= 0
     * @param maxNumberOfThreads upper bound for the number of workers; must be &gt;= 1 and
     *                           &gt;= {@code minNumberOfThreads}
     * @param idleTime           milliseconds a worker beyond the minimum may stay idle before it
     *                           retires; {@code 0} means such workers never retire; must be &gt;= 0
     * @throws IllegalArgumentException if any argument is outside the ranges above
     */
    public ThreadPoolManager(final int minNumberOfThreads, final int maxNumberOfThreads, final int idleTime)
    {
        if (minNumberOfThreads < 0 || maxNumberOfThreads < 1 || maxNumberOfThreads < minNumberOfThreads)
        {
            throw new IllegalArgumentException("Invalid pool size: min=" + minNumberOfThreads
                    + ", max=" + maxNumberOfThreads);
        }
        if (idleTime < 0)
        {
            throw new IllegalArgumentException("idleTime must not be negative: " + idleTime);
        }

        this.minNumberOfThreads = minNumberOfThreads;
        this.maxNumberOfThreads = maxNumberOfThreads;
        this.idleTime = idleTime;

        isAlive = true;
        jobs = new ArrayList<Runnable>();
        threads = new HashSet<PooledThread>();

        // Create the min number of threads. The lock keeps the freshly started
        // workers from touching the thread set while it is still being filled.
        synchronized (jobs)
        {
            for(int i=0;i<minNumberOfThreads;i++)
            {
                startWorker();
            }
        }
    }

    /**
     * Queues a task for execution and wakes one idle worker. An additional worker
     * is started when the backlog is larger than the current number of workers and
     * the maximum has not been reached.
     *
     * @param task the task to run; must not be {@code null}
     * @throws NullPointerException  if {@code task} is {@code null}
     * @throws IllegalStateException if the pool has already been shut down
     */
    public final void execute(final Runnable task)
    {
        if (task == null)
        {
            throw new NullPointerException("task must not be null");
        }

        synchronized(jobs)
        {
            // Checked under the lock so a task can never slip into the queue
            // after a concurrent shutdown has already been observed by the workers.
            if (!isAlive)
            {
                throw new IllegalStateException("Thread pool has been shut down; task rejected");
            }

            jobs.add(task);

            growIfBacklogged(jobs.size());

            jobs.notify();
        }
    }

    /**
     * Starts one more worker if there are more queued jobs than workers and the
     * pool is still below its maximum size. Caller must hold the jobs monitor.
     */
    private void growIfBacklogged(final int jobSize)
    {
        int threadPoolSize = threads.size();
        if(jobSize > threadPoolSize && threadPoolSize < maxNumberOfThreads)
        {
            startWorker();
        }
    }

    /**
     * Creates, names, starts and registers a new worker. Caller must hold the jobs monitor.
     */
    private void startWorker()
    {
        PooledThread thread = new PooledThread();
        thread.setName("Thread " + nextThreadNumber++);

        // Start before registering: if start() fails no dead entry is left behind,
        // and the new worker cannot look at the set until we release the monitor.
        thread.start();
        threads.add(thread);
    }

    /**
     * Worker thread: repeatedly takes the next job from the queue and runs it
     * until the pool tells it to stop or it retires after being idle.
     */
    private final class PooledThread extends Thread
    {
        @Override
        public void run()
        {
            try
            {
                Runnable task;
                while ((task = nextTask()) != null)
                {
                    try
                    {
                        task.run();
                    }
                    catch (RuntimeException e)
                    {
                        // A failing task must not take the worker down with it.
                        // Report it the same way an uncaught exception would be reported.
                        getUncaughtExceptionHandler().uncaughtException(this, e);
                    }
                }
            }
            finally
            {
                // Whatever the reason for leaving, deregister so that the
                // thread set only ever contains live workers.
                synchronized (jobs)
                {
                    threads.remove(this);
                }
            }
        }

        /**
         * Blocks until a job is available and returns it, or returns {@code null}
         * when this worker has to terminate: after shutdownNow(), after shutdown()
         * once the queue is drained, or when it is an excess worker that has been
         * idle for the whole idle time.
         */
        private Runnable nextTask()
        {
            synchronized (jobs)
            {
                long idleSince = System.nanoTime();

                while (true)
                {
                    // shutdownNow: leave the remaining jobs untouched in the queue.
                    if (isShutDown)
                    {
                        return null;
                    }
                    if (!jobs.isEmpty())
                    {
                        return jobs.remove(0);
                    }
                    // shutdown: the queue is drained, nothing more will arrive.
                    if (!isAlive)
                    {
                        return null;
                    }

                    long remaining = 0; // wait(0) blocks until notified
                    if (idleTime > 0)
                    {
                        long elapsedMillis = (System.nanoTime() - idleSince) / 1000000L;
                        remaining = idleTime - elapsedMillis;
                        if (remaining <= 0)
                        {
                            // Idle for the full idle time: excess workers retire.
                            // Deregister while still holding the lock so two workers
                            // cannot both retire on the same size reading.
                            if (threads.size() > minNumberOfThreads)
                            {
                                threads.remove(this);
                                return null;
                            }
                            // Core worker: begin a fresh idle period.
                            idleSince = System.nanoTime();
                            remaining = idleTime;
                        }
                    }

                    try
                    {
                        jobs.wait(remaining);
                    }
                    catch (InterruptedException e)
                    {
                        // Interrupts are only used as a wake-up signal by shutdownNow();
                        // the loop re-reads the pool state and decides what to do.
                    }
                }
            }
        }
    }

    /**
     * Shuts the pool down immediately: no new tasks are accepted, queued tasks are
     * not started, and every worker is interrupted so that idle workers wake up and
     * running tasks get a chance to stop early. Jobs that were never started stay
     * in the queue.
     *
     * @return the number of queued jobs that will not be executed
     */
    public final int shutdownNow()
    {
        synchronized (jobs)
        {
            isAlive = false;
            isShutDown = true;

            // Safe to iterate: workers only modify the set while holding this monitor.
            for (PooledThread thread : threads)
            {
                thread.interrupt();
            }
            jobs.notifyAll();

            return jobs.size();
        }
    }

    /**
     * Shuts the pool down gracefully: no new tasks are accepted, but running tasks
     * and all jobs already waiting in the queue are completed before the workers
     * terminate. Running tasks are not interrupted; idle workers are woken with a
     * notification so they can terminate.
     *
     * @return the number of jobs still waiting in the queue at the time of the call
     */
    public final int shutdown()
    {
        synchronized (jobs)
        {
            isAlive = false;

            jobs.notifyAll();

            return jobs.size();
        }
    }
}  

Testcases:


package Threads;

import static org.junit.Assert.*;

import org.junit.Test;

/**
 * @author veerasekhar
 *
 */
public class ThreadPoolTest
{
    ThreadPoolManager tp;
    int numberOfThreads;


    @Test(expected=IllegalStateException.class)
    public void test1() throws InterruptedException
    {
        int numberOfTasks = 20;
       
        ThreadPoolManager threadPool = new ThreadPoolManager(2, 5, 1000);
        for (int i = 0; i < numberOfTasks; i++)
        {
            threadPool.execute(createTask(i));
        }
       
        System.out.println(threadPool.shutdownNow());
       
        //check it is not accepting any more tasks       
        threadPool.execute(createTask(-1));
       
        //check all threads are dead
        assertEquals(threadPool.threads.size(), 0);
    }

    @Test(expected=IllegalStateException.class)
    public void test2() throws InterruptedException
    {
        int numberOfTasks = 20;
       
        ThreadPoolManager threadPool = new ThreadPoolManager(2, 5, 1000);
        for (int i = 0; i < numberOfTasks; i++)
        {
            threadPool.execute(createTask(i));
        }
       
        int jobsSize = threadPool.shutdown();
       
        //check it is not accepting any more tasks
        threadPool.execute(createTask(-1));
       
        //check all threads are dead
        Thread.sleep(jobsSize * 10);
       
        assertEquals(threadPool.threads.size(), 0);
        assertEquals(threadPool.jobs.size(), 0);
    }

    private static Runnable createTask(final int taskID)
    {
        return new Runnable()
        {
            public void run()
            {
                System.out.println("Task " + taskID + ": start");   
               
                try
                {
                    Thread.sleep(10);
                }
                catch(Exception e)
                {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskID + ": end");
            }
        };
    }
}