Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,5 @@ repositories {
}

dependencies {
compile 'org.jetbrains:annotations:15.0'
compile 'com.googlecode.json-simple:json-simple:1.1'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
5 changes: 5 additions & 0 deletions src/main/java/vr/LightExecutionException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package vr;


public class LightExecutionException extends Exception {
}
13 changes: 13 additions & 0 deletions src/main/java/vr/LightFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package vr;


import java.util.function.Function;

public interface LightFuture<T> {

boolean isReady();

T get() throws Exception;

<Y> LightFuture<Y> thenApply(Function<? super T, Y> function);
}
138 changes: 138 additions & 0 deletions src/main/java/vr/ThreadPoolImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package vr;


import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class ThreadPoolImpl {

private final List<Thread> threads = new LinkedList<>();
private final LinkedList<Future> taskList = new LinkedList<>();

public ThreadPoolImpl(int threadCount) {
IntStream.range(0, threadCount).forEach(i -> threads.add(new Thread(() -> {
Future future = null;
while (!Thread.interrupted()) {
try {
synchronized (taskList) {
while (taskList.isEmpty()) {
if (Thread.interrupted()) throw new InterruptedException();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

зачем эта строка?

taskList.wait();
}
future = taskList.poll();
}
future.calculateResult();
future = null;
} catch (InterruptedException e) {
break;
}
}
})));
threads.forEach(Thread::start);
}

public void shutdown() {
synchronized (taskList) {
taskList.forEach(future -> future.corrupt(new LightExecutionException()));
taskList.clear();
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

пусть довычисляются, думаю, достаточно только флаг проставить

threads.forEach(Thread::interrupt);
}


public <T> LightFuture<T> submit(Supplier<T> supplier) {
Future<T> future = new Future<>(supplier);
synchronized (taskList) {
taskList.add(future);
taskList.notify();
}
return future;
}

public class Future<T> implements LightFuture<T> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

почему не private?


private final Supplier<T> supplier;
private volatile boolean isReady = false;
private volatile Exception exception = null;
private volatile T result = null;
private final List<Future> dependentTasks = new LinkedList<>();

public Future(Supplier<T> supplierForFuture) {
supplier = supplierForFuture;
}

public boolean isReady() {
return isReady;
}

public T get() throws Exception {
if (!isReady) {
synchronized (this) {
while (!isReady) {
this.wait();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this, -0.5

}
}
}
if (exception != null) throw exception;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

из get может лететь только ExecutionException, либо еще InterruptedException, все остальные исключения ставятся вышеуказанным в качестве причины

return result;
}

public <Y> LightFuture<Y> thenApply(Function<? super T, Y> function) {
Future<Y> future = new Future<>(() -> function.apply(getResult()));
synchronized (dependentTasks) {
if (isReady) {
if (exception != null) {
future.corrupt(exception);
} else {
synchronized (taskList) {
taskList.add(future);
taskList.notify();
}
}
} else {
dependentTasks.add(future);
}
}
return future;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Убедись, что следующее верно для твоего кода (я пока что не вчитывался в код, просто заранее предупреждаю):

Предположим в пуле всего два треда. Мы положили туда одно задание, потом второе, зависимое от первого. Первое вычисляется долго. Второе его ждет. Приходит третье задание, у него должны быть все шансы выполниться


private void calculateResult() {
try {
result = supplier.get();
synchronized (dependentTasks) {
synchronized (taskList) {
taskList.addAll(dependentTasks);
taskList.notifyAll();
dependentTasks.clear();
}
isReady = true;
}
} catch (Exception e) {
corrupt(new LightExecutionException());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e можно передать в качестве аргумента конструктору, как это сделано во многих других исключениях

}

synchronized (this) {
this.notifyAll();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this

}
}

private T getResult() {
return result;
}

private void corrupt(Exception exception) {
this.exception = exception;
synchronized (dependentTasks) {
dependentTasks.forEach(future -> future.corrupt(exception));
dependentTasks.clear();
isReady = true;
}
synchronized (this) {
this.notifyAll();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this

}
}
}
}
159 changes: 159 additions & 0 deletions src/test/java/vr/ThreadPoolImplTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package vr;

import org.junit.Test;

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.IntStream;

import static org.junit.Assert.*;


public class ThreadPoolImplTest {

public static final int THREAD_COUNT = 4;
public static final int TIME_TO_SLEEP = 10;
public static final int COUNT_OF_TASKS = 100;

@Test
public void getTest() throws Exception {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

именование тестов осуществляется либо testBlahBlahBlah, либо blahBlahBlah. В 3 junit работает только первый способ. В тестовом классе всегда должно быть одинаковое именование

LightFuture[] futures = new LightFuture[COUNT_OF_TASKS];
ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT);

IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> {
try {
Thread.sleep(TIME_TO_SLEEP);
} catch (InterruptedException ignored) {
}
return i;
}));

for (int i = 0; i < COUNT_OF_TASKS; ++i) {
assertEquals(i, futures[i].get());
}

pool.shutdown();
}

