Spark에서 Window 함수의 다양한 이용

스파크(Spark)에서 데이터 프레임을 다루다 보면, 다양한 함수들이 요구됩니다. 기본 함수들은 직관적으로 새로운 값을 생성하는 것에 초점을 맞추고 있습니다. 그런데, 많은 경우에 데이터 비교를 위해서 현재 행을 이전 행과 비교하기를 원합니다. 혹은 특정 컬럼에 대해서 Partiton된 데이터를 구하고 싶기도 합니다. 이럴 때에 주로 사용하는 함수가 바로 Window 함수입니다.

스파크의 윈도우 함수는 별도로 import를 해줘야 합니다.


import org.apache.spark.sql.expressions.Window

아래와 같은 데이터 프레임을 예제로 삼아서 함수들을 살펴보겠습니다.

val df = Seq( (1,"chip",1), (2,"drink",2), (3,"chip",2), (4,"fish",1), (5,"drink",3), (6,"other",5), (7,"drink",1), (8,"fish",4), (9,"other",1), (10,"other",6), (11,"drink",5), (12,"fish",7) ).toDF("id", "product", "number") df.show(false) +---+-------+------+ |id |product|number| +---+-------+------+ |1 |chip |1 | |2 |drink |2 | |3 |chip |2 | |4 |fish |1 | |5 |drink |3 | |6 |other |5 | |7 |drink |1 | |8 |fish |4 | |9 |other |1 | |10 |other |6 | |11 |drink |5 | |12 |fish |7 | +---+-------+------+

sum, min, max, avg 함수

sum, min, max, avg는 각 파티션에서 합, 최소, 최대, 평균을 구할 때에 사용됩니다. 이 중에서 특히 sum 함수는 Window와 함께 사용하면 특정 행까지의 누적 합을 구할 수 있습니다. 보통 이런 누적 합은 구간을 계산해야 할 때에 주로 쓰입니다. 예를 들어, 어떤 음료가 5개까지는 2달러, 그 이후부터는 1달러라고 하면 어느 시점에 5개가 되는지를 파악해야 합니다. 이런 경우에 누적 합을 구하면 유용하게 사용할 수 있습니다.

val window1 = Window.partitionBy("product") val window2 = Window.partitionBy("product").orderBy("id") .rowsBetween(Window.unboundedPreceding, Window.currentRow) df.withColumn("min", min("number").over(window1)) .withColumn("max", max("number").over(window1)) .withColumn("avg", avg("number").over(window1)) .withColumn("cumulativeSum", sum("number").over(window2)) .withColumn("cost", when(col("cumulativeSum") > 5, 1).otherwise(2)) .show(false) +---+-------+------+---+---+----+-------------+----+ |id |product|number|min|max|avg |cumulativeSum|cost| +---+-------+------+---+---+----+-------------+----+ |1 |chip |1 |1 |2 |1.5 |1 |2 | |3 |chip |2 |1 |2 |1.5 |3 |2 | |6 |other |5 |1 |6 |4.0 |5 |2 | |9 |other |1 |1 |6 |4.0 |6 |1 | |10 |other |6 |1 |6 |4.0 |12 |1 | |2 |drink |2 |1 |5 |2.75|2 |2 | |5 |drink |3 |1 |5 |2.75|5 |2 | |7 |drink |1 |1 |5 |2.75|6 |1 | |11 |drink |5 |1 |5 |2.75|11 |1 | |4 |fish |1 |1 |7 |4.0 |1 |2 | |8 |fish |4 |1 |7 |4.0 |5 |2 | |12 |fish |7 |1 |7 |4.0 |12 |1 | +---+-------+------+---+---+----+-------------+----+

이런 식으로 각 파티션 (여기서는 product) 별로 id 순서대로 정렬하고 특정 행까지의 누적 합이 구해진 것을 볼 수 있습니다. 그러면 이 열을 기반으로 가격도 유동적으로 적용해볼 수 있습니다.

lag, lead 함수

데이터 프레임의 이전, 이후 행을 현재 행과 비교하고 싶을 때에는 lag, lead 함수를 이용할 수 있습니다. 이번에는 상품 전체에서 비교해 보도록 하겠습니다. lag, lead 함수를 이용해서 이전, 이후 행을 현재 행으로 끌고 옵니다.

