※ HDFS
1. Hadoop
- 빅데이터 분산, 저장, 처리 시스템
- Java 언어로 작성되었고, Hadoop 기반의 프로그래밍은 자바를 사용
- 반드시 Java를 사용할 필요는 없음
- MapReduce: Java(비범용적, 복잡) / Python(범용적, 쉽고 간단)
2. Python을 이용하여 HDFS 활용
- Hadoop이 지원하는 hadoop-streamxxxxx.jar 기능 활용
- Map, Reduce를 연결할 때, 표준 입력 스트림/표준 출력 스트림을 사용
- 리눅스 상에서 파이프('|') 사용
ex) cat test.txt > copy.txt: 출력 방향이 모니터가 아닌 파일
cat test.txt >> copy.txt: copy.txt의 내용에 이어서 작성
cat test.py | copy.py: 왼쪽의 표준 출력 스트림을 오른쪽의 표준 입력 스트림으로 전달
- Python 모듈 중, hdfs 설치해야 사용 가능
" $ pip3 install hdfs "
- 모듈 확인 명령어: $ pip3 freeze -> 각 모듈 버전 확인 가능
3. 활용예시
1) HDFS로 파일 저장
import pandas as pd
from hdfs import InsecureClient
client_hdfs = InsecureClient('http://localhost:9870')
df = pd.DataFrame({'a': [1, 2, 3],
'b': [4, 5, 6]})
with client_hdfs.write('/user/csv/sample2.csv', append = False, overwrite = True,
encoding = 'utf-8') as writer:
df.to_csv(writer)
print('saved to HDFS')
2) HDFS로부터 읽기
import pandas as pd
from hdfs import InsecureClient
client_hdfs = InsecureClient('http://localhost:9870')
with client_hdfs.read('/user/csv/sample2.csv') as reader:
df = pd.read_csv(reader, index_col = 0)
print(df)
3) txt의 경우
# txt(read)의 경우
with client_hdfs.read('/user/csv/sample2.txt') as reader:
contents = reader.read()
# txt(write)의 경우
with client_hdfs.write('/user/csv/sample2.txt') as writer:
writer.write('Text 내용을 작성하세요')
※ HDFS 명령어
* 기본 명령어에서 'hdfs dfs'를 추가하면 거의 같은 기능을 실행한다.
1. $ hdfs dfs -ls 위치: 위치 폴더에 있는 폴더 및 파일 리스트 출력
2. $ hdfs dfs -ls -R 위치: Recursive Print List
3. $ hdfs dfs -put 파일명 저장위치/파일명: HDFS에 로컬에 있는 파일을 저장
4. $ hdfs dfs -appendToFile 파일명 저장위치/파일명: 기존 파일에 로컬 파일의 내용을 이어서 작성
5. $ hdfs dfs -chgrp 그룹명 파일: 파일을 사용할 수 있는 권한을 가진 그룹 변경
6. $ hdfs dfs -copyFromLocal 로컬파일 파일명: (로컬 -> HDFS) 파일 저장
7. $ hdfs dfs -copyToLocal 리모트파일 파일명: (HDFS -> 로컬) 파일 저장
8. $ hdfs dfs -count 폴더명: 폴더 내 파일 갯수
9. $ hdfs dfs -cp 리모트파일 '리모트에 저장할 파일이름': 파일 복사
10. $ hdfs dfs -get 리모트파일 '로컬 파일로 저장할 이름': (HDFS -> 로컬) 파일 저장
11. $ hdfs dfs -moveFromLocal 로컬파일 리모트위치: (로컬 -> HDFS) 파일 이동
※ HDFS 운영 규칙
1. 데이터 파일과 데이터 디렉터리를 HDFS 영역에 준비
$ hdfs dfs -mkdir -p /user/hdoop/input : input 폴더 생성
(단, /user/hdoop/output <- 미리 생성하면 실행 중에 오류가 발생)
2. mapper.py, reducer.py는 로컬에 준비
3. 리눅스의 파이프와 같은 역할을 하는 프로그램을 준비(경로확인)
- 파이프 역할: hadoop-streaming-3.2.1.jar
- 경로 확인: $ $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar
4. Hadoop 명령으로 mapper.py, reduer.py 실행
(예시)
$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
-files mapper.py,reducer.py \
-input /user/hdoop/input/* \
-output /user/hdoop/output \
-mapper mapper.py \
-reducer reducer.py
* -files : 로컬에 존재하는 mapper, reducer 파일
* -input : HDFS에 존재하는 입력 데이터 파일( ex: /user/hdoop/input/sample.csv)
* -output: HDFS에 출력 데이터를 만들 위치(같은 폴더가 있을 경우 오류 발생)
* -mapper: -files 중 Mapping할 파일
* -reducer: -files 중 Reduce할 파일
5. 출력 디렉터리에서 결과 확인
[ 기타 ]
1) Hadoop 명령으로 실행할 때, -files 옵션에서 파일이 띄어쓰기로 구분되어 있을 경우 오류 발생!
' Found 1 unexpected arguments on the command line '
2) 다른 예외 명령이 나올 때, 보통 작성한 파일의 문제인 경우가 많다.
특히, 로컬에서는 정상적으로 작동하지만, 실제로 하둡으로 돌릴 경우 원하는 결과가 나오지 않을 수 있다.
왜냐하면 로컬과 달리 하둡에서는 파일을 분할하여 분산 프로그래밍으로 실행하기 때문이다.
(예) 각 년도별 이용객 수 평균
Month | "1958" | "1959" | "1960" |
JAN | 340 | 360 | 417 |
FEB | 318 | 342 | 391 |
MAR | 362 | 406 | 419 |
APR | 348 | 396 | 461 |
MAY | 363 | 420 | 472 |
JUN | 435 | 472 | 535 |
JUL | 491 | 548 | 622 |
AUG | 505 | 559 | 606 |
SEP | 404 | 463 | 580 |
OCT | 359 | 407 | 461 |
NOV | 310 | 362 | 390 |
DEC | 337 | 405 | 432 |
1. 로컬에서 mapper.py, reducer.py를 실행한 경우
2. hadoop 명령어를 사용하여 mapper.py, reducer.py를 실행한 경우
- $ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar -files mapper3.py,reducer3.py -input /user/hdoop/input/airtravel.csv -output /user/hdoop/output/airtravel -mapper mapper3.py -reducer reducer3.py
- $ hdfs dfs -cat /user/hdoop/output/airtravel/part-00000
=> 파트를 2개로 나누어 돌려보면서 디버깅해본 결과, 인덱스 중간의 한 행이 사라진 것을 알 수 있었고, 코드를 살펴보기로 했다. 살펴보면서 잘못된 것을 발견했고, 분산 프로그래밍의 특성을 간과했다는 생각이 들었다.
!/usr/bin/python3
import csv
import sys
colnames = sys.stdin.readline().strip() # 잘못된 부분
for line in sys.stdin.readlines():
ls = line.strip().split(', ')
if ls != ['']:
print('%s\t%s\t%s' %(ls[1].strip(), ls[2].strip(), ls[3].strip()))
=> 코드 중, sys.stdin.readline().strip()이 잘못되었다.
분산 프로그래밍은 다른 노드에서 데이터를 분할하여 코드를 실행하는데, 분할된 상태에서도 readline()이 실행이 되어, column 값이 있는 행 뿐만 아니라, 분할된 행의 첫번째 행이 사라지게 되는 것이다.
3) 입력한 파일을 통해 데이터가 분할되서 실행된다는 것을 감안하여 분산 프로그래밍을 해야 한다.
4) 분산 프로그래밍으로 동적 프로그래밍은 거의 불가능하다.
예) 테이블에서 Beds가 3인 경우, Sell의 합을 구하시오
Sell | List | Living | Rooms | Beds |
142 | 160 | 28 | 10 | 5 |
175 | 180 | 18 | 8 | 4 |
129 | 132 | 13 | 6 | 3 |
138 | 140 | 17 | 7 | 3 |
232 | 240 | 25 | 8 | 4 |
135 | 140 | 18 | 7 | 4 |
150 | 160 | 20 | 8 | 4 |
207 | 225 | 22 | 8 | 4 |
271 | 285 | 30 | 10 | 5 |
89 | 90 | 10 | 5 | 3 |
153 | 157 | 22 | 8 | 3 |
87 | 90 | 16 | 7 | 3 |
234 | 238 | 25 | 8 | 4 |
106 | 116 | 20 | 8 | 4 |
175 | 180 | 22 | 8 | 4 |
165 | 170 | 17 | 8 | 4 |
분산 프로그래밍으로 첫번째 행에서 Beds의 인덱스와, Sell의 인덱스를 구하려는 코드를 넣으려고 했으나, 데이터가 분할되어 실행되는 특성으로 구하지 못했다. 대신, 사전에 사용자가 테이블에서 Sell과 Beds가 위치하는 인덱스를 알고 있어야 한다는 생각을 했고, 분산 프로그램을 실행하기 전에, 추가 프로그램으로 인덱스를 구해야한다.
'프로그래밍' 카테고리의 다른 글
[R] 예측 오차를 통한 예측 모델 성능 평가 (0) | 2021.06.02 |
---|---|
[Hive] Hive 설치 및 환경 조성 (0) | 2021.06.02 |
[Linux] Linux 명령어 정리 & Hadoop 설치 및 환경 조성 (0) | 2021.06.02 |
[R] 분석모델 확장(파생변수, 앙상블 기법) (0) | 2021.05.31 |
[R] R을 이용한 군집분석 (0) | 2021.05.29 |