@Test
@SuppressWarnings("all")
public void numberOfThreadsTest() throws Exception {
LightFuture[] futures = new LightFuture[COUNT_OF_TASKS];
ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT);
Set<Long> threadIds = new ConcurrentSkipListSet<>(); // hope i can use it instead of synchronized
IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> {
try {
Thread.sleep(TIME_TO_SLEEP);
} catch (InterruptedException ignored) {
}
threadIds.add(Thread.currentThread().getId());
return i;
}));

for (int i = 0; i < COUNT_OF_TASKS; ++i) {
futures[i].get();
}
if (TIME_TO_SLEEP >= 10) { // if time to small, then one process can do all work
assertEquals(threadIds.size(), THREAD_COUNT);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

не очень понял это условие, как результат зависит от времени сна?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

аа, дошло


pool.shutdown();
}

@Test
public void isReadyTest() throws Exception {
LightFuture[] futures = new LightFuture[COUNT_OF_TASKS];
ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT);

IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> {
try {
Thread.sleep(TIME_TO_SLEEP);
} catch (InterruptedException ignored) {
}
return i;
}));

for (int i = 0; i < COUNT_OF_TASKS; ++i) {
futures[i].isReady();
futures[i].get();
assertTrue(futures[i].isReady());
}

pool.shutdown();
}

@Test
@SuppressWarnings("unchecked")
public void thenApplyTest() throws Exception {
LightFuture<Integer>[] futures = new LightFuture[COUNT_OF_TASKS];
ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT);

IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> {
try {
Thread.sleep(TIME_TO_SLEEP);
} catch (InterruptedException ignored) {
}
return i;
}));

for (int i = 0; i < COUNT_OF_TASKS / 2; ++i) {
futures[i] = futures[i].thenApply(n -> n + 2);
}

futures[COUNT_OF_TASKS - 1].get(); // wait until all tasks complete

for (int i = COUNT_OF_TASKS / 2; i < COUNT_OF_TASKS; ++i) {
futures[i] = futures[i].thenApply(n -> n + 2);
}

for (int i = 0; i < COUNT_OF_TASKS; ++i) {
assertEquals(i + 2, futures[i].get().intValue());
}

pool.shutdown();
}

@Test(expected = LightExecutionException.class)
public void thenExceptionAfterShutdown() throws Exception {
LightFuture[] futures = new LightFuture[COUNT_OF_TASKS];
ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT);

IntStream.range(0, COUNT_OF_TASKS).forEach(i -> futures[i] = pool.submit(() -> {
try {
Thread.sleep(TIME_TO_SLEEP);
} catch (InterruptedException ignored) {
}
return i;
}));

pool.shutdown();

for (int i = 0; i < COUNT_OF_TASKS; ++i) {
futures[i].get();
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

почему должно быть исключение? и какое? и в каком Future?

}

@Test(expected = LightExecutionException.class)
public void thenExceptionInside() throws Exception {
LightFuture[] futures = new LightFuture[COUNT_OF_TASKS + 1];
boolean[] array = new boolean[COUNT_OF_TASKS];
ThreadPoolImpl pool = new ThreadPoolImpl(THREAD_COUNT);

IntStream.range(0, COUNT_OF_TASKS + 1).forEach(i -> futures[i] = pool.submit(() -> {
try {
Thread.sleep(TIME_TO_SLEEP);
} catch (InterruptedException ignored) {
}
array[i] = true; // out of bounds exception
return i;
}));

pool.shutdown();

for (int i = 0; i < COUNT_OF_TASKS + 1; ++i) {
futures[i].get();
}

assertFalse(array[COUNT_OF_TASKS]);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

откуда должно полететь исключение? думаю промоделировать этот процесс можно гораздо проще

}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тесты надо писать как можно проще, чтобы в них было как можно меньше излишков