val window = Window.orderBy("id") df.withColumn("lead3", lead("number", 3, 0).over(window)) .withColumn("lead2", lead("number", 2, 0).over(window)) .withColumn("lead1", lead("number", 1, 0).over(window)) .withColumn("center", col("number")) .withColumn("lag1", lag("number", 1, 0).over(window)) .withColumn("lag2", lag("number", 2, 0).over(window)) .withColumn("lag3", lag("number", 3, 0).over(window)) .show(false) +---+-------+------+-----+-----+-----+------+----+----+----+ |id |product|number|lead3|lead2|lead1|center|lag1|lag2|lag3| +---+-------+------+-----+-----+-----+------+----+----+----+ |1 |chip |1 |1 |2 |2 |1 |0 |0 |0 | |2 |drink |2 |3 |1 |2 |2 |1 |0 |0 | |3 |chip |2 |5 |3 |1 |2 |2 |1 |0 | |4 |fish |1 |1 |5 |3 |1 |2 |2 |1 | |5 |drink |3 |4 |1 |5 |3 |1 |2 |2 | |6 |other |5 |1 |4 |1 |5 |3 |1 |2 | |7 |drink |1 |6 |1 |4 |1 |5 |3 |1 | |8 |fish |4 |5 |6 |1 |4 |1 |5 |3 | |9 |other |1 |7 |5 |6 |1 |4 |1 |5 | |10 |other |6 |0 |7 |5 |6 |1 |4 |1 | |11 |drink |5 |0 |0 |7 |5 |6 |1 |4 | |12 |fish |7 |0 |0 |0 |7 |5 |6 |1 | +---+-------+------+-----+-----+-----+------+----+----+----+

대각선으로 보시면, 한 줄씩 값들이 이동하는 모습을 쉽게 확인해 볼 수 있습니다. 기본 값은 원하는 값으로 설정 가능한데, 저는 0으로 설정했습니다.

first 함수

각각의 파티션에서 처음 값만을 뽑고 싶을 때에 사용하면 됩니다. 예를 들면 한 번에 가장 많이 팔린 경우의 id를 상품별로 뽑아보도록 하겠습니다.

val window = Window.partitionBy("product").orderBy(col("number").desc) df.withColumn("max", first("number").over(window)) .withColumn("ratio", round(col("number")/col("max") * 100, 2)) .show(false) +---+-------+------+---+-----+ |id |product|number|max|ratio| +---+-------+------+---+-----+ |3 |chip |2 |2 |100.0| |1 |chip |1 |2 |50.0 | |10 |other |6 |6 |100.0| |6 |other |5 |6 |83.33| |9 |other |1 |6 |16.67| |11 |drink |5 |5 |100.0| |5 |drink |3 |5 |60.0 | |2 |drink |2 |5 |40.0 | |7 |drink |1 |5 |20.0 | |12 |fish |7 |7 |100.0| |8 |fish |4 |7 |57.14| |4 |fish |1 |7 |14.29| +---+-------+------+---+-----+

groupBy와는 다르게 값을 뽑아서 별도의 컬럼에 집어넣기 때문에 최대값, 최소값을 추려서 기존 컬럼과 비교해보는 것이 가능합니다. 최대값 대비 몇 퍼센트를 기록했는지 ratio 컬럼을 추가해봤습니다.

사실, 이런 목적이라면 max 함수를 이용하는 것이 더 빠를 겁니다. 저는 주로 어떤 상품에 라벨이 추가되었을 때, 해당 라벨을 기존 상품 전체에 라벨을 적용하기 위해서 사용하고 있습니다. 한마디로 null을 제거하기 위해서 사용한다는 뜻이네요.

row_number, rank, dense_rank 함수

데이터의 순위를 매기는 세 가지 함수가 있습니다. 각각이 가지는 특성은 orderBy 컬럼에 중복된 값이 있을 때, 명확하게 드러납니다. 위의 예제에서 product 컬럼을 이용하면 세 함수가 잘 구별됩니다.

val window = Window.orderBy("product") df.withColumn("row_number", row_number.over(window)) .withColumn("rank", rank.over(window)) .withColumn("dense_rank", dense_rank.over(window)) .show(false) +---+-------+------+----------+----+----------+ |id |product|number|row_number|rank|dense_rank| +---+-------+------+----------+----+----------+ |1 |chip |1 |1 |1 |1 | |3 |chip |2 |2 |1 |1 | |2 |drink |2 |3 |3 |2 | |5 |drink |3 |4 |3 |2 | |7 |drink |1 |5 |3 |2 | |11 |drink |5 |6 |3 |2 | |4 |fish |1 |7 |7 |3 | |8 |fish |4 |8 |7 |3 | |12 |fish |7 |9 |7 |3 | |6 |other |5 |10 |10 |4 | |9 |other |1 |11 |10 |4 | |10 |other |6 |12 |10 |4 | +---+-------+------+----------+----+----------+
  • row_number 함수는 그냥 순서대로 숫자를 매깁니다.
  • rank 함수는 같은 product 값을 가지는 경우에 같은 값을 매기면서 row_number에 맞춰서 값이 증가합니다.
  • dense_rank는 rank 함수와 같은 기능을 하지만, 중간의 빈 값을 없애서 순서대로 증가하는 값을 제공합니다.

그 밖에도 다양한 함수들이 있지만, 이런 기본 함수들만 알아도 데이터 처리가 쉬워집니다. 저는 UDF를 최소화하기 위해서 이런 기본 함수들을 최대한 활용하는 편이기 때문에 함수의 기본 동작을 파악하는 것이 매우 중요합니다. 이런 예제들이 다른 분들께도 도움이 되길 바랍니다.