From 94fc71e371a3e70791781f01bf8b05319c82e4ff Mon Sep 17 00:00:00 2001 From: roman Date: Sat, 30 Apr 2016 00:09:56 +0300 Subject: [PATCH] task5 --- build.gradle | 2 - src/main/java/vr/LightExecutionException.java | 5 + src/main/java/vr/LightFuture.java | 13 ++ src/main/java/vr/ThreadPoolImpl.java | 138 +++++++++++++++ src/test/java/vr/ThreadPoolImplTest.java | 159 ++++++++++++++++++ 5 files changed, 315 insertions(+), 2 deletions(-) create mode 100644 src/main/java/vr/LightExecutionException.java create mode 100644 src/main/java/vr/LightFuture.java create mode 100644 src/main/java/vr/ThreadPoolImpl.java create mode 100644 src/test/java/vr/ThreadPoolImplTest.java diff --git a/build.gradle b/build.gradle index c33546c..1c437fa 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,5 @@ repositories { } dependencies { - compile 'org.jetbrains:annotations:15.0' - compile 'com.googlecode.json-simple:json-simple:1.1' testCompile group: 'junit', name: 'junit', version: '4.12' } diff --git a/src/main/java/vr/LightExecutionException.java b/src/main/java/vr/LightExecutionException.java new file mode 100644 index 0000000..a225d4f --- /dev/null +++ b/src/main/java/vr/LightExecutionException.java @@ -0,0 +1,5 @@ +package vr; + + +public class LightExecutionException extends Exception { +} diff --git a/src/main/java/vr/LightFuture.java b/src/main/java/vr/LightFuture.java new file mode 100644 index 0000000..0caf121 --- /dev/null +++ b/src/main/java/vr/LightFuture.java @@ -0,0 +1,13 @@ +package vr; + + +import java.util.function.Function; + +public interface LightFuture { + + boolean isReady(); + + T get() throws Exception; + + LightFuture thenApply(Function function); +} diff --git a/src/main/java/vr/ThreadPoolImpl.java b/src/main/java/vr/ThreadPoolImpl.java new file mode 100644 index 0000000..85eff8c --- /dev/null +++ b/src/main/java/vr/ThreadPoolImpl.java @@ -0,0 +1,138 @@ +package vr; + + +import java.util.LinkedList; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +public class ThreadPoolImpl { + + private final List threads = new LinkedList<>(); + private final LinkedList taskList = new LinkedList<>(); + + public ThreadPoolImpl(int threadCount) { + IntStream.range(0, threadCount).forEach(i -> threads.add(new Thread(() -> { + Future future = null; + while (!Thread.interrupted()) { + try { + synchronized (taskList) { + while (taskList.isEmpty()) { + if (Thread.interrupted()) throw new InterruptedException(); + taskList.wait(); + } + future = taskList.poll(); + } + future.calculateResult(); + future = null; + } catch (InterruptedException e) { + break; + } + } + }))); + threads.forEach(Thread::start); + } + + public void shutdown() { + synchronized (taskList) { + taskList.forEach(future -> future.corrupt(new LightExecutionException())); + taskList.clear(); + } + threads.forEach(Thread::interrupt); + } + + + public LightFuture submit(Supplier supplier) { + Future future = new Future<>(supplier); + synchronized (taskList) { + taskList.add(future); + taskList.notify(); + } + return future; + } + + public class Future implements LightFuture { + + private final Supplier supplier; + private volatile boolean isReady = false; + private volatile Exception exception = null; + private volatile T result = null; + private final List dependentTasks = new LinkedList<>(); + + public Future(Supplier supplierForFuture) { + supplier = supplierForFuture; + } + + public boolean isReady() { + return isReady; + } + + public T get() throws Exception { + if (!isReady) { + synchronized (this) { + while (!isReady) { + this.wait(); + } + } + } + if (exception != null) throw exception; + return result; + } + + public LightFuture thenApply(Function function) { + Future future = new Future<>(() -> function.apply(getResult())); + synchronized (dependentTasks) { + if (isReady) { + if (exception != null) { + future.corrupt(exception); + } else { + synchronized (taskList) { + taskList.add(future); + taskList.notify(); + } + } + } else { + dependentTasks.add(future); + } + } + return future; + } + + private void calculateResult() { + try { + result = supplier.get(); + synchronized (dependentTasks) { + synchronized (taskList) { + taskList.addAll(dependentTasks); + taskList.notifyAll(); + dependentTasks.clear(); + } + isReady = true; + } + } catch (Exception e) { + corrupt(new LightExecutionException()); + } + + synchronized (this) { + this.notifyAll(); + } + } + + private T getResult() { + return result; + } + + private void corrupt(Exception exception) { + this.exception = exception; + synchronized (dependentTasks) { + dependentTasks.forEach(future -> future.corrupt(exception)); + dependentTasks.clear(); + isReady = true; + } + synchronized (this) { + this.notifyAll(); + } + } + } +} diff --git a/src/test/java/vr/ThreadPoolImplTest.java b/src/test/java/vr/ThreadPoolImplTest.java new file mode 100644 index 0000000..496afea --- /dev/null +++ b/src/test/java/vr/ThreadPoolImplTest.java @@ -0,0 +1,159 @@ +package vr; + +import org.junit.Test; + +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.stream.IntStream; + +import static org.junit.Assert.*; + + +public class ThreadPoolImplTest { + + public static final int THREAD_COUNT = 4; + public static final int TIME_TO_SLEEP = 10; + public static final int COUNT_OF_TASKS = 100; + + @Test + public void getTest() throws Exception { + LightFuture[] futures = new LightFuture[COUNT_OF_TASKS]; + ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); + + IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { + try { + Thread.sleep(TIME_TO_SLEEP); + } catch (InterruptedException ignored) { + } + return i; + })); + + for (int i = 0; i < COUNT_OF_TASKS; ++i) { + assertEquals(i, futures[i].get()); + } + + pool.shutdown(); + } + + @Test + @SuppressWarnings("all") + public void numberOfThreadsTest() throws Exception { + LightFuture[] futures = new LightFuture[COUNT_OF_TASKS]; + ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); + Set threadIds = new ConcurrentSkipListSet<>(); // hope i can use it instead of synchronized + IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { + try { + Thread.sleep(TIME_TO_SLEEP); + } catch (InterruptedException ignored) { + } + threadIds.add(Thread.currentThread().getId()); + return i; + })); + + for (int i = 0; i < COUNT_OF_TASKS; ++i) { + futures[i].get(); + } + if (TIME_TO_SLEEP >= 10) { // if time to small, then one process can do all work + assertEquals(threadIds.size(), THREAD_COUNT); + } + + pool.shutdown(); + } + + @Test + public void isReadyTest() throws Exception { + LightFuture[] futures = new LightFuture[COUNT_OF_TASKS]; + ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); + + IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { + try { + Thread.sleep(TIME_TO_SLEEP); + } catch (InterruptedException ignored) { + } + return i; + })); + + for (int i = 0; i < COUNT_OF_TASKS; ++i) { + futures[i].isReady(); + futures[i].get(); + assertTrue(futures[i].isReady()); + } + + pool.shutdown(); + } + + @Test + @SuppressWarnings("unchecked") + public void thenApplyTest() throws Exception { + LightFuture[] futures = new LightFuture[COUNT_OF_TASKS]; + ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); + + IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { + try { + Thread.sleep(TIME_TO_SLEEP); + } catch (InterruptedException ignored) { + } + return i; + })); + + for (int i = 0; i < COUNT_OF_TASKS / 2; ++i) { + futures[i] = futures[i].thenApply(n -> n + 2); + } + + futures[COUNT_OF_TASKS - 1].get(); // wait until all tasks complete + + for (int i = COUNT_OF_TASKS / 2; i < COUNT_OF_TASKS; ++i) { + futures[i] = futures[i].thenApply(n -> n + 2); + } + + for (int i = 0; i < COUNT_OF_TASKS; ++i) { + assertEquals(i + 2, futures[i].get().intValue()); + } + + pool.shutdown(); + } + + @Test(expected = LightExecutionException.class) + public void thenExceptionAfterShutdown() throws Exception { + LightFuture[] futures = new LightFuture[COUNT_OF_TASKS]; + ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); + + IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { + try { + Thread.sleep(TIME_TO_SLEEP); + } catch (InterruptedException ignored) { + } + return i; + })); + + pool.shutdown(); + + for (int i = 0; i < COUNT_OF_TASKS; ++i) { + futures[i].get(); + } + } + + @Test(expected = LightExecutionException.class) + public void thenExceptionInside() throws Exception { + LightFuture[] futures = new LightFuture[COUNT_OF_TASKS + 1]; + boolean[] array = new boolean[COUNT_OF_TASKS]; + ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); + + IntStream.range(0, COUNT_OF_TASKS + 1).forEach(i -> futures[i] = pool.submit(() -> { + try { + Thread.sleep(TIME_TO_SLEEP); + } catch (InterruptedException ignored) { + } + array[i] = true; // out of bounds exception + return i; + })); + + pool.shutdown(); + + for (int i = 0; i < COUNT_OF_TASKS + 1; ++i) { + futures[i].get(); + } + + assertFalse(array[COUNT_OF_TASKS]); + } +}