
[Spark] Ubuntu 20.04 Spark

RainIron 2021. 6. 9. 12:18

※ Apache Spark

- RDD (Resilient Distributed Dataset): the core abstraction for processing data distributed across a cluster
   -> evolved into the DataFrame (builds a virtual table, so SQL can be applied to it; see the sketch after this list)
 - MLlib: Machine Learning Library, modules for machine learning on Spark
 - GraphFrames: DataFrame-based graph processing library
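
A minimal sketch of the RDD -> DataFrame -> SQL progression described above (this assumes the spark session that the pyspark shell creates; the data and the 'people' view name are made up for illustration):

rdd = spark.sparkContext.parallelize([('leo', 30), ('ann', 25)])  # an RDD of tuples
df = spark.createDataFrame(rdd, ['name', 'age'])                  # RDD -> DataFrame
df.createOrReplaceTempView('people')                              # register as a virtual table
spark.sql('SELECT name FROM people WHERE age > 26').show()        # query it with SQL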


※ Installation

1. File Download

wget https://downloads.apache.org/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz


2. Set environment variables (e.g. in ~/.bashrc; SPARK_HOME below assumes the downloaded archive was extracted to /home/hdoop/spark-3.0.2)

export SPARK_HOME=/home/hdoop/spark-3.0.2
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export SPARK_CONF_DIR=$SPARK_HOME/conf
export SPARK_MASTER_HOST=localhost
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/bin:$SPARK_HOME/python:$PATH

3. Install findspark (locates the Spark installation so PySpark can be imported from a regular Python session)

pip3 install findspark
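
A quick sanity check, as a sketch (findspark.init() picks up SPARK_HOME; a path such as /home/hdoop/spark-3.0.2 can also be passed explicitly):

import findspark
findspark.init()              # or findspark.init('/home/hdoop/spark-3.0.2')
import pyspark                # importable now that Spark is on sys.path
print(pyspark.__version__)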

4. Run (Scala)

spark-shell

Run (Python)

pyspark

5. Test

df = spark.read.csv('biostats.csv')
df.show()


※ Examples

* Read a file from HDFS and save it locally

- DataFrame.show(): Print DataFrame

import findspark
findspark.init()

from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
df_emp = sparkSession.read.csv('hdfs://localhost:9000/user/hive/warehouse/mytestdb.db/employee/employee.csv')
# If the file has a header row, read it with sparkSession.read.option('header', True).csv(filename) instead.

print(type(df_emp))
df_emp.show()
print(dir(df_emp))

df_emp.write.csv('/home/hdoop/csv') # fails if the output directory already exists
print('CSV copy saved...')

This prints the type of df_emp, its contents, and its available attributes.

 

* Print only the desired column from a CSV file stored in HDFS

- DataFrame.select( 'Column Name' ) => Return DataFrame

import findspark
findspark.init()

from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
df_emp2 = sparkSession.read.option('header', True).csv('hdfs://localhost:9000/user/hive/warehouse/employee2.csv')
# sparkSession.read.csv('~~', header = True)

df_emp2 = df_emp2.select('FIRST_NAME')
df_emp2.show()

print('CSV FIRST_NAME shown...')

 

* From a file stored in HDFS, print the name and salary of employees whose salary exceeds 9000

- DataFrame.filter( condition ): similar to SQL's WHERE clause => Return DataFrame

import findspark
findspark.init()

from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
df_emp2 = sparkSession.read.option('header', True).csv('hdfs://localhost:9000/user/hive/warehouse/employee2.csv')
# sparkSession.read.csv('~~', header = True)

# salary is more than 9000
df_emp2 = df_emp2.select(['FIRST_NAME', 'SALARY']).filter(df_emp2['SALARY'] > 9000)
df_emp2.show()
print('Condition applied')

 

* Read a txt file from HDFS and print it

import findspark
findspark.init()

from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
print(rdd.collect())

- collect() gathers the distributed data onto the driver so it can be printed.

 

* Read a txt file from HDFS and print it as a two-dimensional array using map() and collect()

- RDD.map( function ): applies the function to each element => Return RDD (Transformation)

- RDD.collect(): gathers the elements into a Python list on the driver (Action)

import findspark
findspark.init()

from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()

rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
parts = rdd.map(lambda line: line.split(','))

print(parts.collect())

 

* Build a DataFrame from a txt file read from HDFS

- SparkSession.createDataFrame( 'Row List' ): Create DataFrame => Return DataFrame

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import Row

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')

parts = rdd.map(lambda line: line.split(',')) # rdd

row_list = parts.map(lambda e: Row(name=e[0], age=e[1])) # rdd

df = sparkSession.createDataFrame(row_list) # DataFrame
df.show()

 

* Build a DataFrame from the txt file read from HDFS, then print the age of 'leo'

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import Row

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')

parts = rdd.map(lambda line: line.split(','))
row_list = parts.map(lambda e: Row(name=e[0], age=e[1]))

df = sparkSession.createDataFrame(row_list)
df = df.filter(df['name'] == 'leo')
df.show()

 

* Register the created DataFrame as a temporary table

- DataFrame.createOrReplaceTempView( 'Table Name' ): Create Temp View Table

- SparkSession.sql( 'SQL' ): Use SQL => Return DataFrame

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import Row

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')

parts = rdd.map(lambda line: line.split(','))

row_list = parts.map(lambda e: Row(name=e[0], age=e[1]))

df = sparkSession.createDataFrame(row_list)
df.createOrReplaceTempView('emp')
sparkSession.sql('SELECT * FROM emp').show()

 

* Another way to print a desired column from a DataFrame (using map())

- Convert the DataFrame to an RDD first, then apply the map operation.

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import Row

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')

parts = rdd.map(lambda line: line.split(','))
row_list = parts.map(lambda e: Row(name=e[0], age=e[1]))
df = sparkSession.createDataFrame(row_list)

print(df.rdd.map(lambda r: 'Name: '+r.name).collect())

 

* Spark DataFrame <-> Pandas DataFrame

- Spark DataFrame => Pandas DataFrame

  : spark_df.toPandas()

- Pandas DataFrame => Spark DataFrame

  : sparkSession.createDataFrame( pandas_df )

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import Row

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')

parts = rdd.map(lambda line: line.split(','))
row_list = parts.map(lambda e: Row(name=e[0], age=e[1]))
df = sparkSession.createDataFrame(row_list)

# Spark DataFrame -> Pandas DataFrame
print('='*10)
df2 = df.toPandas()
print(df2)
print('='*10)
print()

# Pandas DataFrame -> Spark DataFrame
sd = sparkSession.createDataFrame(df2)
sd.show()

 

* Using aggregate functions

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
rdd = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/hive/warehouse/text.txt')
parts = rdd.map(lambda line: line.split(','))
row_list = parts.map(lambda e: Row(name=e[0], age=e[1]))
df = sparkSession.createDataFrame(row_list)

df.agg(max(col('age'))).show()
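
Since age was parsed from a text file it is a string column, so the max() above compares values lexicographically; a sketch of a numeric version that casts the column first:

from pyspark.sql import functions as F

df.agg(F.max(F.col('age').cast('int')).alias('max_age')).show()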

 

* From a CSV file, print the sum and average of SALES for products sold in 2005

import findspark
findspark.init()

from pyspark.sql import *

sparkSession = SparkSession.builder.appName('example-pyspark-hdfs').getOrCreate()
df = sparkSession.read.csv('hdfs://localhost:9000/user/hive/warehouse/sales_data_sample.csv', header=True)

# SQL: Year_ID == 2005 / SUM / AVG

# We need only 2005 Year_ID SALES
df2 = df.select(['Year_ID', 'SALES']).filter(df['Year_ID'] == 2005)

# Temp View 'SUM'
df2.createOrReplaceTempView('sales1')
sparkSession.sql('SELECT SUM(SALES) FROM sales1').show()

# Temp View 'AVG'
df2.createOrReplaceTempView('sales2')
sparkSession.sql('SELECT AVG(SALES) FROM sales2').show()
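
For comparison, a sketch of the same sum and average computed with the DataFrame API instead of temporary views (SALES is read as a string here, so it is cast to double explicitly):

from pyspark.sql import functions as F

df2.agg(F.sum(F.col('SALES').cast('double')).alias('sum_sales'),
        F.avg(F.col('SALES').cast('double')).alias('avg_sales')).show()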

 
