From 57c5cc613aa0af6b7c6142ac2173555a9e230bef Mon Sep 17 00:00:00 2001 From: dane805 Date: Sun, 23 Jul 2023 02:05:56 +0900 Subject: [PATCH] =?UTF-8?q?12,=2016=EC=A3=BC=EC=B0=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- presentations/week12/Chapter 12.ipynb | 425 ++++++++++++++++++++++++++ presentations/week16/chapter16.ipynb | 326 ++++++++++++++++++++ 2 files changed, 751 insertions(+) create mode 100644 presentations/week12/Chapter 12.ipynb create mode 100644 presentations/week16/chapter16.ipynb diff --git a/presentations/week12/Chapter 12.ipynb b/presentations/week12/Chapter 12.ipynb new file mode 100644 index 0000000..7279f03 --- /dev/null +++ b/presentations/week12/Chapter 12.ipynb @@ -0,0 +1,425 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "6f537743", + "metadata": {}, + "source": [ + "# Chapter 12. Resilient Distributed Datasets(RDDs)\n", + "- 앞에까지는 스파크의 구조적 API를 보았고, 보통은 구조적 API의 사용을 권하지만\n", + "- 때때로 비즈니스/엔지니어링 문제가 해결되지 않을 수 있고, 이때 스파크의 하위 수준 API인 RDD를 이용할 수 있다" + ] + }, + { + "cell_type": "markdown", + "id": "427f1f69", + "metadata": {}, + "source": [ + "## 하위 수준 API(Low-Level API)란?\n", + "- 분산 데이터(RDD)와 분산 공유 변수(브로드캐스트 변수 및 축적기) 두 가지가 있음\n", + "\n", + "### 언제 하위 수준 API를 사용하는지?\n", + "1. 상위 레벨 API에서는 찾을 수 없는 기능이 필요할 때. 예컨대 클러스터 전체의 물리적 데이터 배치를 엄격하게 제어할 때(?)\n", + "2. RDD를 이용하여 일부 레거시 코드 베이스를 유지 관리 해야 할 때\n", + "3. 사용자 지정 공유 변수를 다뤄야 할 때(14챕터)\n", + "\n", + "- 꼭 이런 이유가 아니더라도 모든 스파크 워크로드는 RDD 기반이므로 이해해두면 좋다\n", + "- DF 변환을 수행하면 사실 RDD 변환이 안에서는 돌아가는 거임\n", + "- 디버깅하거나 복잡한 워크로드 짜는데 도움됨\n", + "\n", + "### 어떻게 하위 수준 API를 사용하는지?\n", + "- SparkContext는 하위 수준 API의 entry point\n", + "- 스파크 클러스터에서 계산을 수행하는 도구인 SparkSession을 통해 SparkContext에 접근한다(자세한 건 15장)\n", + "- `spark.sparkContext`" + ] + }, + { + "cell_type": "markdown", + "id": "27cb32d5", + "metadata": {}, + "source": [ + "## RDD에 대하여\n", + "- 1.X 버전에서는 기본 API이고, 2.X에서도 사용 가능하지만 일반적이진 않다\n", + "- 하지만 내부에서는 다 RDD로 컴파일됨 -> Spark UI도 RDD의 맥락에서 작업 실행을 설명\n", + "- 한 줄 요약: RDD는 병렬로 작동할 수 있는 불변의 분할된 레코드 모음\n", + " - 각 record가 스키마로 구조화된 DF와는 다르다!\n", + " - named tuple의 느낌으로 다가오는데?\n", + "- RDD의 모든 레코드는 Java/Python 개체 -> 모든 제어는 가능\n", + " - 근데 모든 조작을 다시 만들어야됨(Reinvent the wheel)\n", + " - 최적화도 안되어있음\n", + " - 그니까 가능하면 구조화된 API쓰자 ..\n", + "- RDD가 구조화된 데이터 엔진에 안 쓰이니까 앞에서 본 dataset과 유사하다\n", + " - RDD <-> Dataset 변환은 쉬움 -> 두 API 모두 사용해서 각 API의 장단점을 보겠음\n", + " \n", + "### Types of RDDs\n", + "- 스파크 API 문서를 보면 RDD의 하위 클래스가 많은데\n", + " - 대부분 DF API의 최적화된 실행 계획을 위한 것\n", + " - 일반 유저는 \"일반\" RDD나 집계를 위한 키 값 RDD 두 가지만 쓸 수 있고 그 둘만 중요\n", + "- RDD는 다음 다섯가지 특성으로 구별된다\n", + " 1. 파티션 목록\n", + " 2. 각 분할의 계산을 위한 함수\n", + " 3. 다른 RDD에 대한 종속성 목록\n", + " 4. (optional) key-value RDD를 위한 Partitioner(이게 RDD를 쓰는 핵심 이유일 수 있다. 성능-안정성에 도움이 된다:13장 참조)\n", + " 5. (optional) 각 분할을 계산할 기본 위치 목록(ex. HDFS 파일의 블록 위치)\n", + "- 이러한 속성을 프로그램을 스케쥴하고 실행하는 스파크의 모든 기능에 영향(determine)\n", + "- 다른 종류의 RDD는 위의 속성들을 자체적으로 구현(?)\n", + "- RDD는 앞에서 보았던 스파크 프로그래밍 패러다임을 그대로 따른다: lazy, action, eager evaluate, 분산 처리 등\n", + " - 즉 DF랑 동일한 방식이라는 것\n", + "- 근데 RDD에는 행이 없고, 개별 레코드(Java/Scala/Python 객체)만이 존재\n", + "- RDD API는 Java/Scala/Python 모두에서 사용 가능\n", + " - Scala/Java에서는 raw object를 다루는 거랑 성능 차이 없는데, python에서는 성능이 좀 나가리될 수 있다\n", + " - Python RDD를 쓴다는 건 Python UDF를 row-by-row로 하는 거랑 같은 것\n", + " - 데이터를 python 프로세스에 직렬화하고, python에서 돌린 다음 다시 JVM에 직렬화 -> RDD의 오버헤드가 크다\n", + " - 따라서 구조화된 API를 쓰고 꼭 필요할 때만 RDD를 써라\n", + "\n", + "### 언제 RDD를 쓰나\n", + "- 매우 특별한 이유가 없으면 쓰지 말라. DF가 훨씬 효율적이고 안정적이며 표현력이 좋다\n", + "- 가장 큰 이유는 보통 데이터의 물리적 배포(사용자 지정 데이터 파티셔닝)을 세부적으로 제어 하는 것\n", + "\n", + "### Datasets and RDDs of Case Classes\n", + "- 웹에서 본 건데 흥미로운 질문: Case Calsses의 RDD와 Dataset의 차이는?\n", + " - Dataset은 구조화된 API가 제공하는 기능과 최적화를 활용할 수 있다.\n", + " - Dataset을 쓸 때는 JVM 이나 Spark에서 돌아가는 것 중 하나를 선택할 필요가 없다(?)\n", + " - 가장 쉬운 걸 쓰면 된다" + ] + }, + { + "cell_type": "markdown", + "id": "243113f1", + "metadata": {}, + "source": [ + "## RDD 만들기\n", + "\n", + "### DF, Dataset, RDD 간의 상호 운용\n", + "- RDD를 만드는 가장 쉬운 방법은 기존 DF나 Dataset을 변환하는 것\n", + " - `spark.range(500).rdd`처럼.\n", + "- Python에서는 Dataset이 없어서 row형식의 RDD를 반환받게 된다\n", + " - Scala: `spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))`\n", + " - Python: `spark.range(10).toDF(\"id\").rdd.map(lambda row: row[0])`\n", + "- 원복은 toDF\n", + " - `spark.range(10).rdd.toDF()`\n", + "- 이렇게 만들어진 RDD는 Row 유형.\n", + " - 이 행들은 Spark가 구조화된 API의 데이터를 나타내는데 사용하는 내부 카탈리스트 형식.\n", + " - 이런 식으로 구조화된 API와 하위 수준 API를 왔다갔다\n", + "- RDD랑 Dataset은 API가 비슷해보일 거임\n", + " - 실제로 RDD는 Dataset의 하위 수준 형상\n", + " - 둘 다 구조화된 API이 갖고 있는 편리한 기능과 인터페이스가 없음 ...\n", + "\n", + "### 로컬 콜렉션에서\n", + "- collection에서 RDD를 만들려면 병렬화를 해야 함\n", + " - 여기서 말하는 collection은 걍 array를 말하는듯?\n", + "- 이때 파티션 수를 명시할 수 있음\n", + "- 이름도 따로 지어줄 수 있음(Spark UI에서 확인 가능)\n", + "```\n", + "myCollection = \"Spark The Definitive Guide : Big Data Processing Made Simple\".split(\" \")\n", + "words = spark.sparkContext.parallelize(myCollection, 2)\n", + "```\n", + "\n", + "### Data source에서\n", + "- 데이터 소스나 텍스트 파일에서 RDD를 만들 수도 있는데, Data Source API를 사용하는 게 더 좋음\n", + "- RDD에는 DF와 달리 데이터 소스 API 개념이 없다\n", + " - 주로 종속 구조와 파티션 목록을 정의\n", + " - 9장에서 본 Data Source API가 거의 항상 더 좋은 방법\n", + "- 결과적으로 RDD의 각 레코드가 읽으려던 텍스트 파일 또는 파일의 행을 나타냄 (혹은 두번째 코드처럼 통째로 파일 하나)\n", + "- 이 RDD에서 파일의 이름은 첫 번째 개체이고, 텍스트 파일의 값이 두번째 문자열 개체\n", + "- `spark.sparkContext.textFile(\"/some/path/withTextFiles\")`\n", + "- `spark.sparkContext.wholeTextFiles(\"/some/path/withTextFiles\")`" + ] + }, + { + "cell_type": "markdown", + "id": "29d9d2c9", + "metadata": {}, + "source": [ + "## RDD 다루기\n", + "- RDD를 다루는 건 DF랑 따로 차이가 없다\n", + "- 다만 Raw Java/Scala를 개체를 다룬다는 게 핵심 차이" + ] + }, + { + "cell_type": "markdown", + "id": "33e1201a", + "metadata": {}, + "source": [ + "## 변환\n", + "- 대부분의 변환은 구조화된 API의 기능을 반영mirror\n", + "- DF/Dataset에서처럼 하나의 RDD에서 변환하면 새로운 RDD가 생성됨 -> 종속성도 정의되겠지?\n", + "\n", + "### distinct\n", + "- `words.distinct().count()``\n", + "- drop_duplicates지 뭐\n", + "\n", + "### filter\n", + "- 필터링은 SQL에서 where 절이랑 같은 것\n", + "- 필터함수에서는 부울을 반환하는 형식\n", + "\n", + "```\n", + "def startsWithS(individual):\n", + " return individual.startswith(\"S\")\n", + "\n", + "words.filter(lambda word: startsWithS(word)).collect()\n", + "```\n", + "\n", + "### map\n", + "- 11장에서 본 거랑 같은 것\n", + "- 값이 주어지면 원하는 걸 반환하는 형식\n", + "```\n", + "words2 = words.map(lambda word: (word, word[0], word.startswith(\"S\")))\n", + ">> “Spark,” “S,” and “true,”\n", + "words2.filter(lambda record: record[2]).take(5)\n", + ">> Boolean만\n", + "```\n", + "\n", + "### flatMap\n", + "- map 함수의 간단한 확장: flattening해주는 것\n", + "`words.flatMap(lambda word: list(word)).take(5)`\n", + "\n", + "\n", + "### sort\n", + "- sordBy 메소드로 RDD를 정렬 가능\n", + "- RDD 개체에서 기준이 되는 값을 추출하고 이걸로 정렬\n", + "- 아래 함수는 단어 길이를 기준으로 내림차\n", + "`words.sortBy(lambda word: len(word) * -1).take(2)`\n", + "\n", + "\n", + "### Ransdom Splits\n", + "- RDD를 RDD 배열로 무작위 분할 가능\n", + "`fiftyFiftySplit = words.randomSplit([0.5, 0.5])`" + ] + }, + { + "cell_type": "markdown", + "id": "5a13451d", + "metadata": {}, + "source": [ + "## Actions\n", + "- DF와 Dataset처럼 action을 지정 가능\n", + "\n", + "### reduce\n", + "- reduce를 이용하여 모든 종류의 값의 RDD를 하나의 값으로 reduce할 수 있다\n", + "- 예를 들어 숫자는 합으로 줄일 수 있다\n", + "- 함수형 프로그래밍을 좀 친다면, 새로운 개념new condcept이 되면 안된다?\n", + "\n", + "`spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y) # 210`\n", + "- 텍스트에서 가장 긴 놈을 뽑을 수도 있다\n", + "\n", + "```\n", + " def wordLengthReducer(leftWord, rightWord):\n", + " if len(leftWord) > len(rightWord):\n", + " return leftWord\n", + " else:\n", + " return rightWord\n", + " words.reduce(wordLengthReducer)\n", + "```\n", + "- 파티션에 대한 reduce가 결정론적이지 않아 무조건 왼쪽으로 한다거나 할 수 있다\n", + " - 해결이 안되나 ..?\n", + " \n", + "### count\n", + "**countApprox**\n", + "- 반환 signature(?)이 이상하긴 하지만, 꽤 정교하다\n", + "- 근사이긴 한데 빠르고 시간 제한에 따라 결과가 불안정할 수 있다\n", + "- 오차 한계가 함께 나온다\n", + "\n", + "```\n", + "## 스칼라만 있네 ;;\n", + "val confidence = 0.95\n", + "val timeoutMilliseconds = 400\n", + "words.countApprox(timeoutMilliseconds, confidence)\n", + "```\n", + "\n", + "**countApproxDistinct**\n", + "- 두 가지 구현이 있고, 둘 다 streamlib의 \"Hyperloglog 실전: ...\"에서 가져왔음\n", + "- 방법 1.\n", + " - 상대적 정확도(>0.000017)를 인수로 받아서, 그 값이 작을수록 더 많은 공간이 필요함\n", + " - `words.countApproxDistinct(0.05)`\n", + "- 방법 2. \n", + " - 일반 데이터의 매개 변수(p)와 희소 표현의 매개 변수(sp)를 인자로 받는다 (모두 정수)\n", + " - 상대적 정확도는 대략 1.054 / sqrt(p)\n", + " - sp > p로 하면 카디널리티가 작을 때 메모리 소비 줄이고 정확도 업업\n", + " - `words.countApproxDistinct(4, 10)`\n", + "\n", + "**countByValue**\n", + "- 지정된 RDD의 값의 수를 센다\n", + "- 최종적으로 결과 집합을 드라이버의 메모리에 로드\n", + " - 따라서 결과 map이 작을 때만 써야 함\n", + "- `words.countByValue()`\n", + "\n", + "**countByValueApprox**\n", + "- 위의 것을 근사치로. \n", + "- `words.countByValueApprox(1000, 0.95)`\n", + "\n", + "**first**\n", + "- `words.first()`\n", + "\n", + "**max in min**\n", + "- `spark.sparkContext.parallelize(1 to 20).max()`\n", + "- `spark.sparkContext.parallelize(1 to 20).min()`\n", + "\n", + "**take**\n", + "- take랑 그 비슷한 놈들은 RDD에서 여러 값을 가져온다\n", + "- 먼저 한 파티션을 스캔하고, 다음 파티션을 스캔해서 필요한 파티션 수를 추정\n", + "- takeOrdered, takeSample, top 등이 있다\n", + " - takeSample을 이용하면 RDD에서 고정 크기 랜덤 표본을 가져올 수 있다 (?)\n", + " - top은 takeOrdered와 반대\n", + "\n", + "```\n", + "words.take(5)\n", + "words.takeOrdered(5)\n", + "words.top(5)\n", + "val withReplacement = true\n", + "val numberToTake = 6\n", + "val randomSeed = 100L\n", + "words.takeSample(withReplacement, numberToTake, randomSeed)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "4ce340af", + "metadata": {}, + "source": [ + "## Saving Files\n", + "- 일반 텍스트 파일에 쓰는 것\n", + "- RDD를 쓸 때는 Data source를 실제로 \"저장\"할 수 없다\n", + "- 파티션 내용을 하나하나 써야 하고, 상위 레벨 API에서도 똑같다\n", + "\n", + "### saveAsTextFile\n", + "- `words.saveAsTextFile(\"file:/tmp/bookTitle\")`\n", + "- 압축 코덱을 지정하고 싶으면 하둡에서 임포트해오면 된다\n", + "\n", + "```\n", + "import org.apache.hadoop.io.compress.BZip2Codec\n", + "words.saveAsTextFile(\"file:/tmp/bookTitleCompressed\", classOf[BZip2Codec])\n", + "```\n", + "\n", + "### SequenceFiles\n", + "- Spark는 하둡 에코 시스템에서 성장해서 좀 친하다\n", + "- SequenceFile은 이진 키-값으로 구성된 플랫 파일\n", + " - MapReduce에서 입/출력 형식으로 광범위하게 사용\n", + "- `words.saveAsObjectFile(\"/tmp/my/sequenceFilePath\")`\n", + "\n", + "\n", + "### Hadoop Files\n", + "- 다양한 종류의 하둡 파일 형식이 있다\n", + "- 하둡에서 고이거나 레거시 아님 거의 관련 없음 (그래서 생략하는듯)" + ] + }, + { + "cell_type": "markdown", + "id": "ef03b0e9", + "metadata": {}, + "source": [ + "## Caching\n", + "- DF, Data source와 마찬가지로 RDD캐싱에도 동일한 원리 적용됨: RDD 캐시하거나 유지 가능\n", + "- 기본적으로 캐시 및 지속의 메모리의 데이터만 처리\n", + "- 앞에서 지정했던 이름으로 지정 가능\n", + "- `words.cache()``\n", + "- singleton 개체의 스토리지 수준(org.apache.spark.storage)으로 스토리지 수준을 지정할 수 있습니다. 저장소 수준 - 메모리만 조합하고 디스크만 조합하며 별도로 힙을 벗어납니다. (이해 못함)" + ] + }, + { + "cell_type": "markdown", + "id": "7f88f39a", + "metadata": {}, + "source": [ + "## Checkpointing\n", + "- DF에서 쓸 수 없었던 것\n", + "- RDD를 디스크에 저장해서 향후 RDD에 대한 참조를 디스크에서 가져가는 것\n", + "- 메모리 말고 디스크에 쓰지만 개념적으로는 캐싱과 유사\n", + "\n", + "```\n", + "spark.sparkContext.setCheckpointDir(\"/some/path/for/checkpointing\")\n", + "words.checkpoint()\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "d8688f01", + "metadata": {}, + "source": [ + "## Pipe RDDs to System Commands\n", + "- 외부에 ETL을 붙인다는 건가?\n", + "- 파이프를 사용하면 파이프 요소에 의해 생성된 RDD를 외부 프로세스에 반환 가능\n", + "- RDD는 파티션 당 한 번씩 지정된 프로세스를 실행/계산?\n", + "\n", + "### mapPartitions\n", + "- 잘 이해안감\n", + "- 그니까 스파크는 파티션해서 돌린다는 거고\n", + "- RDD에 대한 map 작업이 까보면 MapPartitionRDD라는 거다\n", + "- `words.mapPartitions(lambda part: [1]).sum() // 2`\n", + "- 각 row에 대해 하는 것 같지만, 클러스터가 물리적으로 데이터를 운영하는 단위는 파티션이니까\n", + "\n", + "```\n", + "def indexedFunc(partitionIndex, withinPartIterator):\n", + " return [\"partition: {} => {}\".format(partitionIndex,\n", + " x) for x in withinPartIterator]\n", + " words.mapPartitionsWithIndex(indexedFunc).collect()\n", + "```\n", + "\n", + "### foreachPartition\n", + "- 반환없이 돌아간다?\n", + "- 반환없이 DB에 쓴다?\n", + "- 많은 data source커넥터가 쓰이는 방법이다 ..?\n", + "- 임시로 파일을 떨굴 수도 있다(예제 코드처럼)\n", + "\n", + "```\n", + "words.foreachPartition { iter =>\n", + " import java.io._\n", + " import scala.util.Random\n", + " val randomFileName = new Random().nextInt()\n", + " val pw = new PrintWriter(new File(s\"/tmp/random-file-${randomFileName}.txt\"))\n", + " while (iter.hasNext) {\n", + " pw.write(iter.next())\n", + " }\n", + "pw.close() }\n", + "```\n", + "\n", + "### glom\n", + "- Dataset의 모든 파티션을 가져와 배열로 변환한\n", + "- 드라이버에 데이터를 수집하고 각 파티션에 배열을 사용할 때 유용\n", + "- 파티션이 크거나 수가 많으면 드라이버가 충돌 -> 안정성 문제\n", + "- `spark.sparkContext.parallelize([\"Hello\", \"World\"], 2).glom().collect()`" + ] + }, + { + "cell_type": "markdown", + "id": "903c2814", + "metadata": {}, + "source": [ + "## Conclusion\n", + "- 단일 RDD와 RDD API의 기본 사항들을 봤다\n", + "- 담 장에서는 join 및 key-value RDD와 같은 고오급 RDD를 보겠다" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5469fb09", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/presentations/week16/chapter16.ipynb b/presentations/week16/chapter16.ipynb new file mode 100644 index 0000000..11d23f1 --- /dev/null +++ b/presentations/week16/chapter16.ipynb @@ -0,0 +1,326 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "d4e171bf", + "metadata": {}, + "source": [ + "# Chapter 16. 스파크 어플리케이션 개발하기" + ] + }, + { + "cell_type": "markdown", + "id": "ac408c07", + "metadata": {}, + "source": [ + "## Writing Spark Application\n", + "- 스파크 어플리케이션: 스파크 클러스터와 코드의 조합\n", + "- standalone에서: 클러스터는 로컬 모드, 애플리케이션은 미리 정의된 모드" + ] + }, + { + "cell_type": "markdown", + "id": "4ce38aba", + "metadata": {}, + "source": [ + "### A Simple Scala-Based App\n", + "- Scala는 Spark의 Native언어. 그냥 스칼라 어플리케이션 만드는 거랑 같음\n", + "- TIP: 스칼라는 좀 빡칠 수 있지만, 네 출신에 따라 배울 가치가 있을 수도? 꼭 다 배워야 하는 것도 아님\n", + "- JVM 기반 빌드 도구인 sbt나 Maven으로 애플 빌드 가능\n", + "- sbt가 가장 쉬움\n", + "- scala가 당장은 관심이 없고, 어차피 자세한 건 여기서도 다루지 않아(beyond the scope) 생략" + ] + }, + { + "cell_type": "markdown", + "id": "077c1b55", + "metadata": {}, + "source": [ + "### Writing Python Applications\n", + "- 파이스파크 짜는 건 걍 파이썬 코드 짜는 거랑 다를 것 없음\n", + "- 빌드 없이 스크립트만 있으면 됨\n", + "- 코드 재사용을 위해 egg나 ZIP파일로 파이썬 파일을 마는 것이 일반적\n", + " - spark-submit의 --py-files 인수 사용\n", + "- 코드를 돌릴 때는, Scala/Java main class에 상응하는 SparkSession을 만들어 실행 스크립트로 지정한다\n", + "\n", + "```\n", + "# in Python\n", + " from __future__ import print_function\n", + " if __name__ == '__main__':\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder \\\n", + " .master(\"local\") \\\n", + " .appName(\"Word Count\") \\\n", + " .config(\"spark.some.config.option\", \"some-value\") \\\n", + " .getOrCreate()\n", + " print(spark.range(5000).where(\"id > 500\").selectExpr(\"sum(id)\").collect())\n", + "```\n", + " \n", + "- 위의 코드로 만들어진 Sparksession을 통해 app과 소통. 단, 런타임에만 전달\n", + "- pip을 이용해 pyspark의 종속성을 지정(pip으로 설치하란 의미인듯)\n", + "\n", + "**Running the application**\n", + "- 아래 코드 예시처럼 spark-submit하면 됨\n", + "\n", + "```$SPARK_HOME/bin/spark-submit --master local pyspark_template/main.py```" + ] + }, + { + "cell_type": "markdown", + "id": "74171dc0", + "metadata": {}, + "source": [ + "### Writing Java Applications\n", + "- 자바는 스칼라에서와 같은데, 종속성만 다름. 자세한 건 생략\n", + "- 자바 몰라요 죄송 ...." + ] + }, + { + "cell_type": "markdown", + "id": "57fc256e", + "metadata": {}, + "source": [ + "## Testing Spark Applications\n", + "- 어케 스파크 코드를 짜는지는 알았으니 이제 테스트를 보자" + ] + }, + { + "cell_type": "markdown", + "id": "8eb7f765", + "metadata": {}, + "source": [ + "### Strategic Principles\n", + "- 데이터 파이프라인과 Spark App을 테스트하는 건 개중요\n", + "\n", + "**Input data resilience**\n", + "- 다양한 종류의 입력에 탄력적으로 대응하도록 데이터 파이프라인을 짜야함\n", + "- 비즈니스가 바뀌면 데이터도 바뀌므로, 어느 정도 변경에 대해서는 탄력적이어야 함\n", + "- 아님 발생하는 장애에 우아하고 탄력적으로 대응할 수 있던가\n", + "- 엣지 케이스 잘 커버해야하고, 알람은 알맞은 때에만 울리도록 하자\n", + "\n", + "**Business logic resilience and evolution**\n", + "- 파이프라인의 비즈니스 논리도 변경될 가능성이 높음\n", + "- 더 중요한 것 원천 데이터에서 추론된 게 개발자가 의도한 것대로 된 것이냐 -> 실제 데이터에서 로버스트한 논리 검사를 해야함\n", + "- 기능적인 동작을 확인하기 위해서만 스파크 unit test를 하는 게 아니라, 비즈니스 파이프라인이 잘 동작하는지 확인해야함\n", + "\n", + "**Resilience in output and atomicity**\n", + "- 입력, 로직을 확인했으면 이제 결과를 봐야함. 일단 출력 스키마를 확인해보자\n", + "- 데이터를 쓰고, 다시 읽지 않은 상태에서 끝나는 건 드문 일. \n", + " - 즉 파이프라인의 output은 보통 다른 파이프라인의 input\n", + " - 따라서 다음 파이프라인의 consumer가 데이터의 \"state\"를 이해해야함\n", + " - 잘 완료됐는지, 업데이트 빈도는 어떻게 되는지\n", + "- 사실 이건 스파크랑 별개로 데이터 파이프라인을 구축할 때 일반적인 원칙임" + ] + }, + { + "cell_type": "markdown", + "id": "014f6de5", + "metadata": {}, + "source": [ + "### Tactical Takeaways\n", + "- 위에서는 개념적인 걸(전략적 사고)를 봤으니, 여기서는 구체적인 전략(전술)을 보겠음\n", + "- 가장 좋은 접근 방식은 \n", + " - 적절한 unit test롤 비즈니스 로직을 확인하고\n", + " - 입력 데이터 변경에 탄력성을 확보하거나 스키마를 고정시키는 것\n", + "- 어떻게 하는지 결정은 개발자에게 달려있음. 비즈니스 도메인이나 관련 전문성에 따라 달라질 수 있기 때문\n", + "\n", + "**Managing SparkSessions**\n", + "- local모드에서 JUnit이나 ScalaTest를 돌리면 unit test는 쉽게 할 수 있음\n", + "- SparkSession은 한 번만 생성한 뒤, 생성된 session을 관련 함수/클래스에 전달하는 것이 좋다\n", + "\n", + "**Which Spark API to Use?**\n", + "- RDD, DF, Dataset 등 API가 많은데, 정답은 없음\n", + "- 단 API는 뭘 쓰든 각 기능의 입/출력 유형을 문서화하고 테스트하는 게 좋음\n", + "- Type-safe API??\n", + " - The type-safe API automatically enforces a minimal contract for your function that makes it easy for other code to build on it\n", + "- DF나 SQL 같은 동적 프로그래밍의 경우 무슨 일이 날 줄 모르기 때문에 input에 대한 유형화/테스트에 시간이 쓰임\n", + "- RDD API처럼 정적이면 꼭 기능이 필요한 경우에만 써야 함. 최적화도 안되고 기능도 제한적이라서.\n", + "- 일반적으로 대규모 App이나 성능 이슈가 있으면 Scala나 Java같은 정적 언어가 낫지만, 그렇지 않으면 Python이나 R 쓰자.\n", + "- 단, 스파크 코드는 모든 언어의 표준 유닛 테스트 프레임 워크에서 테스트 가능해야함" + ] + }, + { + "cell_type": "markdown", + "id": "8008d34d", + "metadata": {}, + "source": [ + "### Connecting to Unit Testing Frameworks\n", + "- unit test를 할 땐 해당 언어에 맞는 표준 프레임 워크(UNist이나 ScalaTest)를 써라\n", + "- 각 테스트에 대한 스파크 세션을 만들고 정리하도록 테스트 하니스test harness를 써라(?)\n", + "- 프레임 워크마다 다르기 때문에, 템플릿에 일부 유닛 테스크 코드를 포함함" + ] + }, + { + "cell_type": "markdown", + "id": "2d73a490", + "metadata": {}, + "source": [ + "### Connecting to Data Sources\n", + "- 가능하면 테스트 코드가 프로덕션 데이터 소스에 연결되지 않아야 함 -> 테스트 코드를 쉽게 분리하여 실행\n", + "- 비즈니스 로직 기능을, df나 dataset을 입력 받도록 하면 됨\n", + "- Structured API를 쓸 때도 dummy dataset을 table르 등록하고 이용" + ] + }, + { + "cell_type": "markdown", + "id": "74469c04", + "metadata": {}, + "source": [ + "## The Development Process\n", + "- 보통 notebook같은 interactive한 곳에서 코딩하고, 나중에 library나 package로 이동\n", + "- 당장 돌려보기 편리하니까. databricks같은 것도 좋음\n", + "- 로컬 컴퓨터에서 쓸 때는 spark-shell이 좋음\n", + "- 그 중에서 spark-submit은 product app을 위한 것" + ] + }, + { + "cell_type": "markdown", + "id": "d58fee3b", + "metadata": {}, + "source": [ + "## Launching Applications\n", + "- 스파크 app을 돌리는 가장 일반적인 방법은 spark-submit\n", + "- 옵션과 JAR 등을 다음과 같이 지정하면 됨\n", + "\n", + "```\n", + " ./bin/spark-submit \\\n", + " --class \\\n", + " --master \\\n", + " --deploy-mode \\\n", + " --conf = \\\n", + " ... # other options\n", + " \\\n", + " [application-arguments]\n", + "```\n", + "\n", + "- 클러스터 모드로 할지 클라이언트 모드로 할 지도 지정할 수 있음\n", + "- 근데 executor와 driver 사이의 지연을 줄이기 위해서는 클러스터 모드를 사용하는 게 좋음\n", + "\n", + "### Application Launch Examples\n", + "- 아래 코드에서는 옵션들을 살펴보면 좋겠음\n", + "\n", + "```\n", + " ./bin/spark-submit \\\n", + " --class org.apache.spark.examples.SparkPi \\\n", + " --master spark://207.184.161.138:7077 \\\n", + " --executor-memory 20G \\\n", + " --total-executor-cores 100 \\\n", + " replace/with/path/to/examples.jar \\\n", + " 1000\n", + "```\n", + "\n", + "- 아래 스니펫은 파이썬도 동일\n", + "\n", + "```\n", + " ./bin/spark-submit \\\n", + " --master spark://207.184.161.138:7077 \\\n", + " examples/src/main/python/pi.py \\\n", + " 1000\n", + "```\n", + "\n", + "- local로 하면 로컬 모드이고, local[*]로 하면 모든 코어 사용" + ] + }, + { + "cell_type": "markdown", + "id": "c812adb4", + "metadata": {}, + "source": [ + "## Configuring Applications\n", + "- 스파크에는 다양한 구성이 있고, 대부분은 아래에 속함\n", + " - Application properties Runtime environment\n", + " - Shuffle behavior\n", + " - Spark UI\n", + " - Compression and serialization Memory management Execution behavior Networking\n", + " - Scheduling\n", + " - Dynamic allocation Security Encryption\n", + " - Spark SQL\n", + " - Spark streaming\n", + " - SparkR\n", + "- 스파크에서 configure는 다음의 세 부분에서\n", + " - SparkConf가 대부분의 파라미터 제어\n", + " - Java 시스템 속성\n", + " - 하드코딩된 config 파일\n", + "- 스파크의 root에서 conf아래에 템플릿들이 있음\n", + " - 하드코딩하거나 런타임에서 지정하여 템플릿 사용가능\n", + " - 환경변수를 이용해서 설정할 수도 있음\n", + " - log4j.properties로 로깅 구성 가능\n", + " - 이거 해킹 이슈 있지 않았음?\n", + " \n", + " \n", + "### The SparkConf\n", + "- 얘가 모든 App 구성 관리함\n", + "- 아래 코드 참조. 참고로 SparkConf는 immutable\n", + "\n", + "```\n", + "# in Python\n", + " from pyspark import SparkConf\n", + " conf = SparkConf().setMaster(\"local[2]\").setAppName(\"DefinitiveGuide\")\\\n", + " .set(\"some.conf\", \"to.some.value\")\n", + "```\n", + "\n", + "### Application Properties\n", + "- App 속성은 spark-submit이나 Spark App을 만들 때 설정하는 것. 표 16-3 참조\n", + "- 드라이버의 4040 포트, \"Environment\" 탭에서 속성 확인 가능\n", + "- 명시적으로 지정된 값 외에는 모두 디폴트\n", + "\n", + "### Runtime Properties\n", + "- 일반적이지는 않지만 런타임 환경을 config할 때도 있음\n", + "- 여백이 부족해 다 적지는 않음. 스파크 설명서 참조\n", + "- driver와 execute, python worker config, 기타 logging 등 설정 가능\n", + "\n", + "### Execution Properties\n", + "- 실제 실행을 세부적으로 제어하기 땜에 가장 관련있는 것. \n", + "- 여백이 없어 생략. 도큐 참조.\n", + "- spark.exeutor.core와 같은 것 변경 가능\n", + "\n", + "### Configuring Memory Management\n", + "- 최적화를 위해 메모리 옵션을 수동 관리해야할 때가 있ㅇ므\n", + "- 2.X에서는 자동 메모리 관리로 없어진 게 많아서 end user랑은 상관 ㅇ벗는 게 많음\n", + "- 여백 없어 생략했으니 도큐 참조\n", + "\n", + "### Configuring Shuffle Behavior\n", + "- 오버헤드 문제로 shuffle이 병목이 될 수 있음\n", + "- low level로 제어할 수 있는 게 좀 있음\n", + "- 여백 없어 생략했으니 도큐 참조\n", + "\n", + "### Environmental Variables\n", + "- 환경 변수로 특정 스파크 설정 구성 가능\n", + "- 설치된 디렉토리의 conf/spark-env.sh 스크립트에서 읽어옴\n", + "- Standalone이나 Mesos에서, hostname과 같은 기기 관련 정보를 제공할 수 있음?\n", + "- spark-env.sh는 기본적으로는 존재하지 않는 파일 -> conf/spark-env.sh.template을 복사해서 사용\n", + "\n", + "### Job Scheduling Within an Application\n", + "- 여러 스레드?에서 동시에 작업이 submit되면 동시에 병렬로 job이 실행될 수 있음\n", + "- 스파크 스케쥴러는 thread-safe?이고 동시에 여러 요청을 처리할 수 있게 해줌\n", + "- 기본적으로는 FIFO인데, 맨 앞의 작업이 좀 작아 여유가 되면 뒤의 작업을 동시에 처리하기도 ㅎ마\n", + "- 공평하게 자원을 배분할 수도 있음 -> 라운드 로빈 방식\n", + " - spark.scheduler.mode를 FAIR로 설정\n", + " - job들을 pool로 그룹화하고 각 풀에 대해 서로 다른 예약옵션/가중치 설정을 지원 -> 우선순위 풀 생성 가능\n", + " - 새로 submit된 job은 default pool로 옮기는데, 따로 pool을 생성해줄 수도 있음\n", + " - 스레드가 연결된 풀을 지우려면 null로 셋업" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}