-
Notifications
You must be signed in to change notification settings - Fork 0
Java05. ДЗ 05, Васильев Роман #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| package vr; | ||
|
|
||
|
|
||
| public class LightExecutionException extends Exception { | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| package vr; | ||
|
|
||
|
|
||
| import java.util.function.Function; | ||
|
|
||
| public interface LightFuture<T> { | ||
|
|
||
| boolean isReady(); | ||
|
|
||
| T get() throws Exception; | ||
|
|
||
| <Y> LightFuture<Y> thenApply(Function<? super T, Y> function); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,138 @@ | ||
| package vr; | ||
|
|
||
|
|
||
| import java.util.LinkedList; | ||
| import java.util.List; | ||
| import java.util.function.Function; | ||
| import java.util.function.Supplier; | ||
| import java.util.stream.IntStream; | ||
|
|
||
| public class ThreadPoolImpl { | ||
|
|
||
| private final List<Thread> threads = new LinkedList<>(); | ||
| private final LinkedList<Future> taskList = new LinkedList<>(); | ||
|
|
||
| public ThreadPoolImpl(int threadCount) { | ||
| IntStream.range(0, threadCount).forEach(i -> threads.add(new Thread(() -> { | ||
| Future future = null; | ||
| while (!Thread.interrupted()) { | ||
| try { | ||
| synchronized (taskList) { | ||
| while (taskList.isEmpty()) { | ||
| if (Thread.interrupted()) throw new InterruptedException(); | ||
| taskList.wait(); | ||
| } | ||
| future = taskList.poll(); | ||
| } | ||
| future.calculateResult(); | ||
| future = null; | ||
| } catch (InterruptedException e) { | ||
| break; | ||
| } | ||
| } | ||
| }))); | ||
| threads.forEach(Thread::start); | ||
| } | ||
|
|
||
| public void shutdown() { | ||
| synchronized (taskList) { | ||
| taskList.forEach(future -> future.corrupt(new LightExecutionException())); | ||
| taskList.clear(); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. пусть довычисляются, думаю, достаточно только флаг проставить |
||
| threads.forEach(Thread::interrupt); | ||
| } | ||
|
|
||
|
|
||
| public <T> LightFuture<T> submit(Supplier<T> supplier) { | ||
| Future<T> future = new Future<>(supplier); | ||
| synchronized (taskList) { | ||
| taskList.add(future); | ||
| taskList.notify(); | ||
| } | ||
| return future; | ||
| } | ||
|
|
||
| public class Future<T> implements LightFuture<T> { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. почему не private? |
||
|
|
||
| private final Supplier<T> supplier; | ||
| private volatile boolean isReady = false; | ||
| private volatile Exception exception = null; | ||
| private volatile T result = null; | ||
| private final List<Future> dependentTasks = new LinkedList<>(); | ||
|
|
||
| public Future(Supplier<T> supplierForFuture) { | ||
| supplier = supplierForFuture; | ||
| } | ||
|
|
||
| public boolean isReady() { | ||
| return isReady; | ||
| } | ||
|
|
||
| public T get() throws Exception { | ||
| if (!isReady) { | ||
| synchronized (this) { | ||
| while (!isReady) { | ||
| this.wait(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this, -0.5 |
||
| } | ||
| } | ||
| } | ||
| if (exception != null) throw exception; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. из get может лететь только ExecutionException, либо еще InterruptedException, все остальные исключения ставятся вышеуказанным в качестве причины |
||
| return result; | ||
| } | ||
|
|
||
| public <Y> LightFuture<Y> thenApply(Function<? super T, Y> function) { | ||
| Future<Y> future = new Future<>(() -> function.apply(getResult())); | ||
| synchronized (dependentTasks) { | ||
| if (isReady) { | ||
| if (exception != null) { | ||
| future.corrupt(exception); | ||
| } else { | ||
| synchronized (taskList) { | ||
| taskList.add(future); | ||
| taskList.notify(); | ||
| } | ||
| } | ||
| } else { | ||
| dependentTasks.add(future); | ||
| } | ||
| } | ||
| return future; | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Убедись, что следующее верно для твоего кода (я пока что не вчитывался в код, просто заранее предупреждаю): Предположим в пуле всего два треда. Мы положили туда одно задание, потом второе, зависимое от первого. Первое вычисляется долго. Второе его ждет. Приходит третье задание, у него должны быть все шансы выполниться |
||
|
|
||
| private void calculateResult() { | ||
| try { | ||
| result = supplier.get(); | ||
| synchronized (dependentTasks) { | ||
| synchronized (taskList) { | ||
| taskList.addAll(dependentTasks); | ||
| taskList.notifyAll(); | ||
| dependentTasks.clear(); | ||
| } | ||
| isReady = true; | ||
| } | ||
| } catch (Exception e) { | ||
| corrupt(new LightExecutionException()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. e можно передать в качестве аргумента конструктору, как это сделано во многих других исключениях |
||
| } | ||
|
|
||
| synchronized (this) { | ||
| this.notifyAll(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this |
||
| } | ||
| } | ||
|
|
||
| private T getResult() { | ||
| return result; | ||
| } | ||
|
|
||
| private void corrupt(Exception exception) { | ||
| this.exception = exception; | ||
| synchronized (dependentTasks) { | ||
| dependentTasks.forEach(future -> future.corrupt(exception)); | ||
| dependentTasks.clear(); | ||
| isReady = true; | ||
| } | ||
| synchronized (this) { | ||
| this.notifyAll(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this |
||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| package vr; | ||
|
|
||
| import org.junit.Test; | ||
|
|
||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentSkipListSet; | ||
| import java.util.stream.IntStream; | ||
|
|
||
| import static org.junit.Assert.*; | ||
|
|
||
|
|
||
| public class ThreadPoolImplTest { | ||
|
|
||
| public static final int THREAD_COUNT = 4; | ||
| public static final int TIME_TO_SLEEP = 10; | ||
| public static final int COUNT_OF_TASKS = 100; | ||
|
|
||
| @Test | ||
| public void getTest() throws Exception { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. именование тестов осуществляется либо testBlahBlahBlah, либо blahBlahBlah. В 3 junit работает только первый способ. В тестовом классе всегда должно быть одинаковое именование |
||
| LightFuture[] futures = new LightFuture[COUNT_OF_TASKS]; | ||
| ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); | ||
|
|
||
| IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { | ||
| try { | ||
| Thread.sleep(TIME_TO_SLEEP); | ||
| } catch (InterruptedException ignored) { | ||
| } | ||
| return i; | ||
| })); | ||
|
|
||
| for (int i = 0; i < COUNT_OF_TASKS; ++i) { | ||
| assertEquals(i, futures[i].get()); | ||
| } | ||
|
|
||
| pool.shutdown(); | ||
| } | ||
|
|
||
| @Test | ||
| @SuppressWarnings("all") | ||
| public void numberOfThreadsTest() throws Exception { | ||
| LightFuture[] futures = new LightFuture[COUNT_OF_TASKS]; | ||
| ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); | ||
| Set<Long> threadIds = new ConcurrentSkipListSet<>(); // hope i can use it instead of synchronized | ||
| IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { | ||
| try { | ||
| Thread.sleep(TIME_TO_SLEEP); | ||
| } catch (InterruptedException ignored) { | ||
| } | ||
| threadIds.add(Thread.currentThread().getId()); | ||
| return i; | ||
| })); | ||
|
|
||
| for (int i = 0; i < COUNT_OF_TASKS; ++i) { | ||
| futures[i].get(); | ||
| } | ||
| if (TIME_TO_SLEEP >= 10) { // if time to small, then one process can do all work | ||
| assertEquals(threadIds.size(), THREAD_COUNT); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. не очень понял это условие, как результат зависит от времени сна? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. аа, дошло |
||
|
|
||
| pool.shutdown(); | ||
| } | ||
|
|
||
| @Test | ||
| public void isReadyTest() throws Exception { | ||
| LightFuture[] futures = new LightFuture[COUNT_OF_TASKS]; | ||
| ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); | ||
|
|
||
| IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { | ||
| try { | ||
| Thread.sleep(TIME_TO_SLEEP); | ||
| } catch (InterruptedException ignored) { | ||
| } | ||
| return i; | ||
| })); | ||
|
|
||
| for (int i = 0; i < COUNT_OF_TASKS; ++i) { | ||
| futures[i].isReady(); | ||
| futures[i].get(); | ||
| assertTrue(futures[i].isReady()); | ||
| } | ||
|
|
||
| pool.shutdown(); | ||
| } | ||
|
|
||
| @Test | ||
| @SuppressWarnings("unchecked") | ||
| public void thenApplyTest() throws Exception { | ||
| LightFuture<Integer>[] futures = new LightFuture[COUNT_OF_TASKS]; | ||
| ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); | ||
|
|
||
| IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { | ||
| try { | ||
| Thread.sleep(TIME_TO_SLEEP); | ||
| } catch (InterruptedException ignored) { | ||
| } | ||
| return i; | ||
| })); | ||
|
|
||
| for (int i = 0; i < COUNT_OF_TASKS / 2; ++i) { | ||
| futures[i] = futures[i].thenApply(n -> n + 2); | ||
| } | ||
|
|
||
| futures[COUNT_OF_TASKS - 1].get(); // wait until all tasks complete | ||
|
|
||
| for (int i = COUNT_OF_TASKS / 2; i < COUNT_OF_TASKS; ++i) { | ||
| futures[i] = futures[i].thenApply(n -> n + 2); | ||
| } | ||
|
|
||
| for (int i = 0; i < COUNT_OF_TASKS; ++i) { | ||
| assertEquals(i + 2, futures[i].get().intValue()); | ||
| } | ||
|
|
||
| pool.shutdown(); | ||
| } | ||
|
|
||
| @Test(expected = LightExecutionException.class) | ||
| public void thenExceptionAfterShutdown() throws Exception { | ||
| LightFuture[] futures = new LightFuture[COUNT_OF_TASKS]; | ||
| ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); | ||
|
|
||
| IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> { | ||
| try { | ||
| Thread.sleep(TIME_TO_SLEEP); | ||
| } catch (InterruptedException ignored) { | ||
| } | ||
| return i; | ||
| })); | ||
|
|
||
| pool.shutdown(); | ||
|
|
||
| for (int i = 0; i < COUNT_OF_TASKS; ++i) { | ||
| futures[i].get(); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. почему должно быть исключение? и какое? и в каком Future? |
||
| } | ||
|
|
||
| @Test(expected = LightExecutionException.class) | ||
| public void thenExceptionInside() throws Exception { | ||
| LightFuture[] futures = new LightFuture[COUNT_OF_TASKS + 1]; | ||
| boolean[] array = new boolean[COUNT_OF_TASKS]; | ||
| ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT); | ||
|
|
||
| IntStream.range(0, COUNT_OF_TASKS + 1).forEach(i -> futures[i] = pool.submit(() -> { | ||
| try { | ||
| Thread.sleep(TIME_TO_SLEEP); | ||
| } catch (InterruptedException ignored) { | ||
| } | ||
| array[i] = true; // out of bounds exception | ||
| return i; | ||
| })); | ||
|
|
||
| pool.shutdown(); | ||
|
|
||
| for (int i = 0; i < COUNT_OF_TASKS + 1; ++i) { | ||
| futures[i].get(); | ||
| } | ||
|
|
||
| assertFalse(array[COUNT_OF_TASKS]); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. откуда должно полететь исключение? думаю промоделировать этот процесс можно гораздо проще |
||
| } | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. тесты надо писать как можно проще, чтобы в них было как можно меньше излишков |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
зачем эта строка?