pySpark & GraphFrames

pySpark & GraphFrames

Install

pyspark - graphframe Docker Image

https://hub.docker.com/r/tomerlevi/jupyter-spark-graphframes

Docker Run

docker run -p 8888:8888 -p 4040:4040 -v ~:/home/jovyan/workspace --name jupyter tomerlevi/jupyter-spark-graphframes

Use

위 Docker Container가 실행된 후, Jupyter Lab 으로 이동

http://127.0.0.1:8888/lab

pyspark 를 사용하기 위해, Session 을 생성해야함.

#from pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("local") \ .appName("Spark") \ .config("spark.some.config.option" , "some-value") \ .config("spark.sql.repl.eagerEval.enabled", True) \ .getOrCreate()

나는 파일 로 존재하는 Json Data 를 가져와 Graphframe을 통해 graph를 생성 하려 함.

with open ("/home/jovyan/work/response.json", 'r') as f: fread = f.read() context = json.loads(fread) with open ("/home/jovyan/work/result.json", 'w') as f: f.write(json.dumps(...)) df = spark.read.json("/home/jovyan/work/result.json")

주의! : 파일형태의 Json을 read하는 과정에서 오류가 발생 할 수 있다. 그때는 Json을 한줄 형태로 바꿔보자.

GraphFrames에서 사용하는 노드, 연관 관계 는 Vertices와 Edges 로 구분된다.

Vertices와 Edges DataFrame의 기본 형태는 다음과 같다.

vv = spark.createDataFrame(vertices, verti_colums) ee = spark.createDataFrame(edges, edge_colums) # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-===-=-=-=-=-=-==-= # vertices = [(), (), ...] # edges = [(), (), ...] # verti_colums = [ column1, column2, ...] # edges = [ column1, column2, ..., relationship] # Ex) # vertices = [(1, "Alpha", 20), (2, "Beta", 25)] # edges = [(1,2,"friend")] # verti_columns = ["id", "name", "age"] # edge_colums = ["src", "dst", "relation"]

GraphFrame 만들기

위 과정에서 만든 DataFrame을 활용한다.

from graphframe import * g = GraphFrame(vv, ee)

만든 GraphFrame을 시각적으로 표현하기

matplotlib의 pyplot 을 사용한다.

edges_list에 들어갈 'src', 'dst' 는 이전에 정의한거에 따라 바뀔 수 있다.

import networkx as nx import matplotlib.pyplot as plt def PlotGraph(edge_list): Gplot=nx.Graph() tmp = [] for row in edge_list.select('src','dst').take(g.edges.count()): tmp.append((row['src'], row['dst'])) Gplot.add_edges_from(tmp) #plt.subplot(121) plt.figure(figsize=(12, 12)) nx.draw(Gplot, cmap = plt.get_cmap('jet'), node_size=20) plt.show()

PlotGraph(g.edges)

만든 GraphFrame에서 BFS 알고리즘 적용하여 관계 찾아내기

pp = g.bfs( fromExpr = <시작할 노드의 특정 값>, #ex : "id = 0" toExpr = <도착할 노드의 특정 값>, #ex : "id = 1" ) pp.show()

이 외, GraphFrames에서 사용할 수 있는 함수들

g.find() #ex : g.find("(parent)-[e]->(child)") g.filter() #ex : g.filter("e.relation=='child'") g.show() pp.select() pp.first()

Pandas를 사용하여 Json을 DataFrame으로 만들고 데이터 확인 및 재정렬 하기

import pandas as pd df.toPandas() # all check df.toPandas().head() # 상위 5개만 df = df.orderBy("<컬럼명>", ascending=False) # DESC 정렬

from http://kangprog.tistory.com/112 by ccl(A) rewrite - 2021-08-05 12:26:32