※ Apache Spark
- RDD(Resilitent(유연한, 탄력적인) Distributed Dataset): 분산된 데이터를 처리할 수 있는 시스템
-> DataFrame으로 발전(가상의 테이블을 만들기 때문에 SQL을 적용할 수 있음)
- MLlib: Machine Learning Library, 머신러닝이 가능한 모듈을 가지고 있음
- GraphFrame: 시각화도구
※ 설치
1. File Download
wget https://downloads.apache.org/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz
https://downloads.apache.org/spark/spark-3.0.2/
2. 환경변수 설정
export SPARK_HOME=/home/hdoop/spark-3.0.2
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export SPARK_CONF_DIR=$SPARK_HOME/conf
export SPARK_MASTER_HOST=localhost
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/bin:$SPARK_HOME/python:$PATH
3. findspark 설치(pyspark를 초기화)
pip3 install findspark
4. 실행(Scala)
spark-shell
실행(Python)
pyspark
5. 테스트
df = spark.read.csv('biostats.csv')
df.show()
※ 예제
* HDFS에 있는 파일을 읽어서 Local에 저장
- DataFrame.show(): Print DataFrame
import findspark
findspark.init()
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
df_emp = sparkSession.read.csv('hdfs://localhost:9000/user/hive/warehouse/mytestdb.db/employee/employee.csv')
# 만약 header가 있다면, sparkSession.read.option('header', True).csv(filename)으로 실행한다.
print(type(df_emp))
df_emp.show()
print(dir(df_emp))
df_emp.write.csv('/home/hdoop/csv') # 출력 디렉터리가 미리 만들어져 있으면 충돌이 일어난다.
print('CSV copy saved...')
* HDFS에 저장된 csv 파일 중 원하는 칼럼만 출력하기
- DataFrame.select( 'Column Name' ) => Return DataFrame
import findspark
findspark.init()
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
df_emp2 = sparkSession.read.option('header', True).csv('hdfs://localhost:9000/user/hive/warehouse/employee2.csv')
# sparkSession.read.csv('~~', header = True)
df_emp2 = df_emp2.select('FIRST_NAME')
df_emp2.show()
print('CSV FIRST_NAME shown...')
* HDFS에 저장된 파일에서 급여가 9000이 넘는 직원의 이름과 급여를 출력
- DataFrame.filter( DataFrame 조건 ): SQL의 Where절과 비슷 => Return DataFrame
import findspark
findspark.init()
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
df_emp2 = sparkSession.read.option('header', True).csv('hdfs://localhost:9000/user/hive/warehouse/employee2.csv')
# sparkSession.read.csv('~~', header = True)
# salary is more than 9000
df_emp2 = df_emp2.select(['FIRST_NAME', 'SALARY']).filter(df_emp2['SALARY'] > 9000)
df_emp2.show()
print('Adpat Condition')
* HDFS에서 txt파일을 읽어 출력
import findspark
findspark.init()
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
print(rdd.collect())
- collect()함수를 사용해서 분산된 파일의 정보를 모아 출력한다.
* HDFS에서 txt 파일을 읽고, collect(), map() 함수를 사용해 2차원 배열 형태로 출력
- RDD.map( Apply Function ): Use function => Return RDD (Transformation)
- RDD.collect(): Print RDD
import findspark
findspark.init()
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
parts = rdd.map(lambda line: line.split(','))
print(parts.collect())
* HDFS에서 읽은 txt 파일을, DataFrame으로 제작
- SparkSession.createDataFrame( 'Row List' ): Create DataFrame => Return DataFrame
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
parts = rdd.map(lambda line: line.split(',')) # rdd
row_list = parts.map(lambda e: Row(name =e[0], age=e[1])) # rdd
df = sparkSession.createDataFrame(row_list) # DataFrame
df.show()
* HDFS에서 읽은 txt 파일을 DataFrame으로 제작한 후, 'leo'의 나이를 출력
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
parts = rdd.map(lambda line: line.split(','))
row_list = parts.map(lambda e: Row(name =e[0], age=e[1]))
df = sparkSession.createDataFrame(row_list)
df = df.filter(df['name'] == 'leo')
df.show()
* 제작한 DataFrame을 임시 테이블로 등록
- DataFrame.createOrReplaceTempView( 'Table Name' ): Create Temp View Table
- SparkSession.sql( 'SQL' ): Use SQL => Return DataFrame
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
parts = rdd.map(lambda line: line.split(','))
row_list = parts.map(lambda e: Row(name =e[0], age=e[1]))
df = sparkSession.createDataFrame(row_list)
df.createOrReplaceTempView('emp')
sparkSession.sql('SELECT * FROM emp').show()
* DataFrame에서 원하는 칼럼을 출력하고자 할 때 다른 방법(Map 함수 사용)
- Map 연산을 하기 위해 rdd로 변환한 후, 사용한다.
mport findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
parts = rdd.map(lambda line: line.split(','))
row_list = parts.map(lambda e: Row(name=e[0], age=e[1]))
df = sparkSession.createDataFrame(row_list)
print(df.rdd.map(lambda r: 'Name: '+r.name).collect())
* Spark DataFrame <-> Pandas DataFrame
- Spark DataFrame => Pandas DataFrame
: spark_df.toPandas()
- Pandas DataFrame => Spark DataFrame
: sparkSession.createDataFrame( pandas_df )
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
parts = rdd.map(lambda line: line.split(','))
row_list = parts.map(lambda e: Row(name=e[0], age=e[1]))
df = sparkSession.createDataFrame(row_list)
# Spark DataFrame -> Pandas DataFrame
print('='*10)
df2 = df.toPandas()
print(df2)
print('='*10)
print()
# Pandas DataFrame -> Spark DataFrame
sd = sparkSession.createDataFrame(df2)
sd.show()
* 집계함수 사용
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
sparkSession = SparkSession.builder.appName('example-pysaprk-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
parts = rdd.map(lambda line: line.split(','))
row_list = parts.map(lambda e: Row(name=e[0], age=e[1]))
df = sparkSession.createDataFrame(row_list)
df.agg(max(col('age'))).show()
* csv파일에서 2005년도 상품의 Sales 합계와 평균 출력
import findspark
findspark.init()
from pyspark.sql import *
sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
df = sparkSession.read.csv('hdfs://localhost:9000/user/hive/warehouse/sales_data_sample.csv', h>
# SQL: Year_ID == 2005 / SUM / AVG
# We need only 2005 Year_ID SALES
df2 = df.select(['Year_ID', 'SALES']).filter(df['Year_ID'] == 2005)
# Temp View 'SUM'
df2.createOrReplaceTempView('sales1')
sparkSession.sql('SELECT SUM(SALES) FROM sales1').show()
# Temp View 'AVG'
df2.createOrReplaceTempView('sales2')
sparkSession.sql('SELECT AVG(SALES) FROM sales2').show()
'프로그래밍' 카테고리의 다른 글
[pyspark] pyspark 프로그래밍 예제 (0) | 2021.06.10 |
---|---|
[Window Spark] Window OS에 Spark 설치 (0) | 2021.06.10 |
[Sqoop] Ubuntu 20.04 Sqoop 활용(2) - 예시 (0) | 2021.06.08 |
[Sqoop] Ubuntu 20.04 Sqoop 활용 (0) | 2021.06.07 |
[Python] Text Data Analysis(2) (0) | 2021.06.06 |