From 024c55ce34b698192a88e7d27574f1aca13cc1fc Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Fri, 23 Mar 2018 16:18:37 +0800 Subject: [PATCH 01/15] Add files via upload --- Practice1.java | 35 +++++++++ Practice2.java | 77 ++++++++++++++++++++ Practice3.java | 88 +++++++++++++++++++++++ Practice4.java | 52 ++++++++++++++ Practice5.java | 188 +++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 440 insertions(+) create mode 100644 Practice1.java create mode 100644 Practice2.java create mode 100644 Practice3.java create mode 100644 Practice4.java create mode 100644 Practice5.java diff --git a/Practice1.java b/Practice1.java new file mode 100644 index 0000000..c8ad00f --- /dev/null +++ b/Practice1.java @@ -0,0 +1,35 @@ +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.Observable; + +/** + * @author Baoyi Chen + */ +public class Practice1 { + + /* + * 举例如下: + * 参数 Observable["a","b","c"] + * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 + */ + public Observable> indexable(Observable observable) { + return observable.map(s -> new Tuple2<>(1, s)).scan((acc, value) -> new Tuple2<>(acc.getV1() + 1, value.getV2())); + } +} diff --git a/Practice2.java b/Practice2.java new file mode 100644 index 0000000..1061f66 --- /dev/null +++ b/Practice2.java @@ -0,0 +1,77 @@ +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * @author Baoyi Chen + */ +public class Practice2 { + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] + */ + public Observable> wordCount1(Observable words) { + + return Observable.create(emitter -> { + ArrayList> list = new ArrayList(); + words.doOnComplete(() -> { + for (Tuple2 tuple2 : list) { + emitter.onNext(tuple2); + } + emitter.onComplete(); + }).subscribe(s -> { + int value = 1; + for (Tuple2 tuple2 : list) { + if (tuple2.getV1().equals(s)) { + list.remove(tuple2); + value = tuple2.getV2() + 1; + break; + } + } + list.add(new Tuple2<>(s, value)); + }); + + }); + } + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Single[Map{a=2, b=1, c=2}] + */ + public Single> wordCount2(Observable words) { + return Single.create(singleEmitter -> { + HashMap map = new HashMap<>(); + words.doOnComplete(() -> singleEmitter.onSuccess(map)) + .subscribe(s -> { + Integer i = map.get(s); + map.put(s, i == null ? 1 : i++); + }); + }); + } + +} diff --git a/Practice3.java b/Practice3.java new file mode 100644 index 0000000..71e73f5 --- /dev/null +++ b/Practice3.java @@ -0,0 +1,88 @@ +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import io.reactivex.*; +import io.reactivex.functions.Action; +import io.reactivex.functions.BiFunction; +import io.reactivex.functions.Consumer; +import io.reactivex.functions.Function; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Baoyi Chen + */ +public class Practice3 { + + private Observable nodes(Node left, Node right, Integer value) { + if (left != null && right != null) { + return Observable.just(left, right, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); + } else if (left != null) { + return Observable.just(left, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); + } else if (right != null) { + return Observable.just(right, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); + } else { + return Observable.just(value); + } + } + + /* + * 根据iterate的结果求和 + */ + public Maybe sum(Observable observable) { + return Maybe.create(maybeEmitter -> + observable + .flatMap((Function>) node -> nodes(node.left, node.right, node.value)) + .scan((integer, integer2) -> integer + integer2) + .lastElement() + .subscribe(value -> maybeEmitter.onSuccess(value)) + ); + } + + /* + * 举例: + * 5 + * / \ + * 6 7 + * / \ \ + * 4 3 nil + * + * return Observable[4, 3, 6, 7, 5] 顺序无关 + */ + public Observable iterate(Observable observable) { + return Observable.create(maybeEmitter -> + observable + .doOnComplete(() -> maybeEmitter.onComplete()) + .flatMap((Function>) node -> nodes(node.left, node.right, node.value)) + .subscribe(value -> maybeEmitter.onNext(value)) + ); + } + + public static class Node { + public Node left; + public Node right; + public int value; + + public Node(Node left, Node right, int value) { + this.left = left; + this.right = right; + this.value = value; + } + } + +} diff --git a/Practice4.java b/Practice4.java new file mode 100644 index 0000000..5b1df15 --- /dev/null +++ b/Practice4.java @@ -0,0 +1,52 @@ +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import io.reactivex.*; +import io.reactivex.schedulers.Schedulers; + + +/** + * @author Baoyi Chen + */ +public class Practice4 { + + /* + * 举例: + * 参数observable = Observable["a", "b", "c"] + * 参数observer在消费observable时,每个元素都在独立的线程 + * + * thread 1 --------------- + * |-----------| ["a"] | + * | --------------- + * | + * ------------------------- ---------- |thread 2 --------------- + * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | + * ------------------------- ---------- | --------------- + * | + * |thread 3 --------------- + * |-----------| ["c"] | + * --------------- + * + */ + public Observable runInMultiThread(Observable observable) { + return Observable.create(observableEmitter -> observable.subscribe(s -> Observable.just(s).subscribeOn(Schedulers.newThread()) + .subscribe(s1 -> observableEmitter.onNext(s1)))); + } + +} diff --git a/Practice5.java b/Practice5.java new file mode 100644 index 0000000..0f84b56 --- /dev/null +++ b/Practice5.java @@ -0,0 +1,188 @@ +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +/** + * @author Baoyi Chen + */ +public class Practice5 { + + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { + return Single.create(singleEmitter -> + source.map(s -> 1) + .scan((acc, value) -> acc + 1) + .lastElement() + .subscribe(integer -> { + singleEmitter.onSuccess((long) integer); + })); + } + + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return Observable.create(observableEmitter -> + source.doOnComplete(() -> observableEmitter.onComplete()) + .subscribe(strings -> { + for (String s : strings) { + observableEmitter.onNext(s); + } + })); + } + + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { + return Observable.create(observableEmitter -> { + ArrayList list = new ArrayList(); + source.doOnComplete(() -> { + for (String s : list) { + observableEmitter.onNext(s); + } + observableEmitter.onComplete(); + }).subscribe(s -> { + if (!list.contains(s)) { + list.add(s); + } + }); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return Observable.create(observableEmitter -> { + ArrayList list = new ArrayList(); + source.doOnComplete(() -> { + for (Integer i : list) { + observableEmitter.onNext(i); + } + observableEmitter.onComplete(); + }).subscribe(i -> { + if (conditon.test(i)) { + list.add(i); + } + }); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + return Maybe.create(maybeEmitter -> + source.map(s -> new Tuple2<>(0, s)) + .scan((acc, value) -> new Tuple2<>(acc.getV1() + 1, value.getV2())) + .subscribe(tuple2 -> { + if (tuple2.getV1() == index) { + maybeEmitter.onSuccess(tuple2.getV2()); + } + })); + } + + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { + ArrayList> list = new ArrayList(); + for (int i = 0; i < count; i++) { + list.add(source); + } + return Observable.concat(list); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { + AtomicInteger index = new AtomicInteger(); + return Observable.create(observableEmitter -> + doConcat(source,index,observableEmitter)); + } + + private void doConcat(List> source, AtomicInteger index, ObservableEmitter emitter) { + source.get(index.getAndIncrement()) + .doOnComplete(() -> { + if (index.get() >= source.size()) { + emitter.onComplete(); + } else { + doConcat(source, index, emitter); + } + }).subscribe(string -> emitter.onNext(string)); + + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + int size = source.size(); + AtomicInteger index = new AtomicInteger(); + return Observable.create(observableEmitter -> + Observable.fromIterable(source) + .subscribe(stringObservable -> stringObservable.doOnComplete(() -> { + if (index.incrementAndGet() == size) { + observableEmitter.onComplete(); + } + }).subscribe(string -> observableEmitter.onNext(string)))); + } + + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return Observable.create(observableEmitter -> + source.doOnComplete(() -> observableEmitter.onComplete()) + .subscribe(s -> { + Thread.sleep(1000); + observableEmitter.onNext(s); + })); + } + +} From b3f71c4d11e8461d185450718e80e229d4dbdd2f Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Fri, 23 Mar 2018 16:19:20 +0800 Subject: [PATCH 02/15] Add files via upload --- .../rxjava/share/practices/Practice1.java | 2 +- .../rxjava/share/practices/Practice2.java | 126 +++++--- .../rxjava/share/practices/Practice3.java | 148 +++++---- .../rxjava/share/practices/Practice4.java | 102 +++--- .../rxjava/share/practices/Practice5.java | 301 +++++++++++------- 5 files changed, 406 insertions(+), 273 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..c8ad00f 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -30,6 +30,6 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.map(s -> new Tuple2<>(1, s)).scan((acc, value) -> new Tuple2<>(acc.getV1() + 1, value.getV2())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..1061f66 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -1,49 +1,77 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; -import io.reactivex.Single; - -import java.util.Map; - -/** - * @author Baoyi Chen - */ -public class Practice2 { - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] - */ - public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Single[Map{a=2, b=1, c=2}] - */ - public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * @author Baoyi Chen + */ +public class Practice2 { + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] + */ + public Observable> wordCount1(Observable words) { + + return Observable.create(emitter -> { + ArrayList> list = new ArrayList(); + words.doOnComplete(() -> { + for (Tuple2 tuple2 : list) { + emitter.onNext(tuple2); + } + emitter.onComplete(); + }).subscribe(s -> { + int value = 1; + for (Tuple2 tuple2 : list) { + if (tuple2.getV1().equals(s)) { + list.remove(tuple2); + value = tuple2.getV2() + 1; + break; + } + } + list.add(new Tuple2<>(s, value)); + }); + + }); + } + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Single[Map{a=2, b=1, c=2}] + */ + public Single> wordCount2(Observable words) { + return Single.create(singleEmitter -> { + HashMap map = new HashMap<>(); + words.doOnComplete(() -> singleEmitter.onSuccess(map)) + .subscribe(s -> { + Integer i = map.get(s); + map.put(s, i == null ? 1 : i++); + }); + }); + } + +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..71e73f5 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -1,60 +1,88 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import io.reactivex.Maybe; -import io.reactivex.Observable; - -/** - * @author Baoyi Chen - */ -public class Practice3 { - - /* - * 根据iterate的结果求和 - */ - public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * 举例: - * 5 - * / \ - * 6 7 - * / \ \ - * 4 3 nil - * - * return Observable[4, 3, 6, 7, 5] 顺序无关 - */ - public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } - - public static class Node { - public Node left; - public Node right; - public int value; - - public Node(Node left, Node right, int value) { - this.left = left; - this.right = right; - this.value = value; - } - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import io.reactivex.*; +import io.reactivex.functions.Action; +import io.reactivex.functions.BiFunction; +import io.reactivex.functions.Consumer; +import io.reactivex.functions.Function; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Baoyi Chen + */ +public class Practice3 { + + private Observable nodes(Node left, Node right, Integer value) { + if (left != null && right != null) { + return Observable.just(left, right, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); + } else if (left != null) { + return Observable.just(left, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); + } else if (right != null) { + return Observable.just(right, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); + } else { + return Observable.just(value); + } + } + + /* + * 根据iterate的结果求和 + */ + public Maybe sum(Observable observable) { + return Maybe.create(maybeEmitter -> + observable + .flatMap((Function>) node -> nodes(node.left, node.right, node.value)) + .scan((integer, integer2) -> integer + integer2) + .lastElement() + .subscribe(value -> maybeEmitter.onSuccess(value)) + ); + } + + /* + * 举例: + * 5 + * / \ + * 6 7 + * / \ \ + * 4 3 nil + * + * return Observable[4, 3, 6, 7, 5] 顺序无关 + */ + public Observable iterate(Observable observable) { + return Observable.create(maybeEmitter -> + observable + .doOnComplete(() -> maybeEmitter.onComplete()) + .flatMap((Function>) node -> nodes(node.left, node.right, node.value)) + .subscribe(value -> maybeEmitter.onNext(value)) + ); + } + + public static class Node { + public Node left; + public Node right; + public int value; + + public Node(Node left, Node right, int value) { + this.left = left; + this.right = right; + this.value = value; + } + } + +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..5b1df15 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -1,50 +1,52 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import io.reactivex.Observable; - - -/** - * @author Baoyi Chen - */ -public class Practice4 { - - /* - * 举例: - * 参数observable = Observable["a", "b", "c"] - * 参数observer在消费observable时,每个元素都在独立的线程 - * - * thread 1 --------------- - * |-----------| ["a"] | - * | --------------- - * | - * ------------------------- ---------- |thread 2 --------------- - * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | - * ------------------------- ---------- | --------------- - * | - * |thread 3 --------------- - * |-----------| ["c"] | - * --------------- - * - */ - public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import io.reactivex.*; +import io.reactivex.schedulers.Schedulers; + + +/** + * @author Baoyi Chen + */ +public class Practice4 { + + /* + * 举例: + * 参数observable = Observable["a", "b", "c"] + * 参数observer在消费observable时,每个元素都在独立的线程 + * + * thread 1 --------------- + * |-----------| ["a"] | + * | --------------- + * | + * ------------------------- ---------- |thread 2 --------------- + * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | + * ------------------------- ---------- | --------------- + * | + * |thread 3 --------------- + * |-----------| ["c"] | + * --------------- + * + */ + public Observable runInMultiThread(Observable observable) { + return Observable.create(observableEmitter -> observable.subscribe(s -> Observable.just(s).subscribeOn(Schedulers.newThread()) + .subscribe(s1 -> observableEmitter.onNext(s1)))); + } + +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..0f84b56 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -1,113 +1,188 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; - -/** - * @author Baoyi Chen - */ -public class Practice5 { - - /* - * example: - * param: Observable["a","b","c"] - * return: Single[3] - */ - public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable[["a", "b", "c"], ["b", "c", "d"]] - * return: Observable["a", "b", "c","b", "c", "d"] - */ - public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a", "a", "b", "b", "c"] - * return: Observable["a", "b", "c"] - */ - public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 - * return: Observable[3, 4] - */ - public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , index = 2 - * return: Maybe[3] - */ - public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a", "b"] , count = 2 - * return: Observable["a", "b", "a", "b"] - */ - public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a", "b", "c"], 1, SECONDS - * return: Observable["a", "b", "c"], 每个元素都延迟1秒 - */ - public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +/** + * @author Baoyi Chen + */ +public class Practice5 { + + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { + return Single.create(singleEmitter -> + source.map(s -> 1) + .scan((acc, value) -> acc + 1) + .lastElement() + .subscribe(integer -> { + singleEmitter.onSuccess((long) integer); + })); + } + + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return Observable.create(observableEmitter -> + source.doOnComplete(() -> observableEmitter.onComplete()) + .subscribe(strings -> { + for (String s : strings) { + observableEmitter.onNext(s); + } + })); + } + + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { + return Observable.create(observableEmitter -> { + ArrayList list = new ArrayList(); + source.doOnComplete(() -> { + for (String s : list) { + observableEmitter.onNext(s); + } + observableEmitter.onComplete(); + }).subscribe(s -> { + if (!list.contains(s)) { + list.add(s); + } + }); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return Observable.create(observableEmitter -> { + ArrayList list = new ArrayList(); + source.doOnComplete(() -> { + for (Integer i : list) { + observableEmitter.onNext(i); + } + observableEmitter.onComplete(); + }).subscribe(i -> { + if (conditon.test(i)) { + list.add(i); + } + }); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + return Maybe.create(maybeEmitter -> + source.map(s -> new Tuple2<>(0, s)) + .scan((acc, value) -> new Tuple2<>(acc.getV1() + 1, value.getV2())) + .subscribe(tuple2 -> { + if (tuple2.getV1() == index) { + maybeEmitter.onSuccess(tuple2.getV2()); + } + })); + } + + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { + ArrayList> list = new ArrayList(); + for (int i = 0; i < count; i++) { + list.add(source); + } + return Observable.concat(list); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { + AtomicInteger index = new AtomicInteger(); + return Observable.create(observableEmitter -> + doConcat(source,index,observableEmitter)); + } + + private void doConcat(List> source, AtomicInteger index, ObservableEmitter emitter) { + source.get(index.getAndIncrement()) + .doOnComplete(() -> { + if (index.get() >= source.size()) { + emitter.onComplete(); + } else { + doConcat(source, index, emitter); + } + }).subscribe(string -> emitter.onNext(string)); + + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + int size = source.size(); + AtomicInteger index = new AtomicInteger(); + return Observable.create(observableEmitter -> + Observable.fromIterable(source) + .subscribe(stringObservable -> stringObservable.doOnComplete(() -> { + if (index.incrementAndGet() == size) { + observableEmitter.onComplete(); + } + }).subscribe(string -> observableEmitter.onNext(string)))); + } + + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return Observable.create(observableEmitter -> + source.doOnComplete(() -> observableEmitter.onComplete()) + .subscribe(s -> { + Thread.sleep(1000); + observableEmitter.onNext(s); + })); + } + +} From 6242ec249a1ffb4469e240fab05b03d4cddfa406 Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Fri, 23 Mar 2018 16:21:08 +0800 Subject: [PATCH 03/15] Delete Practice1.java --- Practice1.java | 35 ----------------------------------- 1 file changed, 35 deletions(-) delete mode 100644 Practice1.java diff --git a/Practice1.java b/Practice1.java deleted file mode 100644 index c8ad00f..0000000 --- a/Practice1.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; - -/** - * @author Baoyi Chen - */ -public class Practice1 { - - /* - * 举例如下: - * 参数 Observable["a","b","c"] - * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 - */ - public Observable> indexable(Observable observable) { - return observable.map(s -> new Tuple2<>(1, s)).scan((acc, value) -> new Tuple2<>(acc.getV1() + 1, value.getV2())); - } -} From b27a66b325eeb45d3a67d38ca3c475be0674d6e2 Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Fri, 23 Mar 2018 16:21:18 +0800 Subject: [PATCH 04/15] Delete Practice2.java --- Practice2.java | 77 -------------------------------------------------- 1 file changed, 77 deletions(-) delete mode 100644 Practice2.java diff --git a/Practice2.java b/Practice2.java deleted file mode 100644 index 1061f66..0000000 --- a/Practice2.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.*; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -/** - * @author Baoyi Chen - */ -public class Practice2 { - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] - */ - public Observable> wordCount1(Observable words) { - - return Observable.create(emitter -> { - ArrayList> list = new ArrayList(); - words.doOnComplete(() -> { - for (Tuple2 tuple2 : list) { - emitter.onNext(tuple2); - } - emitter.onComplete(); - }).subscribe(s -> { - int value = 1; - for (Tuple2 tuple2 : list) { - if (tuple2.getV1().equals(s)) { - list.remove(tuple2); - value = tuple2.getV2() + 1; - break; - } - } - list.add(new Tuple2<>(s, value)); - }); - - }); - } - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Single[Map{a=2, b=1, c=2}] - */ - public Single> wordCount2(Observable words) { - return Single.create(singleEmitter -> { - HashMap map = new HashMap<>(); - words.doOnComplete(() -> singleEmitter.onSuccess(map)) - .subscribe(s -> { - Integer i = map.get(s); - map.put(s, i == null ? 1 : i++); - }); - }); - } - -} From a4f5a10e946833f725f3064351832baf1f38a499 Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Fri, 23 Mar 2018 16:21:27 +0800 Subject: [PATCH 05/15] Delete Practice3.java --- Practice3.java | 88 -------------------------------------------------- 1 file changed, 88 deletions(-) delete mode 100644 Practice3.java diff --git a/Practice3.java b/Practice3.java deleted file mode 100644 index 71e73f5..0000000 --- a/Practice3.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import io.reactivex.*; -import io.reactivex.functions.Action; -import io.reactivex.functions.BiFunction; -import io.reactivex.functions.Consumer; -import io.reactivex.functions.Function; - -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author Baoyi Chen - */ -public class Practice3 { - - private Observable nodes(Node left, Node right, Integer value) { - if (left != null && right != null) { - return Observable.just(left, right, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); - } else if (left != null) { - return Observable.just(left, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); - } else if (right != null) { - return Observable.just(right, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); - } else { - return Observable.just(value); - } - } - - /* - * 根据iterate的结果求和 - */ - public Maybe sum(Observable observable) { - return Maybe.create(maybeEmitter -> - observable - .flatMap((Function>) node -> nodes(node.left, node.right, node.value)) - .scan((integer, integer2) -> integer + integer2) - .lastElement() - .subscribe(value -> maybeEmitter.onSuccess(value)) - ); - } - - /* - * 举例: - * 5 - * / \ - * 6 7 - * / \ \ - * 4 3 nil - * - * return Observable[4, 3, 6, 7, 5] 顺序无关 - */ - public Observable iterate(Observable observable) { - return Observable.create(maybeEmitter -> - observable - .doOnComplete(() -> maybeEmitter.onComplete()) - .flatMap((Function>) node -> nodes(node.left, node.right, node.value)) - .subscribe(value -> maybeEmitter.onNext(value)) - ); - } - - public static class Node { - public Node left; - public Node right; - public int value; - - public Node(Node left, Node right, int value) { - this.left = left; - this.right = right; - this.value = value; - } - } - -} From 26d4d1ddeb108803f9ce7d056d3fd2a8d48b3a10 Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Fri, 23 Mar 2018 16:21:35 +0800 Subject: [PATCH 06/15] Delete Practice4.java --- Practice4.java | 52 -------------------------------------------------- 1 file changed, 52 deletions(-) delete mode 100644 Practice4.java diff --git a/Practice4.java b/Practice4.java deleted file mode 100644 index 5b1df15..0000000 --- a/Practice4.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import io.reactivex.*; -import io.reactivex.schedulers.Schedulers; - - -/** - * @author Baoyi Chen - */ -public class Practice4 { - - /* - * 举例: - * 参数observable = Observable["a", "b", "c"] - * 参数observer在消费observable时,每个元素都在独立的线程 - * - * thread 1 --------------- - * |-----------| ["a"] | - * | --------------- - * | - * ------------------------- ---------- |thread 2 --------------- - * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | - * ------------------------- ---------- | --------------- - * | - * |thread 3 --------------- - * |-----------| ["c"] | - * --------------- - * - */ - public Observable runInMultiThread(Observable observable) { - return Observable.create(observableEmitter -> observable.subscribe(s -> Observable.just(s).subscribeOn(Schedulers.newThread()) - .subscribe(s1 -> observableEmitter.onNext(s1)))); - } - -} From bebb33332949bcb1f4c9fdbb0d4370c4e96d9e7e Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Fri, 23 Mar 2018 16:21:45 +0800 Subject: [PATCH 07/15] Delete Practice5.java --- Practice5.java | 188 ------------------------------------------------- 1 file changed, 188 deletions(-) delete mode 100644 Practice5.java diff --git a/Practice5.java b/Practice5.java deleted file mode 100644 index 0f84b56..0000000 --- a/Practice5.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.*; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; - -/** - * @author Baoyi Chen - */ -public class Practice5 { - - /* - * example: - * param: Observable["a","b","c"] - * return: Single[3] - */ - public Single count(Observable source) { - return Single.create(singleEmitter -> - source.map(s -> 1) - .scan((acc, value) -> acc + 1) - .lastElement() - .subscribe(integer -> { - singleEmitter.onSuccess((long) integer); - })); - } - - /* - * example: - * param: Observable[["a", "b", "c"], ["b", "c", "d"]] - * return: Observable["a", "b", "c","b", "c", "d"] - */ - public Observable convert(Observable> source) { - return Observable.create(observableEmitter -> - source.doOnComplete(() -> observableEmitter.onComplete()) - .subscribe(strings -> { - for (String s : strings) { - observableEmitter.onNext(s); - } - })); - } - - /* - * example: - * param: Observable["a", "a", "b", "b", "c"] - * return: Observable["a", "b", "c"] - */ - public Observable distinct(Observable source) { - return Observable.create(observableEmitter -> { - ArrayList list = new ArrayList(); - source.doOnComplete(() -> { - for (String s : list) { - observableEmitter.onNext(s); - } - observableEmitter.onComplete(); - }).subscribe(s -> { - if (!list.contains(s)) { - list.add(s); - } - }); - }); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 - * return: Observable[3, 4] - */ - public Observable filter(Observable source, Predicate conditon) { - return Observable.create(observableEmitter -> { - ArrayList list = new ArrayList(); - source.doOnComplete(() -> { - for (Integer i : list) { - observableEmitter.onNext(i); - } - observableEmitter.onComplete(); - }).subscribe(i -> { - if (conditon.test(i)) { - list.add(i); - } - }); - }); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , index = 2 - * return: Maybe[3] - */ - public Maybe elementAt(Observable source, int index) { - return Maybe.create(maybeEmitter -> - source.map(s -> new Tuple2<>(0, s)) - .scan((acc, value) -> new Tuple2<>(acc.getV1() + 1, value.getV2())) - .subscribe(tuple2 -> { - if (tuple2.getV1() == index) { - maybeEmitter.onSuccess(tuple2.getV2()); - } - })); - } - - /* - * example: - * param: Observable["a", "b"] , count = 2 - * return: Observable["a", "b", "a", "b"] - */ - public Observable repeat(Observable source, int count) { - ArrayList> list = new ArrayList(); - for (int i = 0; i < count; i++) { - list.add(source); - } - return Observable.concat(list); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable concat(List> source) { - AtomicInteger index = new AtomicInteger(); - return Observable.create(observableEmitter -> - doConcat(source,index,observableEmitter)); - } - - private void doConcat(List> source, AtomicInteger index, ObservableEmitter emitter) { - source.get(index.getAndIncrement()) - .doOnComplete(() -> { - if (index.get() >= source.size()) { - emitter.onComplete(); - } else { - doConcat(source, index, emitter); - } - }).subscribe(string -> emitter.onNext(string)); - - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable merge(List> source) { - int size = source.size(); - AtomicInteger index = new AtomicInteger(); - return Observable.create(observableEmitter -> - Observable.fromIterable(source) - .subscribe(stringObservable -> stringObservable.doOnComplete(() -> { - if (index.incrementAndGet() == size) { - observableEmitter.onComplete(); - } - }).subscribe(string -> observableEmitter.onNext(string)))); - } - - /* - * example: - * param: Observable["a", "b", "c"], 1, SECONDS - * return: Observable["a", "b", "c"], 每个元素都延迟1秒 - */ - public Observable delayAll(Observable source, long delay, TimeUnit unit) { - return Observable.create(observableEmitter -> - source.doOnComplete(() -> observableEmitter.onComplete()) - .subscribe(s -> { - Thread.sleep(1000); - observableEmitter.onNext(s); - })); - } - -} From 183c30b449db25c57661b8250c236b7c1c853aac Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Fri, 23 Mar 2018 16:24:47 +0800 Subject: [PATCH 08/15] Update Practice3Test.java --- .../rxjava/share/practices/Practice3Test.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/test/java/cn/nextop/rxjava/share/practices/Practice3Test.java b/src/test/java/cn/nextop/rxjava/share/practices/Practice3Test.java index 5de56bb..c467ad6 100644 --- a/src/test/java/cn/nextop/rxjava/share/practices/Practice3Test.java +++ b/src/test/java/cn/nextop/rxjava/share/practices/Practice3Test.java @@ -1,8 +1,14 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Lists; import io.reactivex.Observable; import org.junit.Test; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; + /** * @author Baoyi Chen */ @@ -13,4 +19,15 @@ public void test() { Practice3.Node n = new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 4), null, 3), new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 8), null, 7), null, 6), 5); new Practice3().sum(Observable.just(n)).test().assertResult(33); } -} \ No newline at end of file + + @Test + public void test1() { + List list = Lists.of(4,3,5,6,7,8); + Practice3.Node n = new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 4), null, 3), new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 8), null, 7), null, 6), 5); + List list1 = new Practice3().iterate(Observable.just(n)).toList().blockingGet(); + assertEquals(list.size(), list1.size()); + for (Integer i : list) { + assertTrue(list1.contains(i)); + } + } +} From f5b440ebd488ecb28eeb985f1c0b78da2cd5521e Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Mon, 26 Mar 2018 12:55:01 +0800 Subject: [PATCH 09/15] Add files via upload --- .../rxjava/share/practices/Practice1Test.java | 18 +++++ .../rxjava/share/practices/Practice2Test.java | 46 ++++++++++++ .../rxjava/share/practices/Practice3Test.java | 33 ++++++++ .../rxjava/share/practices/Practice4Test.java | 29 +++++++ .../rxjava/share/practices/Practice5Test.java | 75 +++++++++++++++++++ 5 files changed, 201 insertions(+) create mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice1Test.java create mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice2Test.java create mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice3Test.java create mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice4Test.java create mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice5Test.java diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1Test.java new file mode 100644 index 0000000..bfe09df --- /dev/null +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1Test.java @@ -0,0 +1,18 @@ +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.Tuples; +import io.reactivex.Observable; +import org.junit.Test; + + +/** + * @author Baoyi Chen + */ +public class Practice1Test { + + @Test + public void test() { + new Practice1().indexable(Observable.just("a", "b", "c")).test().assertResult(Tuples.of(1, "a"), Tuples.of(2, "b"), Tuples.of(3, "c")); + } + +} \ No newline at end of file diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2Test.java new file mode 100644 index 0000000..e8955ee --- /dev/null +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2Test.java @@ -0,0 +1,46 @@ +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.Tuples; +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.Observable; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author Baoyi Chen + */ +public class Practice2Test { + + @Test + public void test1() { + List> values = new ArrayList<>(); + values.add(Tuples.of("a", 2)); + values.add(Tuples.of("b", 1)); + values.add(Tuples.of("c", 2)); + List> list = new Practice2().wordCount1(Observable.just("a", "a", "b", "c", "c")).toList().blockingGet(); + assertEquals(3, list.size()); + for (Tuple2 tuple2 : list) { + assertTrue(values.contains(tuple2)); + } + } + + @Test + public void test2() { + Map map = new Practice2().wordCount2(Observable.just("a", "a", "b", "c", "c")).blockingGet(); + Map values = new HashMap<>(); + values.put("a", 2); + values.put("b", 1); + values.put("c", 2); + assertEquals(3, map.size()); + for (Map.Entry entry : map.entrySet()) { + assertTrue(values.containsKey(entry.getKey())); + } + } +} \ No newline at end of file diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3Test.java new file mode 100644 index 0000000..0e92033 --- /dev/null +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3Test.java @@ -0,0 +1,33 @@ +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.Lists; +import io.reactivex.Observable; +import org.junit.Test; + +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; + +/** + * @author Baoyi Chen + */ +public class Practice3Test { + + @Test + public void test() { + Practice3.Node n = new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 4), null, 3), new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 8), null, 7), null, 6), 5); + new Practice3().sum(Observable.just(n)).test().assertResult(33); + } + + @Test + public void test1() { + List list = Lists.of(4,3,5,6,7,8); + Practice3.Node n = new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 4), null, 3), new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 8), null, 7), null, 6), 5); + List list1 = new Practice3().iterate(Observable.just(n)).toList().blockingGet(); + assertEquals(list.size(), list1.size()); + for (Integer i : list) { + assertTrue(list1.contains(i)); + } + } +} \ No newline at end of file diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4Test.java new file mode 100644 index 0000000..d1fe4c8 --- /dev/null +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4Test.java @@ -0,0 +1,29 @@ +package cn.nextop.rxjava.share.practices; + +import io.reactivex.Observable; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertTrue; + +/** + * @author Baoyi Chen + */ +public class Practice4Test { + + @Test + public void test() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(3); + Map map = new ConcurrentHashMap<>(); + new Practice4().runInMultiThread(Observable.just("a", "b", "c")).map(e -> Thread.currentThread().toString()).subscribe(e -> { + map.put(e, true); + latch.countDown(); + }); + latch.await(5, TimeUnit.SECONDS); + assertTrue(map.size() >= 2); + } +} \ No newline at end of file diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5Test.java new file mode 100644 index 0000000..1a79300 --- /dev/null +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5Test.java @@ -0,0 +1,75 @@ +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.Lists; +import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; + +/** + * @author Baoyi Chen + */ +public class Practice5Test { + + @Test + public void count() { + new Practice5().count(Observable.just("a", "b", "c")).test().assertResult(3L); + } + + @Test + public void convert() { + new Practice5().convert(Observable.just(Lists.of("a", "b", "c"))).test().assertResult("a", "b", "c"); + } + + @Test + public void distinct() { + new Practice5().distinct(Observable.just("a", "b", "b")).test().assertResult("a", "b"); + } + + @Test + public void filter() { + new Practice5().filter(Observable.just(1, 2, 3, 4, 5), x -> x > 2 && x < 5).test().assertResult(3, 4); + } + + @Test + public void elementAt() { + new Practice5().elementAt(Observable.just("1", "2", "3", "4", "5"), 2).test().assertResult("3"); + } + + @Test + public void repeat() { + new Practice5().repeat(Observable.just("0", "1"), 5).test().assertResult("0", "1", "0", "1", "0", "1", "0", "1", "0", "1"); + } + + @Test + public void concat() { + List list = new Practice5().concat(Lists.of(Observable.just("a").delay(1, TimeUnit.SECONDS), Observable.just("c").delay(1, TimeUnit.MILLISECONDS))).subscribeOn(Schedulers.newThread()).toList().blockingGet(); + assertEquals(2, list.size()); + assertEquals("a", list.get(0)); + assertEquals("c", list.get(1)); + } + + @Test + public void merge() { + List list = new Practice5().merge(Lists.of(Observable.just("a").delay(1, TimeUnit.SECONDS), Observable.just("c").delay(1, TimeUnit.MILLISECONDS))).subscribeOn(Schedulers.newThread()).toList().blockingGet(); + assertEquals(2, list.size()); + assertEquals("c", list.get(0)); + assertEquals("a", list.get(1)); + } + + @Test + public void delayAll() { + long st = System.currentTimeMillis(); + List list = new Practice5().delayAll(Observable.just("a", "b", "c"), 1, TimeUnit.SECONDS).toList().blockingGet(); + assertEquals(3, list.size()); + assertEquals("a", list.get(0)); + assertEquals("b", list.get(1)); + assertEquals("c", list.get(2)); + assertTrue(System.currentTimeMillis() - st >= 3000); + } +} \ No newline at end of file From 68a17c5f4f9554305a1f87cdb18214daacc132fd Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Mon, 26 Mar 2018 12:55:38 +0800 Subject: [PATCH 10/15] Delete Practice1Test.java --- .../rxjava/share/practices/Practice1Test.java | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice1Test.java diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1Test.java deleted file mode 100644 index bfe09df..0000000 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1Test.java +++ /dev/null @@ -1,18 +0,0 @@ -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.Tuples; -import io.reactivex.Observable; -import org.junit.Test; - - -/** - * @author Baoyi Chen - */ -public class Practice1Test { - - @Test - public void test() { - new Practice1().indexable(Observable.just("a", "b", "c")).test().assertResult(Tuples.of(1, "a"), Tuples.of(2, "b"), Tuples.of(3, "c")); - } - -} \ No newline at end of file From b1c9f5d54d6530543ed8b442201aa6efbc7502af Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Mon, 26 Mar 2018 12:55:46 +0800 Subject: [PATCH 11/15] Delete Practice2Test.java --- .../rxjava/share/practices/Practice2Test.java | 46 ------------------- 1 file changed, 46 deletions(-) delete mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice2Test.java diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2Test.java deleted file mode 100644 index e8955ee..0000000 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2Test.java +++ /dev/null @@ -1,46 +0,0 @@ -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.Tuples; -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * @author Baoyi Chen - */ -public class Practice2Test { - - @Test - public void test1() { - List> values = new ArrayList<>(); - values.add(Tuples.of("a", 2)); - values.add(Tuples.of("b", 1)); - values.add(Tuples.of("c", 2)); - List> list = new Practice2().wordCount1(Observable.just("a", "a", "b", "c", "c")).toList().blockingGet(); - assertEquals(3, list.size()); - for (Tuple2 tuple2 : list) { - assertTrue(values.contains(tuple2)); - } - } - - @Test - public void test2() { - Map map = new Practice2().wordCount2(Observable.just("a", "a", "b", "c", "c")).blockingGet(); - Map values = new HashMap<>(); - values.put("a", 2); - values.put("b", 1); - values.put("c", 2); - assertEquals(3, map.size()); - for (Map.Entry entry : map.entrySet()) { - assertTrue(values.containsKey(entry.getKey())); - } - } -} \ No newline at end of file From 52e2788cd1117c9ffc718762e3c01e0ea34c2c54 Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Mon, 26 Mar 2018 12:55:55 +0800 Subject: [PATCH 12/15] Delete Practice3Test.java --- .../rxjava/share/practices/Practice3Test.java | 33 ------------------- 1 file changed, 33 deletions(-) delete mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice3Test.java diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3Test.java deleted file mode 100644 index 0e92033..0000000 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3Test.java +++ /dev/null @@ -1,33 +0,0 @@ -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.Lists; -import io.reactivex.Observable; -import org.junit.Test; - -import java.util.List; - -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertTrue; - -/** - * @author Baoyi Chen - */ -public class Practice3Test { - - @Test - public void test() { - Practice3.Node n = new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 4), null, 3), new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 8), null, 7), null, 6), 5); - new Practice3().sum(Observable.just(n)).test().assertResult(33); - } - - @Test - public void test1() { - List list = Lists.of(4,3,5,6,7,8); - Practice3.Node n = new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 4), null, 3), new Practice3.Node(new Practice3.Node(new Practice3.Node(null, null, 8), null, 7), null, 6), 5); - List list1 = new Practice3().iterate(Observable.just(n)).toList().blockingGet(); - assertEquals(list.size(), list1.size()); - for (Integer i : list) { - assertTrue(list1.contains(i)); - } - } -} \ No newline at end of file From 3c71067aed908ca97d0aafaf776ce7c74c9aaeb8 Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Mon, 26 Mar 2018 12:56:04 +0800 Subject: [PATCH 13/15] Delete Practice4Test.java --- .../rxjava/share/practices/Practice4Test.java | 29 ------------------- 1 file changed, 29 deletions(-) delete mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice4Test.java diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4Test.java deleted file mode 100644 index d1fe4c8..0000000 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4Test.java +++ /dev/null @@ -1,29 +0,0 @@ -package cn.nextop.rxjava.share.practices; - -import io.reactivex.Observable; -import org.junit.Test; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static junit.framework.TestCase.assertTrue; - -/** - * @author Baoyi Chen - */ -public class Practice4Test { - - @Test - public void test() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(3); - Map map = new ConcurrentHashMap<>(); - new Practice4().runInMultiThread(Observable.just("a", "b", "c")).map(e -> Thread.currentThread().toString()).subscribe(e -> { - map.put(e, true); - latch.countDown(); - }); - latch.await(5, TimeUnit.SECONDS); - assertTrue(map.size() >= 2); - } -} \ No newline at end of file From cf3edec51f77becd37c9c0ffc32fa640e68b9590 Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Mon, 26 Mar 2018 12:56:12 +0800 Subject: [PATCH 14/15] Delete Practice5Test.java --- .../rxjava/share/practices/Practice5Test.java | 75 ------------------- 1 file changed, 75 deletions(-) delete mode 100644 src/main/java/cn/nextop/rxjava/share/practices/Practice5Test.java diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5Test.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5Test.java deleted file mode 100644 index 1a79300..0000000 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5Test.java +++ /dev/null @@ -1,75 +0,0 @@ -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.Lists; -import io.reactivex.Observable; -import io.reactivex.schedulers.Schedulers; -import org.junit.Test; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertTrue; - -/** - * @author Baoyi Chen - */ -public class Practice5Test { - - @Test - public void count() { - new Practice5().count(Observable.just("a", "b", "c")).test().assertResult(3L); - } - - @Test - public void convert() { - new Practice5().convert(Observable.just(Lists.of("a", "b", "c"))).test().assertResult("a", "b", "c"); - } - - @Test - public void distinct() { - new Practice5().distinct(Observable.just("a", "b", "b")).test().assertResult("a", "b"); - } - - @Test - public void filter() { - new Practice5().filter(Observable.just(1, 2, 3, 4, 5), x -> x > 2 && x < 5).test().assertResult(3, 4); - } - - @Test - public void elementAt() { - new Practice5().elementAt(Observable.just("1", "2", "3", "4", "5"), 2).test().assertResult("3"); - } - - @Test - public void repeat() { - new Practice5().repeat(Observable.just("0", "1"), 5).test().assertResult("0", "1", "0", "1", "0", "1", "0", "1", "0", "1"); - } - - @Test - public void concat() { - List list = new Practice5().concat(Lists.of(Observable.just("a").delay(1, TimeUnit.SECONDS), Observable.just("c").delay(1, TimeUnit.MILLISECONDS))).subscribeOn(Schedulers.newThread()).toList().blockingGet(); - assertEquals(2, list.size()); - assertEquals("a", list.get(0)); - assertEquals("c", list.get(1)); - } - - @Test - public void merge() { - List list = new Practice5().merge(Lists.of(Observable.just("a").delay(1, TimeUnit.SECONDS), Observable.just("c").delay(1, TimeUnit.MILLISECONDS))).subscribeOn(Schedulers.newThread()).toList().blockingGet(); - assertEquals(2, list.size()); - assertEquals("c", list.get(0)); - assertEquals("a", list.get(1)); - } - - @Test - public void delayAll() { - long st = System.currentTimeMillis(); - List list = new Practice5().delayAll(Observable.just("a", "b", "c"), 1, TimeUnit.SECONDS).toList().blockingGet(); - assertEquals(3, list.size()); - assertEquals("a", list.get(0)); - assertEquals("b", list.get(1)); - assertEquals("c", list.get(2)); - assertTrue(System.currentTimeMillis() - st >= 3000); - } -} \ No newline at end of file From bf6ecc77ccf9984088ebaa1ee04c9a4bff68ea18 Mon Sep 17 00:00:00 2001 From: tonyj2me <35590733@qq.com> Date: Mon, 26 Mar 2018 12:56:42 +0800 Subject: [PATCH 15/15] Add files via upload --- .../rxjava/share/practices/Practice1.java | 71 ++++++------ .../rxjava/share/practices/Practice2.java | 34 +----- .../rxjava/share/practices/Practice3.java | 38 ++----- .../rxjava/share/practices/Practice4.java | 3 +- .../rxjava/share/practices/Practice5.java | 103 ++++-------------- 5 files changed, 68 insertions(+), 181 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index c8ad00f..1f2c82c 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -1,35 +1,36 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; - -/** - * @author Baoyi Chen - */ -public class Practice1 { - - /* - * 举例如下: - * 参数 Observable["a","b","c"] - * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 - */ - public Observable> indexable(Observable observable) { - return observable.map(s -> new Tuple2<>(1, s)).scan((acc, value) -> new Tuple2<>(acc.getV1() + 1, value.getV2())); - } -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.Tuples; +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.Observable; + +/** + * @author Baoyi Chen + */ +public class Practice1 { + + /* + * 举例如下: + * 参数 Observable["a","b","c"] + * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 + */ + public Observable> indexable(Observable observable) { + return Observable.zip(observable, Observable.range(1, Integer.MAX_VALUE), (a, b) -> Tuples.of(b, a)); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 1061f66..099039d 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -17,10 +17,10 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Tuples; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.*; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -35,27 +35,7 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - - return Observable.create(emitter -> { - ArrayList> list = new ArrayList(); - words.doOnComplete(() -> { - for (Tuple2 tuple2 : list) { - emitter.onNext(tuple2); - } - emitter.onComplete(); - }).subscribe(s -> { - int value = 1; - for (Tuple2 tuple2 : list) { - if (tuple2.getV1().equals(s)) { - list.remove(tuple2); - value = tuple2.getV2() + 1; - break; - } - } - list.add(new Tuple2<>(s, value)); - }); - - }); + return words.groupBy(string -> string).flatMap(observable-> observable.count().toObservable().map(count-> Tuples.of(observable.getKey(),count.intValue()))); } /* @@ -64,13 +44,9 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - return Single.create(singleEmitter -> { - HashMap map = new HashMap<>(); - words.doOnComplete(() -> singleEmitter.onSuccess(map)) - .subscribe(s -> { - Integer i = map.get(s); - map.put(s, i == null ? 1 : i++); - }); + return words.reduce(new HashMap<>(), (map, s) -> { + map.put(s, map.get(s) == null ? 1 : map.get(s) +1); + return map; }); } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index 71e73f5..03f93e0 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -17,41 +17,17 @@ package cn.nextop.rxjava.share.practices; import io.reactivex.*; -import io.reactivex.functions.Action; -import io.reactivex.functions.BiFunction; -import io.reactivex.functions.Consumer; -import io.reactivex.functions.Function; - -import java.util.concurrent.atomic.AtomicInteger; /** * @author Baoyi Chen */ public class Practice3 { - private Observable nodes(Node left, Node right, Integer value) { - if (left != null && right != null) { - return Observable.just(left, right, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); - } else if (left != null) { - return Observable.just(left, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); - } else if (right != null) { - return Observable.just(right, new Node(null, null, value)).flatMap((Function>) node1 -> nodes(node1.left, node1.right, node1.value)); - } else { - return Observable.just(value); - } - } - /* * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - return Maybe.create(maybeEmitter -> - observable - .flatMap((Function>) node -> nodes(node.left, node.right, node.value)) - .scan((integer, integer2) -> integer + integer2) - .lastElement() - .subscribe(value -> maybeEmitter.onSuccess(value)) - ); + return iterate(observable).reduce((x, y) -> x + y); } /* @@ -65,12 +41,12 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - return Observable.create(maybeEmitter -> - observable - .doOnComplete(() -> maybeEmitter.onComplete()) - .flatMap((Function>) node -> nodes(node.left, node.right, node.value)) - .subscribe(value -> maybeEmitter.onNext(value)) - ); + return observable.flatMap(e -> { + Observable left = e.left == null ? Observable.empty() : iterate(Observable.just(e.left)); + Observable right = e.right == null ? Observable.empty() : iterate(Observable.just(e.right)); + Observable value = Observable.just(e.value); + return Observable.merge(left, right, value); + }); } public static class Node { diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 5b1df15..20a1f5a 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -45,8 +45,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - return Observable.create(observableEmitter -> observable.subscribe(s -> Observable.just(s).subscribeOn(Schedulers.newThread()) - .subscribe(s1 -> observableEmitter.onNext(s1)))); + return observable.flatMap(o-> Observable.just(o).subscribeOn(Schedulers.newThread())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 0f84b56..d1c076c 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,13 +16,11 @@ package cn.nextop.rxjava.share.practices; -import cn.nextop.rxjava.share.util.type.Tuple2; +import cn.nextop.rxjava.share.util.Tuples; import io.reactivex.*; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; /** @@ -36,13 +34,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - return Single.create(singleEmitter -> - source.map(s -> 1) - .scan((acc, value) -> acc + 1) - .lastElement() - .subscribe(integer -> { - singleEmitter.onSuccess((long) integer); - })); + return source.reduce(0l, (a, b) -> a + 1); } /* @@ -51,13 +43,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - return Observable.create(observableEmitter -> - source.doOnComplete(() -> observableEmitter.onComplete()) - .subscribe(strings -> { - for (String s : strings) { - observableEmitter.onNext(s); - } - })); + return source.concatMap(e -> Observable.fromIterable(e)); } /* @@ -66,19 +52,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - return Observable.create(observableEmitter -> { - ArrayList list = new ArrayList(); - source.doOnComplete(() -> { - for (String s : list) { - observableEmitter.onNext(s); - } - observableEmitter.onComplete(); - }).subscribe(s -> { - if (!list.contains(s)) { - list.add(s); - } - }); - }); + return source.groupBy(e -> e).map(e -> e.getKey()); } /* @@ -87,19 +61,7 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - return Observable.create(observableEmitter -> { - ArrayList list = new ArrayList(); - source.doOnComplete(() -> { - for (Integer i : list) { - observableEmitter.onNext(i); - } - observableEmitter.onComplete(); - }).subscribe(i -> { - if (conditon.test(i)) { - list.add(i); - } - }); - }); + return source.flatMap(e -> conditon.test(e) ? Observable.just(e) : Observable.empty()); } /* @@ -108,14 +70,8 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - return Maybe.create(maybeEmitter -> - source.map(s -> new Tuple2<>(0, s)) - .scan((acc, value) -> new Tuple2<>(acc.getV1() + 1, value.getV2())) - .subscribe(tuple2 -> { - if (tuple2.getV1() == index) { - maybeEmitter.onSuccess(tuple2.getV2()); - } - })); + return source.zipWith(Observable.range(0, Integer.MAX_VALUE), (a, b) -> Tuples.of(a, b)) + .filter(x -> x.getV2() == index).map(e -> e.getV1()).firstElement(); } /* @@ -124,11 +80,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - ArrayList> list = new ArrayList(); - for (int i = 0; i < count; i++) { - list.add(source); - } - return Observable.concat(list); + return Observable.range(1, count).flatMap(e -> source); } /* @@ -137,21 +89,17 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - AtomicInteger index = new AtomicInteger(); - return Observable.create(observableEmitter -> - doConcat(source,index,observableEmitter)); + return Observable.create(observableEmitter -> doConcat(source, observableEmitter)); } - private void doConcat(List> source, AtomicInteger index, ObservableEmitter emitter) { - source.get(index.getAndIncrement()) - .doOnComplete(() -> { - if (index.get() >= source.size()) { - emitter.onComplete(); - } else { - doConcat(source, index, emitter); - } - }).subscribe(string -> emitter.onNext(string)); - + private void doConcat(List> source, ObservableEmitter emitter) { + if (source.isEmpty()) { + emitter.onComplete(); + } else { + source.get(0) + .doOnComplete(() -> doConcat(source.subList(1, source.size()), emitter)) + .subscribe(string -> emitter.onNext(string)); + } } /* @@ -160,15 +108,7 @@ private void doConcat(List> source, AtomicInteger index, Obse * return: Observable["a", "b"] */ public Observable merge(List> source) { - int size = source.size(); - AtomicInteger index = new AtomicInteger(); - return Observable.create(observableEmitter -> - Observable.fromIterable(source) - .subscribe(stringObservable -> stringObservable.doOnComplete(() -> { - if (index.incrementAndGet() == size) { - observableEmitter.onComplete(); - } - }).subscribe(string -> observableEmitter.onNext(string)))); + return Observable.fromIterable(source).flatMap(e -> e); } /* @@ -177,12 +117,7 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - return Observable.create(observableEmitter -> - source.doOnComplete(() -> observableEmitter.onComplete()) - .subscribe(s -> { - Thread.sleep(1000); - observableEmitter.onNext(s); - })); + return source.concatMap(e -> Observable.just(e).delay(delay, unit)); } }