Same as addTimePanels with the specified value columns filled with the previous period value.
Example Input:
k, ts, v
1,20120201,1.5
1,20120701,7.5
1,20120501,2.45
f.smvGroupBy("k").addTimePanelsWithValueFill("ts")(TimePanel(Month(2012, 1), Month(2012, 6)))("v")
Output:
k, ts, v, smvTime
1,null,1.5,M201201
1,2012-02-01 00:00:00.0,1.5,M201202
1,null,1.5,M201203
1,null,1.5,M201204
1,2012-05-01 00:00:00.0,2.45,M201205
1,null,2.45,M201206
Create an Edd on SmvGroupedData. See org.tresamigos.smv.edd.Edd for details.
Example:
scala> df.smvGroupBy("k").edd.summary().eddShow
Print EDD histogram of a group of cols (joint distribution)
Save Edd histogram of a group of cols (joint distribution)
Same as smvCube(String*) but using Column to define the input columns
Implement the cube operations on a given DF and a set of columns. See http://joshualande.com/cube-rollup-pig-data-science/ for the pig implementation. Rather than using nulls as in the pig version, a sentinel value of "*" will be used.
Example:
df.smvGroupBy("year").smvCube("zip", "month").agg("year", "zip", "month", sum("v") as "v")
For zip & month columns with input values:
90001, 201401
10001, 201501
The "cubed" values on those 2 columns are:
90001, *
10001, *
*, 201401
*, 201501
90001, 201401
10001, 201501
*, *

where * stands for "any"
There is also a version on DataFrame.
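The sentinel logic above can be modeled in plain Scala (no Spark needed); this is an illustrative sketch of how the cubed combinations arise, not the library's implementation:

```scala
// For each input row, emit every combination where any subset of the
// cubed columns is replaced by the sentinel "*".
def cubeRow(row: Seq[String]): Seq[Seq[String]] =
  row.foldRight(Seq(Seq.empty[String])) { (v, acc) =>
    acc.flatMap(rest => Seq(v +: rest, "*" +: rest))
  }

// cubeRow(Seq("90001", "201401")) yields the 4 combinations
// ("90001","201401"), ("*","201401"), ("90001","*"), ("*","*")
```

An n-column row expands to 2^n combinations, matching the 7 distinct cubed rows shown above for the 2-row input.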
Compute the decile for a given column value within a DataFrame group.

Equivalent to smvQuantile with numBins set to 10.
Fill in Null values with "previous" value according to an ordering
Example: Input:
K, T, V
a, 1, null
a, 2, a
a, 3, b
a, 4, null
df.smvGroupBy("K").smvFillNullWithPrevValue($"T".asc)("V")
Output:
K, T, V
a, 1, null
a, 2, a
a, 3, b
a, 4, b
This method only fills forward, which means that at T=1, V is still null in the example above. If all nulls need to be filled, allowing a backward fill at the beginning of the sequence, apply this method again with the reverse ordering:
df.smvGroupBy("K").smvFillNullWithPrevValue($"T".asc)("V").
  smvGroupBy("K").smvFillNullWithPrevValue($"T".desc)("V")
Output:
K, T, V
a, 1, a
a, 2, a
a, 3, b
a, 4, b
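The forward-fill behavior can be modeled in plain Scala, with None standing in for a null cell; a sketch of the idea, not the actual Spark implementation:

```scala
// Carry the last non-null value forward through an ordered sequence.
def fillForward[A](values: Seq[Option[A]]): Seq[Option[A]] =
  values.scanLeft(Option.empty[A]) { (prev, cur) =>
    cur.orElse(prev)
  }.tail

// fillForward(Seq(None, Some("a"), Some("b"), None))
//   == Seq(None, Some("a"), Some("b"), Some("b"))
```

Note the leading None stays None, mirroring the forward-only behavior described above; applying the same function to the reversed sequence gives the backward fill.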
Print EDD histogram (each col's histogram prints separately)
Save Edd histogram
smvMapGroup: apply SmvGDO (GroupedData Operator) to SmvGroupedData
Example:
val res1 = df.smvGroupBy('k).smvMapGroup(gdo1).agg(sum('v) as 'sumv, sum('v2) as 'sumv2)
val res2 = df.smvGroupBy('k).smvMapGroup(gdo2).toDF
Compute the percent rank of a sequence of columns within a group in a given DataFrame.
Uses Spark's percent_rank window function. The percent rank is defined as
R/(N-1), where R is the 0-based rank and N is the population size. Under
this definition, the minimum value (R=0) has percent rank 0.0, and the maximum
value has percent rank 1.0.
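The R/(N-1) formula can be sketched in plain Scala for a sorted population of distinct values (Spark's percent_rank additionally gives tied values the same rank, which this sketch ignores):

```scala
// Percent rank of each element of a sorted population of distinct
// values: R / (N - 1), where R is the 0-based rank and N the
// population size (assumed >= 2 here).
def percentRanks(sorted: Seq[Double]): Seq[Double] = {
  val n = sorted.size
  sorted.zipWithIndex.map { case (_, i) => i.toDouble / (n - 1) }
}

// percentRanks(Seq(10.0, 20.0, 30.0)) == Seq(0.0, 0.5, 1.0)
```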
Example:
df.smvGroupBy('g, 'g2).smvPercentRank(Seq("v1", "v2", "v3"))
smvPercentRank takes another parameter, ignoreNull. If it is set to true, the percent ranks of null values will be null; otherwise, since Spark's sort considers null smaller than any value, the percent ranks of nulls will be zero. The default value of ignoreNull is true.
For each column for which the percent rank is computed (e.g. "v"), an additional column, "v_pctrnk", is added to the output.
All other columns in the input are untouched and propagated to the output.
smvPivot on SmvGroupedData is similar to smvPivot on DF with the keys being provided in the smvGroupBy method instead of to the method directly.
See org.tresamigos.smv.SmvDFHelper#smvPivot for details
For example:
df.smvGroupBy("id").smvPivot(Seq("month", "product"))("count")(
  "5_14_A", "5_14_B", "6_14_A", "6_14_B")
and the following input:
Input
| id  | month | product | count |
| --- | ----- | ------- | ----- |
| 1   | 5/14  | A       | 100   |
| 1   | 6/14  | B       | 200   |
| 1   | 5/14  | B       | 300   |
will produce the following output:
Output:
| id  | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
| --- | ------------ | ------------ | ------------ | ------------ |
| 1   | 100          | NULL         | NULL         | NULL         |
| 1   | NULL         | NULL         | NULL         | 200          |
| 1   | NULL         | 300          | NULL         | NULL         |
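The output column naming visible above, where the value column name is prefixed to the concrete pivot values, can be sketched as follows (the helper name is hypothetical, not part of the library API):

```scala
// Build a pivoted output column name from the value column and the
// concrete pivot values, e.g. ("count", Seq("5_14", "A")) -> "count_5_14_A".
def pivotColName(valueCol: String, pivotVals: Seq[String]): String =
  (valueCol +: pivotVals).mkString("_")
```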
The sequence of column names whose values will be used as the output pivot column names.
The columns whose value will be copied to the pivoted output columns.
The expected base output column names (without the value column prefix). The user is required to supply the list of expected pivot column output names to avoid an extra action on the input DataFrame just to extract the possible pivot columns. If an empty sequence is provided, then the base output columns will be extracted from the values in the pivot columns (this will cause an action on the entire DataFrame!).
Same as smvPivotSum except that, instead of summing, we coalesce the pivot or grouped columns.
Perform a normal SmvPivot operation followed by a sum on all the output pivot columns.
For example:
df.smvGroupBy("id").smvPivotSum(Seq("month", "product"))("count")("5_14_A", "5_14_B", "6_14_A", "6_14_B")
and the following input:
Input
| id  | month | product | count |
| --- | ----- | ------- | ----- |
| 1   | 5/14  | A       | 100   |
| 1   | 6/14  | B       | 200   |
| 1   | 5/14  | B       | 300   |
will produce the following output:
| id  | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
| --- | ------------ | ------------ | ------------ | ------------ |
| 1   | 100          | 300          | NULL         | 200          |
The sequence of column names whose values will be used as the output pivot column names.
The columns whose value will be copied to the pivoted output columns.
The expected base output column names (without the value column prefix). The user is required to supply the list of expected pivot column output names to avoid an extra action on the input DataFrame just to extract the possible pivot columns. If an empty sequence is provided, then the base output columns will be extracted from the values in the pivot columns (this will cause an action on the entire DataFrame!).
Compute the quantile bin number within a group in a given DataFrame.
Estimating quantiles and quantile groups for data with an unknown distribution is somewhat arbitrary; there are multiple 'definitions' used by different software packages. Please refer to https://en.wikipedia.org/wiki/Quantile#Estimating_quantiles_from_a_sample for details.
smvQuantile is calculated from Spark's percent_rank. The algorithm is equivalent to the
one labeled R-7, Excel, SciPy-(1,1), Maple-6 on the Wikipedia page above. Please note it
is slightly different from SAS's default algorithm (labeled SAS-5).
Returned quantile bin numbers are 1-based. For example, when bin_num=10, returned values are
integers from 1 to 10, inclusive.
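The 1-based binning can be sketched as a mapping from a percent rank to a bin number; this illustrates the 1-to-numBins range only, not necessarily the exact R-7 computation:

```scala
// Map a percent rank in [0.0, 1.0] to a 1-based bin number in 1..numBins.
// The min-cap keeps percentRank == 1.0 in the top bin rather than
// spilling into numBins + 1.
def quantileBin(percentRank: Double, numBins: Int): Int =
  math.min(math.floor(percentRank * numBins).toInt + 1, numBins)

// quantileBin(0.0, 10)  == 1
// quantileBin(0.55, 10) == 6
// quantileBin(1.0, 10)  == 10
```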
Example:
df.smvGroupBy('g, 'g2).smvQuantile(Seq("v"), 100)
For each column for which the quantile is computed (e.g. "v"), an additional column is added to the output, "v_quantile".
All other columns in the input are untouched and propagated to the output.
smvQuantile takes another parameter, ignoreNull. If it is set to true, the percent ranks of null values will be null; otherwise, since Spark's sort considers null smaller than any value, the percent ranks of nulls will be zero. The default value of ignoreNull is true.
Repartition SmvGroupedData using specified partitioner on the keys. A HashPartitioner with the specified number of partitions will be used.
This method is used in cases where the key-space is very large. In the current Spark DataFrame groupBy method, the entire key-space is actually loaded into executor memory, which is very dangerous when the key-space is big. The regular DataFrame repartition function doesn't solve this issue, since a random repartition is not guaranteed to reduce the key-space on each executor. In that case we need to use this function to linearly reduce the key-space.
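The key-space reduction works because hash partitioning is deterministic per key: every record with the same key lands in the same partition, so each executor only ever sees about 1/numPartitions of the keys. A plain-Scala sketch of the assignment (mirroring HashPartitioner's non-negative modulo behavior, not the library code itself):

```scala
// Assign a key to one of numPartitions partitions. The same key always
// maps to the same partition; the branch keeps the result non-negative
// even when hashCode is negative.
def partitionOf(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod
}
```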
Example:
df.smvGroupBy("k1", "k2").smvRePartition(32).aggWithKeys(sum($"v") as "v")
Same as smvRollup(String*) but using Column to define the input columns
Implement the rollup operations on a given DF and a set of columns. See http://joshualande.com/cube-rollup-pig-data-science/ for the pig implementation.
Example:
df.smvGroupBy("year").smvRollup("county", "zip").agg("year", "county", "zip", sum("v") as "v")
For county & zip with input values:
10234, 92101
10234, 10019
The "rolled-up" values are:

*, *
10234, *
10234, 92101
10234, 10019
There is also a version on DF.
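Unlike cube, rollup only produces the hierarchical prefixes of the column list, which can be sketched in plain Scala (an illustrative model, not the library's implementation):

```scala
// For each input row, emit every prefix of the rolled-up columns,
// padding the remaining positions with the sentinel "*".
def rollupRow(row: Seq[String]): Seq[Seq[String]] =
  (0 to row.length).map { k =>
    row.take(k) ++ Seq.fill(row.length - k)("*")
  }

// rollupRow(Seq("10234", "92101")) yields
//   ("*","*"), ("10234","*"), ("10234","92101")
```

An n-column row expands to n+1 prefixes, versus 2^n combinations for cube.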
Scale a group of columns to given ranges
Example:
df.smvGroupBy("k").smvScale($"v1" -> ((0.0, 100.0)), $"v2" -> ((100.0, 200.0)))()
Note that the range tuple needs to be wrapped inside another pair of parentheses for the compiler to construct the nested tuple.
In this example, for the "v1" column within each k-group, the lowest value is scaled to 0.0 and the highest value is scaled to 100.0. The scaled column is named "v1_scaled".
Two optional parameters can be provided by the user:
withZeroPivot: Boolean = false
doDropRange: Boolean = true
When "withZeroPivot" is set, the scaling ensures that the zero point pivot is maintained. For example, if the input range is [-5,15] and the desired output range is [-100,100], then instead of mapping -5 -> -100 and 15 -> 100, we would maintain the zero pivot by mapping [-15,15] to [-100,100] so a zero input will map to a zero output. Basically we extend the input range to the abs max of the low/high values.
When "doDropRange" is set, the upper and lower bounds of the unscaled value will be dropped from the output. Otherwise, the lower and upper bounds of the unscaled value will be named "v1_min" and "v1_max" as in the example. Please note that if "withZeroPivot" is also set, the lower and upper bounds will be the abs max.
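The per-value scaling described above can be sketched in plain Scala; the helper is illustrative, not the library's code:

```scala
// Linearly map v from [inLow, inHigh] to [outLow, outHigh].
// With zeroPivot, first extend the input range to [-m, m] where
// m = max(|inLow|, |inHigh|), so that (for a symmetric output range)
// a zero input maps to a zero output.
def scaleValue(v: Double, inLow: Double, inHigh: Double,
               outLow: Double, outHigh: Double,
               zeroPivot: Boolean = false): Double = {
  val (lo, hi) =
    if (zeroPivot) {
      val m = math.max(math.abs(inLow), math.abs(inHigh))
      (-m, m)
    } else (inLow, inHigh)
  (v - lo) / (hi - lo) * (outHigh - outLow) + outLow
}

// With input range [-5, 15] and output range [-100, 100]:
//   scaleValue(0.0, -5, 15, -100, 100, zeroPivot = true) == 0.0
//   scaleValue(15.0, -5, 15, -100, 100, zeroPivot = true) == 100.0
```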
Apply aggregation on given keys and the specified time panel period.

Example:
val res = df.smvGroupBy("sku").smvTimePanelAgg("time", Day(2014, 1, 1), Day(2017, 3, 31))(
  sum("amt").as("amt"),
  sum("qty").as("qty")
)
The input df of above example has a timestamp field "time", and the output aggregates on
"sku" and the "Day" of the timestamp, from the start of 2014-1-1 to 2017-3-31.
The output will have 4 columns in above example: "sku", "smvTime", "amt", "qty". The values of "smvTime" column will look like:
D20140101
D20140102
...
For PartialTimes, please refer to the smv.panel package for details
For each group, return the top N records according to an ordering
Example:
df.smvGroupBy("id").smvTopNRecs(3, $"amt".desc)
Will keep the 3 largest amt records for each id
Add smvTime column according to some TimePanels

Example:
val dfWithTP = df.smvGroupBy("k").smvWithTimePanel(timeColName, Month(2013,1), Month(2014, 2))
If there is no smvTime column in the input DF, the added column will
be named smvTime; otherwise an underscore, "_", will be prepended to the name to form
the new column name.
The values of the smvTime column are strings, e.g. "M201205", "Q201301", "D20140527".

The ColumnHelper functions smvTimeToType, smvTimeToIndex, and smvTimeToLabel can be used to
create other columns from smvTime.
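The structure of the smvTime strings (a period-type prefix letter followed by the period) can be illustrated in plain Scala. This is a hypothetical sketch, not the ColumnHelper implementation (which returns Spark Columns), and the returned labels are assumptions for illustration:

```scala
// Map the smvTime prefix letter to a period-type name.
// "day"/"month"/"quarter" are illustrative labels.
def smvTimePrefixType(smvTime: String): String = smvTime.head match {
  case 'D' => "day"
  case 'M' => "month"
  case 'Q' => "quarter"
  case c   => sys.error(s"unknown smvTime prefix: $c")
}
```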
Since TimePanel defines a period of time, if some Months (or Quarters) are missing
for a group in the data, this function will add records with non-null keys and
smvTime columns, with all other columns null-valued.
Example Input
k, time, v
1, 20140101, 1.2
1, 20140301, 4.5
1, 20140325, 10.3
Code
df.smvGroupBy("k").smvWithTimePanel("time", Month(2014,1), Month(2014, 2))
Output
k, time, v, smvTime
1, 20140101, 1.2, M201401
1, null, null, M201402
1, 20140301, 4.5, M201403
1, 20140325, 10.3, M201403
For a DF with a TimePanel column already, fill the null values with the previous period value
Example: Input:
K, T, V
a, 1, null
a, 2, a
a, 3, b
a, 4, null
df.smvGroupBy("K").timePanelValueFill("T")("V")
Output:
K, T, V
a, 1, a
a, 2, a
a, 3, b
a, 4, b
- default "true"
- list of column names which need to be null-filled
By default, the leading nulls of each group in the time sequence are filled with the
earliest non-null value. In the example, V at T=1 was filled as "a", which is the T=2 value.
One can change that behavior by passing in backwardFill = false, which will leave V = null
at T=1.
SMV operations that can be applied to grouped data. For example:

df.smvGroupBy("id").smvTopNRecs(3, $"amt".desc)
We cannot use the standard Spark GroupedData because the internal DataFrame and keys are not exposed.