Assuming I have the following DataFrame:
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 3|null| 11|
| 2| null| 2| xxx| 22|
| 1| null| 1| yyy|null|
| 2| null| 7|null| 33|
| 1| null| 12|null|null|
| 2| null| 19|null| 77|
| 1| null| 10| s13|null|
| 2| null| 11| a23|null|
+---+--------+---+----+----+
here is the same sample DF with comments, sorted by grp
and ord
:
scala> df.orderBy("grp", "ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 1| yyy|null|
| 1| null| 3|null| 11| # grp:1 - last value for `col2` (11)
| 1| null| 10| s13|null| # grp:1 - last value for `col1` (s13)
| 1| null| 12|null|null| # grp:1 - last values for `null_col`, `ord`
| 2| null| 2| xxx| 22|
| 2| null| 7|null| 33|
| 2| null| 11| a23|null| # grp:2 - last value for `col1` (a23)
| 2| null| 19|null| 77| # grp:2 - last values for `null_col`, `ord`, `col2`
+---+--------+---+----+----+
I would like to compress it. I.e. to group it by column "grp"
and for each group, sort rows by the "ord"
column and take the last not null
value in each column (if there is one).
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 12| s13| 11|
| 2| null| 19| a23| 77|
+---+--------+---+----+----+
I've seen the following similar questions:
- How to select the first row of each group?
- How to find first non-null values in groups? (secondary sorting using dataset api)
but my real DataFrame has over 250 columns, so I need a solution where I don't have to specify all the columns explicitly.
I can't wrap my head around it...
MCVE: how to create a sample DataFrame:
- create local file "/tmp/data.txt" and copy and paste there a context of the DataFrame (as it's posted above)
- define function
readSparkOutput()
: parse "/tmp/data.txt" to DataFrame:
val df = readSparkOutput("file:///tmp/data.txt")
UPDATE: I think it should be similar to the following SQL:
SELECT
grp, ord, null_col, col1, col2
FROM (
SELECT
grp,
ord,
FIRST(null_col) OVER (PARTITION BY grp ORDER BY ord DESC) as null_col,
FIRST(col1) OVER (PARTITION BY grp ORDER BY ord DESC) as col1,
FIRST(col2) OVER (PARTITION BY grp ORDER BY ord DESC) as col2,
ROW_NUMBER() OVER (PARTITION BY grp ORDER BY ord DESC) as rn
FROM table_name) as v
WHERE v.rn = 1;
how can we dynamically generate such a Spark query?
I tried the following simplified approach:
import org.apache.spark.sql.expressions.Window
val win = Window
.partitionBy("grp")
.orderBy($"ord".desc)
val cols = df.columns.map(c => first(c, ignoreNulls=true).over(win).as(c))
which produces:
scala> cols
res23: Array[org.apache.spark.sql.Column] = Array(first(grp, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `grp`, first(null_col, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `null_col`, first(ord, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `ord`, first(col1, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col1`, first(col2, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col2`)
but i couldn't pass it to df.select
:
scala> df.select(cols.head, cols.tail: _*).show
<console>:34: error: no `: _*' annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
df.select(cols.head, cols.tail: _*).show
another attempt:
scala> df.select(cols.map(col): _*).show
<console>:34: error: type mismatch;
found : String => org.apache.spark.sql.Column
required: org.apache.spark.sql.Column => ?
df.select(cols.map(col): _*).show
So here we are grouping by a and selecting the max of all other columns in the group:
I'd go with same approach like @LeoC, but I believe that there is no need to manipulate column names as string and I would go with a more spark-sql like answer.
I hope this helps.
EDIT: The reason you are not getting the same results is because the reader that you are using isn't correct.
It interprets
null
from the file as a string and not anull
; i.e :Here is my version of
readSparkOutput
:Consider the following approach that applies Window function
last(c, ignoreNulls=true)
ordered by "ord" per "grp" to each of the selected columns; followed by agroupBy("grp")
to fetch thefirst
agg(colFcnMap) result:Note that the final
select
is for stripping the function name (in this casefirst()
) from the aggregated column names.I have worked something out, here is the code and output
Ordering the data on first 2 columns
You can drop the columns you don't need by using .drop("column_name")
Here is your answer (and hopefully my bounty!!!)
//Create window specification
//Use foldLeft with first over window specification over all columns and take distinct
Hope this helps.