반응형
※ employees.csv를 spark DataFrame으로 읽은 후, 아래 질의를 수행
- 급여(Salary)가 5000이상인 50번 부서 소속의 직원들을 대상으로 이름, 부서번호, 급여액을 표시
employee_sdf = spark.read.csv('C:\\labs\\employees.csv', header = True, inferSchema = True)
employee_sdf.printSchema()
employee_sdf.show()
# SQL을 사용한 결과
employee_sdf.createOrReplaceTempView('employees')
spark.sql('SELECT LAST_NAME, DEPARTMENT_ID, SALARY FROM employees WHERE SALARY >= 5000 AND DEPARTMENT_ID = 50').show()
# select()와 filter()를 사용한 결과
employee_sdf.filter((employee_sdf.SALARY>=5000) & (employee_sdf.DEPARTMENT_ID == 50)).select('LAST_NAME','DEPARTMENT_ID', 'SALARY').show()
- 급여액에 대한 라벨 추가하기
0~5000: Low
5001~10000: Middle
10001~: High
@udf
def measure_salary(x):
if x > 10001:
return 'High'
elif x > 5000:
return 'Middle'
else:
return 'Low'
employee_sdf.withColumn('Score', measure_salary(employee_sdf.SALARY)).show()
- 정렬: 급여액이 높은 순서대로 정렬, 급여액이 동일할 경우 부서번호 낮은 순으로 정렬
employee_sdf.orderBy('SALARY', 'DEPARTMENT_ID', ascending = [False, True]).show()
- 중복제거: 부서의 개수는 몇 개인가?
# 중복제거
employee_sdf.select('DEPARTMENT_ID').drop_duplicates().show()
employee_sdf.select('DEPARTMENT_ID').drop_duplicates().count()
- 복원추출, 층화추출
복원추출) 비복원추출로 전체의 30% 추출
employee_sdf.sample(withReplacement=False, fraction = 0.3).show()
층화추출) 50번 부서 : 20번 부서 : 70번 부서 = 0.3 : 0.5 : 0.2
employee_sdf.sampleBy(col = 'DEPARTMENT_ID', fractions={50: 0.3, 20:0.5, 70: 0.2}).show()
- 빈도 확인
employee_sdf.freqItems(cols = ['DEPARTMENT_ID']).show()
1. pySpark 컬럼 추가
# withColumn(컬럼명, 데이터)
employee_sdf = employee_sdf.withColumn('Incentive', employee_sdf['Salary']*0.4)
employee_sdf.show()
2. pySpark 컬럼 삭제
# drop(컬럼명)
employee_sdf = employee_sdf.drop('Incentive')
employee_sdf.show()
3. pySpark 행 삭제
import pandas as pd
import numpy as np
tmp = pd.DataFrame({'a': [1, 2, np.nan], 'b': [np.nan, np.nan, np.nan]})
tmp = spark.createDataFrame(tmp)
tmp.show()
# 임의로 지정한 컬럼 값이 null인 경우 그 컬럼을 삭제하고 삭제된 컬럼이 제거된 DataFrame을 출력
# 행이 모두 null 값일 경우 행 삭제
tmp.na.drop('all').show()
# 행에서 하나라도 null값인 경우 행 삭제
tmp.na.drop('any').show()
4. UDF(User Defined Functions)
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
belem = spark.read.csv('station_belem.csv', header = True, inferSchema = True)
belem.show()
@udf(DoubleType())
def toCelsius(temp):
return (temp-32)*5/9
belem = belem.withColumn('TempC', toCelsius(belem.metANN))
belem.show()
5. 통계 관련 함수 적용
* belem2는 belem을 저장 후, 다시 불러온 스파크 데이터프레임
belem2.select('JAN', 'FEB', 'MAR').agg({'JAN': 'avg', 'FEB': 'count', 'MAR': 'stddev_samp'}).show()
6. 기술통계량
belem2.select('JAN').describe().show()
# summary는 보고 싶은 지표 지정이 가능
belem2.select('JAN').summary().show()
belem2.select('JAN').summary('count', 'mean','max').show()
7. 정렬
belem2.orderBy('YEAR', ascending = False).show(5)
8. 중복제거
# DataFrame.drop_duplicates()
(위의 예시 문제 참고)
반응형
'프로그래밍' 카테고리의 다른 글
[Web] 웹 사이트 개발(Tomcat, Servlet) (0) | 2021.06.16 |
---|---|
[데이터 수집] 빅데이터 수집 시스템 개발(BeautifulSoup) (0) | 2021.06.15 |
[pyspark] pyspark 프로그래밍 예제 (0) | 2021.06.10 |
[Window Spark] Window OS에 Spark 설치 (0) | 2021.06.10 |
[Spark] Ubuntu 20.04 Spark (0) | 2021.06.09 |