프로그래밍

[pyspark] pyspark 프로그래밍 예제2

RainIron 2021. 6. 11. 13:20
반응형

※ employees.csv를 spark DataFrame으로 읽은 후, 아래 질의를 수행  

- 급여(Salary)가 5000이상인 50번 부서 소속의 직원들을 대상으로 이름, 부서번호, 급여액을 표시

employee_sdf = spark.read.csv('C:\\labs\\employees.csv', header = True, inferSchema = True)
employee_sdf.printSchema()
employee_sdf.show()

# SQL을 사용한 결과
employee_sdf.createOrReplaceTempView('employees')
spark.sql('SELECT LAST_NAME, DEPARTMENT_ID, SALARY FROM employees WHERE SALARY >= 5000 AND DEPARTMENT_ID = 50').show()

# select()와 filter()를 사용한 결과
employee_sdf.filter((employee_sdf.SALARY>=5000) & (employee_sdf.DEPARTMENT_ID == 50)).select('LAST_NAME','DEPARTMENT_ID', 'SALARY').show()

 

- 급여액에 대한 라벨 추가하기

  0~5000: Low

  5001~10000: Middle

  10001~: High

@udf
def measure_salary(x):
    if x > 10001:
        return 'High'
    elif x > 5000:
        return 'Middle'
    else:
        return 'Low'

employee_sdf.withColumn('Score', measure_salary(employee_sdf.SALARY)).show()

 

- 정렬: 급여액이 높은 순서대로 정렬, 급여액이 동일할 경우 부서번호 낮은 순으로 정렬

employee_sdf.orderBy('SALARY', 'DEPARTMENT_ID', ascending = [False, True]).show()

- 중복제거: 부서의 개수는 몇 개인가?

# 중복제거
employee_sdf.select('DEPARTMENT_ID').drop_duplicates().show()
employee_sdf.select('DEPARTMENT_ID').drop_duplicates().count()

- 복원추출, 층화추출

  복원추출) 비복원추출로 전체의 30% 추출

employee_sdf.sample(withReplacement=False, fraction = 0.3).show()

  층화추출) 50번 부서 : 20번 부서 : 70번 부서 = 0.3 : 0.5 : 0.2

employee_sdf.sampleBy(col = 'DEPARTMENT_ID', fractions={50: 0.3, 20:0.5, 70: 0.2}).show()

- 빈도 확인

employee_sdf.freqItems(cols = ['DEPARTMENT_ID']).show()


1. pySpark 컬럼 추가

# withColumn(컬럼명, 데이터)
employee_sdf = employee_sdf.withColumn('Incentive', employee_sdf['Salary']*0.4)
employee_sdf.show()

2. pySpark 컬럼 삭제

# drop(컬럼명)
employee_sdf = employee_sdf.drop('Incentive')
employee_sdf.show()

3. pySpark 행 삭제

import pandas as pd
import numpy as np
tmp = pd.DataFrame({'a': [1, 2, np.nan], 'b': [np.nan, np.nan, np.nan]})
tmp = spark.createDataFrame(tmp)
tmp.show()
# 임의로 지정한 컬럼 값이 null인 경우 그 컬럼을 삭제하고 삭제된 컬럼이 제거된 DataFrame을 출력

# 행이 모두 null 값일 경우 행 삭제
tmp.na.drop('all').show()

# 행에서 하나라도 null값인 경우 행 삭제
tmp.na.drop('any').show()

4. UDF(User Defined Functions)

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

belem = spark.read.csv('station_belem.csv', header = True, inferSchema = True)
belem.show()

@udf(DoubleType())
def toCelsius(temp):
    return (temp-32)*5/9
    
belem = belem.withColumn('TempC', toCelsius(belem.metANN))
belem.show()

 

5. 통계 관련 함수 적용

* belem2는 belem을 저장 후, 다시 불러온 스파크 데이터프레임

belem2.select('JAN', 'FEB', 'MAR').agg({'JAN': 'avg', 'FEB': 'count', 'MAR': 'stddev_samp'}).show()

6. 기술통계량

belem2.select('JAN').describe().show()

# summary는 보고 싶은 지표 지정이 가능
belem2.select('JAN').summary().show()

belem2.select('JAN').summary('count', 'mean','max').show()

7. 정렬

belem2.orderBy('YEAR', ascending = False).show(5)

8. 중복제거

# DataFrame.drop_duplicates()

(위의 예시 문제 참고)

 

 

반응형