Same as dedupByKey(String*) but uses Column to specify the key columns
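For illustration, a call using Column arguments (equivalent to the String example further below; column names are illustrative):

```scala
// Same result as df.dedupByKey("id", "product"), but with Column arguments
val res = df.dedupByKey($"id", $"product")
```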
Remove duplicate records from the DataFrame by arbitrarily selecting the first record from a set of records with the same primary key or key combination.
For example, given the following input DataFrame:
| id | product | Company |
| --- | ------- | ------- |
| 1 | A | C1 |
| 1 | C | C2 |
| 2 | B | C3 |
| 2 | B | C4 |
and the following call:
df.dedupByKey("id")
will yield the following DataFrame:
| id | product | Company |
| --- | ------- | ------- |
| 1 | A | C1 |
| 2 | B | C3 |
while the following call:
df.dedupByKey("id", "product")
will yield the following:
| id | product | Company |
| --- | ------- | ------- |
| 1 | A | C1 |
| 1 | C | C2 |
| 2 | B | C3 |
Same as dedupByKeyWithOrder(Column*)(Column*) but uses String to specify the key columns.
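By analogy with the Column-based example below, a String-keyed call might look like this (a sketch; the exact signature is assumed from the one-line summary above):

```scala
// Keep, for each id, the record with the largest product value
val res = df.dedupByKeyWithOrder("id")($"product".desc)
```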
Remove duplicated records by selecting the first record with respect to a given ordering.
For example, given the following input DataFrame:
| id | product | Company |
| --- | ------- | ------- |
| 1 | A | C1 |
| 1 | C | C2 |
| 2 | B | C3 |
| 2 | B | C4 |
and the following call:
df.dedupByKeyWithOrder($"id")($"product".desc)
will yield the following DataFrame:
| id | product | Company |
| --- | ------- | ------- |
| 1 | C | C2 |
| 2 | B | C3 |
As with the dedupByKey method, the implementation of this method uses RDD groupBy to make sure it can handle a large key space.
Create an Edd on DataFrame. See org.tresamigos.smv.edd.Edd for details.
Example:
scala> df.edd.summary().eddShow
Display the 1st row in transposed view.
show the columns whose names match the regex, default ".*"
Use the default peek with or without parentheses.
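For example (assuming, per the parameter docs above, a row position and a column-name regex as optional arguments):

```scala
df.peek()           // first row, all columns
df.peek(2)          // second row, all columns
df.peek(1, "id.*")  // first row, only columns whose names match "id.*"
```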
Display a DataFrame row in transposed view.
the n-th row to display, default 1
show the columns whose names match the regex, default ".*"
Write peek result to a file.
local file name to write to
the n-th row to display, default 1
show the columns whose names match the regex, default ".*"
Apply a postfix to all column names in the given DataFrame.
For example:
df.posfixFieldNames("_x")
The above will add "_x" to the end of every column name in the DataFrame.
Print column names with descriptions, e.g.
scala> val res = df.smvDesc("a" -> "column a is ....")
scala> res.printDesc
Persist the DataFrame as a CSV file (along with a schema file).
df.saveAsCsvWithSchema("/tmp/output/test.csv")
direct path where the file is persisted. Can also be a relative path. The configured app data/output dirs are not considered.
CSV attributes used to format output file. Defaults to CsvAttributes.defaultCsv
Provide the companion schema (usually used when we need to persist some schema meta data along with the standard schema)
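Combining the parameters above, a call that overrides the CSV attributes might look like this (the specific CsvAttributes value is an assumption for illustration):

```scala
// Persist with non-default CSV attributes instead of CsvAttributes.defaultCsv
df.saveAsCsvWithSchema("/tmp/output/test.csv", CsvAttributes.defaultTsv)
```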
DataFrame projection based on labels
Example:
val res = df.selectByLabel("yellow")
Same as selectPlus but the new columns are prepended to the result.
df.selectPlusPrefix($"price" * $"count" as "amt")
amt will be the first column in the output.
Add, or replace, columns to the data frame.
Each column expression in the argument list is added to the data frame. If the column is an alias (NamedExpression), any existing column by the same name as the alias will be replaced by the new column data.
Example 1:
df.selectWithReplace($"age" + 1 as "age")
will create a new data frame with the same schema and with all values in the "age" column incremented by 1
Example 2:
df.selectWithReplace($"age" + 1)
will create a new data frame with an additional column (named automatically by spark sql) containing the incremented values in the "age" column, unless there is already another column that happens to have the same spark-generated name (in which case that column will be replaced with the new expression)
Print Edd histogram with bins
Save Edd histogram with bins
Print EDD histogram of a group of cols (joint distribution)
Save Edd histogram of a group of cols (joint distribution)
Print Edd histogram on count of records for a group of given keys
Example Input
id, v
1, 1.0
1, 1.5
2, 0.3
df.smvCountHist(Seq("id"))
Output
N_id
1    1    33.3%
2    2    66.6%
Save Edd histogram on count of records for a group of given keys
Similar to the cube Spark DF method, but using "*" instead of null to represent "Any"
Example:
df.smvCube("zip", "month").agg("zip", "month", sum("v") as "v")
Adds column descriptions
Example:
val res = df.smvDesc(
  "name" -> "This is customer's name",
  "sex" -> "This is customer's self-identified sex"
)
Adds column descriptions from a companion 2-column descriptionDF, which has variable names in column 1 and the corresponding variable descriptions in column 2.
Example:
val res = df.smvDescFromDF(descriptionDF)
Find column combinations which uniquely identify a row in the data.
number of rows the PK discovery algorithm will run on
if true, print out debug info
(list_of_keys, unique-count) Please note the algorithm only looks for one set of keys which uniquely identifies the rows; there could be other key combinations which could also serve as the primary key.
Add a DoubleBinHistogram column to a DataFrame using a single key. Performs a DoubleBinHistogram on the column_to_bin using 1000 bins. The column_to_bin is expected to be of type double.
df.smvDoubleBinHistogram(key1, col)
Creates a new column named the same as the passed column to bin, postfixed with "_bin".
Add a DoubleBinHistogram column to a DataFrame using a single key. Performs a DoubleBinHistogram on the column_to_bin using 1000 bins. The column_to_bin is expected to be of type double.
df.smvDoubleBinHistogram(key1, col, "_xyz")
Creates a new column named the same as the passed column to bin, postfixed with post_fix.
Add a DoubleBinHistogram column to a DataFrame using a single key. Performs a DoubleBinHistogram on the column_to_bin using the passed number of bins num_of_bins. The column_to_bin is expected to be of type double.
df.smvDoubleBinHistogram(key, col, 100)
Creates a new column named the same as the passed column to bin, postfixed with "_bin".
Add a DoubleBinHistogram column to a DataFrame using a single key. Performs a DoubleBinHistogram on the column_to_bin using the passed number of bins num_of_bins. The column_to_bin is expected to be of type double.
df.smvDoubleBinHistogram(key, col, 100, "_xyz")
Creates a new column named the same as the passed column to bin, postfixed with post_fix.
Add a DoubleBinHistogram column to a DataFrame using multiple keys. Performs a DoubleBinHistogram on the column_to_bin using 1000 bins. The column_to_bin is expected to be of type double.
df.smvDoubleBinHistogram(Seq("key1", "key2"), col)
Creates a new column named the same as the passed column to bin, postfixed with "_bin".
Add a DoubleBinHistogram column to a DataFrame using multiple keys. Performs a DoubleBinHistogram on the column_to_bin using 1000 bins. The column_to_bin is expected to be of type double.
df.smvDoubleBinHistogram(Seq("key1", "key2"), col, "_xyz")
Creates a new column named the same as the passed column to bin, postfixed with post_fix.
Add a DoubleBinHistogram column to a DataFrame using multiple keys. Performs a DoubleBinHistogram on the column_to_bin using the passed number of bins num_of_bins. The column_to_bin is expected to be of type double.
df.smvDoubleBinHistogram(Seq("key1", "key2"), col, 100)
Creates a new column named the same as the passed column to bin, postfixed with "_bin".
Add a DoubleBinHistogram column to a DataFrame using multiple keys. Performs a DoubleBinHistogram on the column_to_bin using the passed number of bins num_of_bins. The column_to_bin is expected to be of type double.
df.smvDoubleBinHistogram(Seq("key1", "key2"), col, 100, "_xyz")
Creates a new column named the same as the passed column to bin, postfixed with post_fix.
Add a set of DoubleBinHistogram columns to a DataFrame. Performs a DoubleBinHistogram on all the columns_to_bin. num_of_bins holds the corresponding number of bins for each column in columns_to_bin. The default number of bins is 1000: if the size of num_of_bins is less than the size of columns_to_bin, the extra columns without a corresponding entry in num_of_bins will default to 1000 bins. The columns_to_bin are expected to be of type double.
df.smvDoubleBinHistogram(Seq("key1", "key2"), Seq(col1, col2), Seq(100, 200))
Creates new columns named the same as the columns to bin, postfixed with post_fix. The post_fix defaults to "_bin".
Dump the schema and data of the given df to the screen for debugging purposes.
Similar to the show() method of DF from Spark 1.3, although the format is slightly different. This function's format is more convenient for us and hence has been retained.
Print EDD summary
df.smvEdd()
Perform EDD summary on all columns
df.smvEdd("a", "b")
Perform EDD summary on specified columns
Compare 2 DFs by comparing their Edd Summary results.
Example:
df.smvEddCompare(df2)
df.smvEddCompare(df2, ignoreColName = true)
Print out the comparison result.
Save Edd summary
Expand a struct type column to a group of columns.
Example input df:
[id:string, address: struct<state:string, zip:string, street:string>]
output df:
[id:string, state:string, zip:string, street:string]
Example code:
df.smvExpandStruct("address")
Export DF to the local file system. The path is relative to the app running dir.
relative path to the app running dir on the local file system (instead of HDFS)
number of records to be exported. Default is to export all records. **NOTE** since we have to collect the DF and then call Java file operations, the job has to be launched in either local or yarn-client mode. Also it is the user's responsibility to make sure that the DF is small enough to fit into the local file system.
Print EDD histogram with frequency sorting
Save Edd histogram with frequency sorting
Return the sequence of field name - description pairs
Return column description of a specified column (by name string)
Returns all the labels on a specified column; throws if the column is missing
Same as smvGroupBy(Column*) but uses String to specify the columns.
Note: This is going away shortly and users will be able to use the standard Spark groupBy method directly.
Similar to groupBy, but instead of creating GroupedData, creates an SmvGroupedData object.
See org.tresamigos.smv.SmvGroupedDataFunc for the list of functions that can be applied to the grouped data.
Note: This is going away shortly and users will be able to use the standard Spark groupBy method directly.
Example:
df.smvGroup($"k").
Sample the df according to the hash of a column. The MurmurHash3 algorithm is used for generating the hash.
df.smvHashSample($"key", rate=0.1, seed=123)
column to sample on.
sample rate in range (0, 1] with a default of 0.01 (1%)
random generator integer seed with a default of 23.
Print EDD histogram (each col's histogram prints separately)
Save Edd histogram
The Spark DataFrame join operation does not handle duplicate key names. If both the left and right side of the join operation contain the same key, the result DataFrame is unusable.
The smvJoinByKey method will allow the user to join two DataFrames using the same join key.
After the join, only the left side keys will remain. In the case of an outer join, coalesce(leftkey, rightkey) will replace the left key to be kept.
df1.smvJoinByKey(df2, Seq("k"), SmvJoinType.Inner)
Note the use of the SmvJoinType.Inner const instead of the naked "inner" string.
If, in addition to the duplicate keys, both df1 and df2 have a column named "v", both will be kept in the result, but the df2 version will be prefixed with "_" if no postfix parameter is specified; otherwise the df2 version will be postfixed with the specified postfix.
Create a multiple-DF join builder: SmvMultiJoin.
Example:
df.joinMultipleByKey(Seq("k1", "k2"), Inner).
  joinWith(df2, "_df2").
  joinWith(df3, "_df3", LeftOuter).
  doJoin()
In the above example, df will inner join with df2 on k1 and k2, then left outer join with df3 on the same keys. In cases where there are columns with the same name, df2's columns will be renamed with the postfix "_df2" and df3's columns with the postfix "_df3".
an SmvMultiJoin object which supports the joinWith and doJoin methods
Adds labels to the specified columns. Each column can have multiple labels.
Example:
val res = df.smvLabel("name", "sex")("red", "yellow").smvLabel("sex")("green")
In this example, assuming df starts with no labels, res' "name" column will have the "red" and "yellow" labels, and its "sex" column will have the "red", "yellow", and "green" labels.
For a set of DFs which share the same key column, check the overlap across them.
df1.smvOverlapCheck("key")(df2, df3, df4)
The output is another DF with 2 columns:
key, flag
where flag is a bit string, e.g. 0110. Each bit represents whether the corresponding original DF has this key.
It can be used with EDD to summarize on the flag:
df1.smvOverlapCheck("key")(df2, df3).smvHist("flag")
smvPivot adds the pivoted columns without additional aggregation. In other words, N records in, N records out.
Please note that no keyCols need to be provided, since all original columns will be kept.
Example:
df.smvPivot(Seq("month", "product"))("count")("5_14_A", "5_14_B", "6_14_A", "6_14_B")
Input

| id | month | product | count |
| --- | ----- | ------- | ----- |
| 1 | 5/14 | A | 100 |
| 1 | 6/14 | B | 200 |
| 1 | 5/14 | B | 300 |

Output

| id | month | product | count | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
| --- | ----- | ------- | ----- | ------------ | ------------ | ------------ | ------------ |
| 1 | 5/14 | A | 100 | 100 | NULL | NULL | NULL |
| 1 | 6/14 | B | 200 | NULL | NULL | NULL | 200 |
| 1 | 5/14 | B | 300 | NULL | 300 | NULL | NULL |
The sequence of column names whose values will be used as the output pivot column names.
The columns whose value will be copied to the pivoted output columns.
The expected base output column names (without the value column prefix). The user is required to supply the list of expected pivot column output names to avoid an extra action on the input DataFrame just to extract the possible pivot columns.
Apply a prefix to all column names in the given DataFrame.
For example:
df.smvPrefixFieldNames("x_")
The above will add "x_" to the beginning of every column name in the DataFrame.
Add a rank/sequence column to a DataFrame.
It uses zipWithIndex method of RDD to add a sequence number to records in a DF.
It ranks records sequentially by partition.
Please refer to Spark's document for the detail behavior of zipWithIndex.
Note: May force an action on the DataFrame if the DataFrame has more than one partition.
df.smvRank("seqId", 100L)
Create a new column named "seqId" and start from 100.
Remove descriptions from the specified columns (by name string). If the parameter list is empty, remove all descriptions.
df.smvRemoveDesc()
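Based on the description above, removing descriptions from specific columns would look like this (column names are illustrative):

```scala
// Remove descriptions only from "name" and "sex"; other descriptions are kept
val res = df.smvRemoveDesc("name", "sex")
```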
Removes the specified labels from the specified columns.
Example:
df.smvRemoveLabel("sex")("yellow", "green")
If no columns are specified, the specified labels are removed from all applicable columns in the data frame.
If no labels are specified, all labels are removed from the specified columns.
If neither columns nor labels are specified, i.e. both parameter lists are empty, then all labels are removed from all columns in the data frame, essentially clearing the label meta data.
Rename one or more fields of a DataFrame.
The old/new names are given as string pairs.
df.smvRenameField( "a" -> "aa", "b" -> "bb" )
The method preserves any pre-existing metadata associated with renamed columns, whereas the method withColumnRenamed in Spark, as of 1.5.2, would drop them.
Similar to the rollup Spark DF method, but using "*" instead of null to represent "Any"
Example:
df.smvRollup("county", "zip").agg("county", "zip", sum("v") as "v")
Remove one or more columns from the current DataFrame. Column names are specified as Column.
df.smvSelectMinus($"col1", df("col2"))
Remove one or more columns from the current DataFrame. Column names are specified as String.
df.smvSelectMinus("col1", "col2")
Selects all the current columns in the current DataFrame plus the supplied expressions. The new columns are added to the end of the current column list.
df.smvSelectPlus($"price" * $"count" as "amt")
Join that leverages a broadcast (map-side) join for skewed (high-frequency) key values.
Rows keyed by skewed values are joined via broadcast join, while the remaining rows are joined without a broadcast join. Occurrences of skewVals in df2 should be infrequent enough that the filtered table is small enough for a broadcast join. The result is the union of the two join results.
Example:
df.smvSkewJoinByKey(df2, SmvJoinType.Inner, Seq("9999999"), "cid")
will broadcast join the rows of df1 and df2 where col("cid") == "9999999" and join the remaining rows of df1 and df2 without broadcast join.
Return the global top N records according to an ordering.
Example:
df.smvTopNRecs(3, $"amt".desc)
Will keep the 3 largest amt records
smvUnion unions DataFrames with different numbers of columns by column name and schema. Spark's unionAll ignores column names and schema, and can only be performed on tables with the same number of columns.
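A sketch of the column alignment (assuming columns missing on one side come out as null, the natural by-name union behavior; schemas are illustrative):

```scala
// df1: [id: String, v: Double]
// df2: [id: String, w: String]
val res = df1.smvUnion(df2)
// res: [id: String, v: Double, w: String]
// rows from df1 carry w = null; rows from df2 carry v = null
```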
Almost the opposite of the pivot operation. Given a set of records with value columns, turns the value columns into value rows. For example, given the following input:
| id | X | Y | Z |
| -- | - | - | - |
| 1 | A | B | C |
| 2 | D | E | F |
| 3 | G | H | I |
and the following command:
df.smvUnpivot("X", "Y", "Z")
will result in the following output:
| id | column | value |
| -- | ------ | ----- |
| 1 | X | A |
| 1 | Y | B |
| 1 | Z | C |
| ... | ... | ... |
| 3 | Y | H |
| 3 | Z | I |
Warning: This only works for String columns for now (due to a limitation of the Explode method).
The reverse of smvPivot. Specifically, given the following table:

+----+-------+-------+-----+--------+-------+-----+--------+-----+-------+-----+--------+
| Id | A_1   | A_2   | ... | A_11   | B_1   | ... | B_11   | ... | Z_1   | ... | Z_11   |
+----+-------+-------+-----+--------+-------+-----+--------+-----+-------+-----+--------+
| 1  | 1_a_1 | 1_a_2 | ... | 1_a_11 | 1_b_1 | ... | 1_b_11 | ... | 1_z_1 | ... | 1_z_11 |
+----+-------+-------+-----+--------+-------+-----+--------+-----+-------+-----+--------+
| 2  | 2_a_1 | 2_a_2 | ... | 2_a_11 | 2_b_1 | ... | 2_b_11 | ... | 2_z_1 | ... | 2_z_11 |
+----+-------+-------+-----+--------+-------+-----+--------+-----+-------+-----+--------+
and a function that would map "A_1" to ("A", "1"), unpivoting all
columns except 'Id' (in other words, valueCols === columns - Id)
would transform the table into the following
+-----+-------+--------+--------+-----+--------+
| Id  | Index | A      | B      | ... | Z      |
+-----+-------+--------+--------+-----+--------+
| 1   | 1     | 1_a_1  | 1_b_1  | ... | 1_z_1  |
| 1   | 2     | 1_a_2  | 1_b_2  | ... | 1_z_2  |
| ... | ...   | ...    | ...    | ... | ...    |
| 1   | 11    | 1_a_11 | 1_b_11 | ... | 1_z_11 |
| 2   | 1     | 2_a_1  | 2_b_1  | ... | 2_z_1  |
| 2   | 2     | 2_a_2  | 2_b_2  | ... | 2_z_2  |
| ... | ...   | ...    | ...    | ... | ...    |
| 2   | 11    | 2_a_11 | 2_b_11 | ... | 2_z_11 |
+-----+-------+--------+--------+-----+--------+
See Issue 243
names of the columns to transpose
the function that takes a column name and returns a tuple2, the first part is the transposed column name, the second part is the value that goes into the Index column.
the name of the index column; if None, no index column will be added
A variation of the smvUnpivot function that takes a regex instead of a function. This is for the following reason:
The function name is different to keep consistency between Python and Scala.
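Given the parameter descriptions above (column names, a regex, and an index column name), a call might look like the following sketch; the method name smvUnpivotRegex and the argument order are assumptions based on the description:

```scala
// Unpivot A_1, A_2, B_1, B_2 into columns A and B,
// with the numeric suffix going into the Index column
df.smvUnpivotRegex(Seq("A_1", "A_2", "B_1", "B_2"), "(.*)_(.*)", "Index")
```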
Just an alias to smvGroupBy to make client code more readable
Returns all column names in the data frame that contain all the specified labels. If the labels argument is an empty sequence, returns all unlabeled columns in the data frame.
Will throw if there are no columns that satisfy the condition.
Example:
val cols = df.smvWithLabel("A", "B")
Get top N most frequent values in Column c
Example:
df.topNValsByFreq(1, col("cid"))
will return the single most frequent value in the cid column
Apply a user defined chunk mapping on data grouped by a set of keys.
val addFirst = (l: List[Seq[Any]]) => {
  val firstv = l.head.head
  l.map{ r => r :+ firstv }
}
val addFirstFunc = SmvChunkUDF(
  Seq('time, 'call_length),
  SmvSchema.fromString("time: TimeStamp; call_length: Double; first_call_time: TimeStamp").toStructType,
  addFirst)
df.chunkBy('account, 'cycleId)(addFirstFunc)
TODO: The current version will not keep the key columns. It is the SmvChunkUDF's responsibility to make sure the key columns are carried. This behavior should be changed to automatically carry the keys, matching the changes made to Spark's groupBy.agg.
(Since version 1.5) will rename and refine interface
Same as chunkBy, but adds the new columns to the existing columns.
(Since version 1.5) will rename and refine interface