From 364fe1efe45166ea8d998ce6442b1ef0296868b2 Mon Sep 17 00:00:00 2001 From: zhouxiaoyu Date: Fri, 23 Mar 2018 12:15:48 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=91=A8=E6=95=88=E5=AE=87=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rxjava/share/practices/Practice1.java | 7 ++++-- .../rxjava/share/practices/Practice2.java | 19 +++++++++++++-- .../rxjava/share/practices/Practice3.java | 21 +++++++++++++--- .../rxjava/share/practices/Practice4.java | 7 ++++-- .../rxjava/share/practices/Practice5.java | 24 ++++++++++++------- 5 files changed, 60 insertions(+), 18 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..6da344a 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -23,13 +23,16 @@ * @author Baoyi Chen */ public class Practice1 { - + /* * 举例如下: * 参数 Observable["a","b","c"] * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + Observable obInteger = Observable.just(1,2,3); + return Observable.zip(obInteger ,observable ,(index, val) -> { + return new Tuple2(index, val); + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..ff3adba 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -22,6 +22,7 @@ import io.reactivex.Single; import java.util.Map; +import java.util.HashMap; /** * @author Baoyi Chen @@ -34,7 +35,14 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + Observable distring = words.distinct(); + return distring.map(e -> { + Integer t = words.filter(s -> { + return e == s; + }).count().blockingGet().intValue(); + return new Tuple2(e,t); + }); + } /* @@ -43,7 +51,14 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.reduce(new HashMap(),(initial,str2) -> { + if(!initial.containsKey(str2)) { + initial.put(str2, 1); + } else { + initial.put(str2, initial.get(str2) + 1); + } + return initial; + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..0569d70 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -18,7 +18,7 @@ import io.reactivex.Maybe; import io.reactivex.Observable; - +import java.util.*; /** * @author Baoyi Chen */ @@ -28,7 +28,9 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.iterate(observable).reduce((x,y) -> { + return x + y; + }); } /* @@ -42,7 +44,20 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.map(e1 -> { + Stack nodes = new Stack(); + Observable obs = Observable.empty(); + List> integerList = new ArrayList>(); + Node currentNode = e1; + nodes.push(currentNode); + while(!nodes.isEmpty()) { + currentNode = nodes.pop(); + if(currentNode.left != null) { nodes.push(currentNode.left); } + if(currentNode.right != null) { nodes.push(currentNode.right);} + integerList.add(Observable.just(currentNode.value)); + } + return Observable.concat(integerList); + }).concatMap(e -> e); } public static class Node { diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..4b6735f 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,7 +18,7 @@ import io.reactivex.Observable; - +import io.reactivex.schedulers.Schedulers; /** * @author Baoyi Chen @@ -44,7 +44,10 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.map(s -> { + return Observable.just(s).observeOn(Schedulers.io()); + }).concatMap(e -> e); + } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..45e3cf4 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -18,6 +18,7 @@ import io.reactivex.Maybe; import io.reactivex.Observable; +import io.reactivex.ObservableOperator; import io.reactivex.Single; import java.util.List; @@ -35,7 +36,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.count(); } /* @@ -44,7 +45,8 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + Observable> ob = source.map(e -> Observable.fromIterable(e)); + return ob.concatMap(e -> e); } /* @@ -53,7 +55,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.distinct(); } /* @@ -62,7 +64,9 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.map(e -> { + if(conditon.test(e)) return Observable.just(e); else return Observable.empty(); + }).concatMap(e -> e); } /* @@ -71,7 +75,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return source.elementAt(index); } /* @@ -80,7 +84,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return source.repeat(count); } /* @@ -89,7 +93,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.concat(source); } /* @@ -98,7 +102,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.merge(source); } /* @@ -107,7 +111,9 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.map(e -> { + return Observable.just(e).delay(delay,unit); + }).concatMap(e -> e); } } From ac3eedd17243e95d7a60c0ddd27646da5c9d723a Mon Sep 17 00:00:00 2001 From: zhouxiaoyu Date: Fri, 23 Mar 2018 15:08:38 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=91=A8=E6=95=88=E5=AE=87=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rxjava/share/practices/Practice1.java | 5 ++++- .../rxjava/share/practices/Practice5.java | 20 +++++++++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index 6da344a..cc6eab5 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -30,7 +30,10 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - Observable obInteger = Observable.just(1,2,3); + + Observable obInteger = observable.scan(1, (initial,x) -> { + return initial + 1 ; + }); return Observable.zip(obInteger ,observable ,(index, val) -> { return new Tuple2(index, val); }); diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 45e3cf4..8263029 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import java.util.HashMap; /** * @author Baoyi Chen @@ -36,7 +37,9 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - return source.count(); + return source.reduce(new Long(0), (initial,str1) -> { + return initial + 1; + }); } /* @@ -55,7 +58,10 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - return source.distinct(); + return source.groupBy(e -> { + return e; + }).map(e -> e.getKey()); + } /* @@ -75,7 +81,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - return source.elementAt(index); + return source.skip(index).take(1).firstElement(); } /* @@ -84,7 +90,9 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - return source.repeat(count); + return Observable.range(0, count).map(e -> { + return source; + }).concatMap(e -> e); } /* @@ -93,7 +101,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - return Observable.concat(source); + return Observable.fromIterable(source).concatMap(e -> e); } /* @@ -102,7 +110,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - return Observable.merge(source); + return Observable.fromIterable(source).flatMap(e->e); } /*