smv package¶
Submodules¶
smv.csv_attributes module¶
smv.dqm module¶
SMV DQM Framework interface
This module defines the DQM rules, fixes, and policies used to validate the data quality of a DataFrame
smv.dqm.DQMFix(condition, fix, name=None, taskPolicy=None)[source]¶
DQMFix will fix a column with a default value
Example
# If "age" is greater than 100, make it 100
>>> DQMFix(col('age') > 100, lit(100).alias('age'), 'age_cap100', FailNone())
Parameters: - condition (Column) – boolean condition that determines when the fix should occur on the records of a DF
- fix (Column) – the fix to use when replacing a value that does not pass the condition
- name (String) – optional parameter for naming the DQMFix. If not specified, defaults to the condition text
- taskPolicy (DQMTaskPolicy) – optional parameter for the DQM policy. If not specified, defaults to FailNone()
Returns: a DQMFix object
Return type: (DQMFix)
smv.dqm.DQMRule(rule, name=None, taskPolicy=None)[source]¶
DQMRule defines a requirement on the records of a DF
Example
# Require the sum of columns "a" and "b" to be less than 100
>>> DQMRule(col('a') + col('b') < 100.0, 'a_b_sum_lt100', FailPercent(0.01))
Parameters: - rule (Column) – boolean condition that defines the requirement on the records of a DF
- name (string) – optional parameter for naming the DQMRule. If not specified, defaults to the rule text
- taskPolicy (DQMTaskPolicy) – optional parameter for the DQM policy. If not specified, defaults to FailNone()
Returns: a DQMRule object
Return type: (DQMRule)
smv.dqm.FailAny()[source]¶
Any failure of a rule or fix with FailAny will cause the entire DF to fail
Returns: policy for DQM Task
Return type: (DQMTaskPolicy)
smv.dqm.FailCount(threshold)[source]¶
Tasks with FailCount(n) will fail the DF if the task is triggered >= n times
Parameters: threshold (int) – the threshold after which the DF fails
Returns: policy for DQM Task
Return type: (DQMTaskPolicy)
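For instance, to tolerate up to 9 violations before failing the run, attach a FailCount policy to a rule (a minimal sketch; the column and rule names are hypothetical):
>>> from smv.dqm import DQMRule, FailCount
>>> from pyspark.sql.functions import col
>>> DQMRule(col('amt') > 0, 'amt_positive', FailCount(10))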
smv.dqm.FailNone()[source]¶
Tasks with FailNone will not trigger any DF-level policy
Returns: policy for DQM Task
Return type: (DQMTaskPolicy)
smv.dqm.FailParserCountPolicy(threshold)[source]¶
If the total number of parser failures is >= threshold, fail the DF
Parameters: threshold (int) – the threshold after which the DF fails
Returns: policy for DQM
Return type: (DQMPolicy)
smv.dqm.FailPercent(threshold)[source]¶
Tasks with FailPercent(r) will fail the DF if the task is triggered on >= r percent of the total number of records in the DF
Parameters: threshold (double) – the threshold after which the DF fails. Value is between 0.0 and 1.0
Returns: policy for DQM Task
Return type: (DQMTaskPolicy)
smv.dqm.FailTotalFixCountPolicy(threshold)[source]¶
For all the fixes in a DQM, if the total number of times they are triggered is >= threshold, fail the DF
Parameters: threshold (int) – the threshold after which the DF fails
Returns: policy for DQM
Return type: (DQMPolicy)
smv.dqm.FailTotalFixPercentPolicy(threshold)[source]¶
For all the fixes in a DQM, if the total number of times they are triggered is >= threshold * total records, fail the DF
Parameters: threshold (double) – the threshold after which the DF fails. Value is between 0.0 and 1.0
Returns: policy for DQM
Return type: (DQMPolicy)
smv.dqm.FailTotalRuleCountPolicy(threshold)[source]¶
For all the rules in a DQM, if the total number of times they are triggered is >= threshold, fail the DF
Parameters: threshold (int) – the threshold after which the DF fails
Returns: policy for DQM
Return type: (DQMPolicy)
smv.dqm.FailTotalRulePercentPolicy(threshold)[source]¶
For all the rules in a DQM, if the total number of times they are triggered is >= threshold * total records, fail the DF
Parameters: threshold (double) – the threshold after which the DF fails. Value is between 0.0 and 1.0
Returns: policy for DQM
Return type: (DQMPolicy)
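DF-level policies such as these are attached to a dataset's DQM alongside its rules. A minimal sketch, assuming this module's SmvDQM builder accepts rules and policies through an add method:
>>> from smv.dqm import SmvDQM, DQMRule, FailTotalRuleCountPolicy
>>> from pyspark.sql.functions import col
>>> dqm = SmvDQM().add(DQMRule(col('age') < 150, 'age_sane')).add(FailTotalRuleCountPolicy(100))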
smv.error module¶
Errors thrown by SMV
smv.functions module¶
smv.functions.diceSorensen(c1, c2)[source]¶
2-gram UDF with formula (2 * number of overlapped gramCnt) / (c1.gramCnt + c2.gramCnt)
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: 2-gram
Return type: (Column)
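For example, to score the similarity of two name columns (a sketch; the DataFrame and column names are hypothetical):
>>> from smv.functions import diceSorensen
>>> from pyspark.sql.functions import col
>>> df.select(diceSorensen(col("name1"), col("name2")).alias("dice_score"))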
smv.functions.jaroWinkler(c1, c2)[source]¶
Jaro-Winkler edit distance metric UDF
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: distances
Return type: (Column)
smv.functions.nGram2(c1, c2)[source]¶
2-gram UDF with formula (number of overlapped gramCnt) / max(c1.gramCnt, c2.gramCnt)
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: 2-gram
Return type: (Column)
smv.functions.nGram3(c1, c2)[source]¶
3-gram UDF with formula (number of overlapped gramCnt) / max(c1.gramCnt, c2.gramCnt)
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: 3-gram
Return type: (Column)
smv.functions.normlevenshtein(c1, c2)[source]¶
Normalized Levenshtein edit distance metric UDF
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: distances
Return type: (Column)
smv.functions.smvArrayCat(sep, col)[source]¶
For an array-typed column, concatenate the elements into a string with the given separator.
Parameters: - sep – a Python string to separate the fields
- col – a Column with ArrayType
Returns: a Column in StringType with array elements concatenated
Return type: (Column)
smv.functions.smvCollectSet(col, datatype)[source]¶
An aggregate function which collects all the values of the given column into a set, returned as an array-typed column. Spark 1.6 introduced the built-in collect_set function, so smvCollectSet will be deprecated as projects migrate to Spark 1.6 and later.
Parameters: - col (Column) – column to be aggregated on
- datatype (DataType) – datatype of the input column
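A usage sketch, assuming smvCollectSet yields an aggregate Column usable inside agg (the column name is hypothetical):
>>> from smv.functions import smvCollectSet
>>> from pyspark.sql.types import StringType
>>> df.agg(smvCollectSet(col("city"), StringType()).alias("cities"))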
smv.functions.smvCreateLookUp(m, default, outputType)[source]¶
Return a Python UDF which will perform a dictionary lookup on a column
Parameters: - m (dictionary) – a Python dictionary to be applied
- default (any) – default value if dictionary lookup failed
- outputType (DataType) – output value’s data type
Returns: a udf which can be applied to a column to perform the lookup
Return type: (udf)
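For example, mapping state codes to state names with a fallback value (a sketch; the dictionary and column name are hypothetical):
>>> from smv.functions import smvCreateLookUp
>>> from pyspark.sql.types import StringType
>>> state_name = smvCreateLookUp({"NY": "New York", "CA": "California"}, "Unknown", StringType())
>>> df.select(state_name(col("state_code")).alias("state_name"))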
smv.functions.smvFirst(c, nonNull=False)[source]¶
Variation of Spark "first" which can also return null values
Since Spark's first returns the first non-null value, we created smvFirst, which returns the true first value even if it is null. Alternatively, it can return the first non-null value.
Parameters: - c (Column) – column to extract first value from
- nonNull (bool) – If False, return the first value even if null. If True, return the first non-null value. Defaults to False.
Returns: first value
Return type: (object)
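A usage sketch, assuming smvFirst yields an aggregate expression usable inside a grouped agg (column names are hypothetical):
>>> from smv.functions import smvFirst
>>> df.groupBy("k").agg(smvFirst(col("v")).alias("first_v"), smvFirst(col("v"), nonNull=True).alias("first_nonnull_v"))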
smv.functions.smvHashKey(head, *others)[source]¶
Create an MD5 hash on concatenated columns. Returns prefix + MD5 hex string (a 32-character string) as the unique key
MD5's collision rate on real data records can be ignored, based on the following discussion:
https://marc-stevens.nl/research/md5-1block-collision/ The shortest messages with the same MD5 hash are 512-bit (64-byte) messages, such as
4dc968ff0ee35c209572d4777b721587d36fa7b21bdc56b74a3dc0783e7b9518afbfa200a8284bf36e8e4b55b35f427593d849676da0d1555d8360fb5f07fea2 and (differing by two bits) 4dc968ff0ee35c209572d4777b721587d36fa7b21bdc56b74a3dc0783e7b9518afbfa202a8284bf36e8e4b55b35f427593d849676da0d1d55d8360fb5f07fea2, which both have MD5 hash 008ee33a9d58b51cfeb425b0959121c9
There are other such pairs, but all are carefully constructed. Theoretically, random collisions only become likely as the data size approaches 2^64 (since MD5 is 128-bit), which is much larger than the number of records we deal with (a billion is about 2^30). Therefore, using MD5 to hash the primary key columns is good enough for creating a unique key
This function can take 2 forms: - smvHashKey(prefix, col1, col2, ...) - smvHashKey(col1, col2, ...)
Parameters: - prefix (String) – the returned string's prefix
- cols (*Column) – columns to be part of the hash
Returns: a StringType column as Prefix + MD5 Hex string
Return type: (Column)
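For example, to build a prefixed primary key from two name columns (a sketch; column names are hypothetical):
>>> from smv.functions import smvHashKey
>>> df.select(smvHashKey("cust_", col("first_name"), col("last_name")).alias("cust_key"))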
smv.functions.smvStrCat(head, *others)[source]¶
Concatenate multiple columns to a single string. Similar to the concat and concat_ws functions in Spark but behaves differently when some columns are null. The Spark versions return null if any of the inputs is null. smvStrCat returns null only if all of the inputs are null; otherwise it coalesces null columns to blank before concatenating.
This function can take 2 forms: - smvStrCat(sep, col1, col2, ...) - smvStrCat(col1, col2, ...)
Parameters: - sep (String) – separator for the concatenation
- cols (*Column) – columns to be concatenated
Returns: a StringType column
Return type: (Column)
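For example, to join phone-number parts with a dash (a sketch; column names are hypothetical):
>>> from smv.functions import smvStrCat
>>> df.select(smvStrCat("-", col("area_code"), col("phone")).alias("full_phone"))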
smv.graph module¶
Provides dependency graphing of SMV modules.
smv.helpers module¶
SMV DataFrame Helpers and Column Helpers
This module provides the helper functions on DataFrame objects and Column objects
class smv.helpers.ColumnHelper(col)[source]¶
Bases: object
smvDay70()[source]¶
Convert a Timestamp to the number of days from 1970-01-01
Example
>>> df.select(col("dob").smvDay70())
Returns: number of days from 1970-01-01 (starting from 0)
Return type: (integer)
smvDayOfMonth()[source]¶
Extract the day-of-month component from a timestamp
Example
>>> df.select(col("dob").smvDayOfMonth())
Returns: day of month component as integer (range 1-31), or null if the input column is null
Return type: (integer)
smvDayOfWeek()[source]¶
Extract the day-of-week component from a timestamp
Example
>>> df.select(col("dob").smvDayOfWeek())
Returns: day of week component as integer (range 1-7, 1 being Sunday), or null if the input column is null
Return type: (integer)
smvHour()[source]¶
Extract the hour component from a timestamp
Example
>>> df.select(col("dob").smvHour())
Returns: hour component as integer, or null if the input column is null
Return type: (integer)
smvIsAllIn(*vals)[source]¶
Returns true if ALL of the Array column's elements are in the given parameter sequence
Parameters: vals (*any) – vals must be of the same type as the Array content
Example
input DF:
k v
a b
c d
>>> df.select( array(col("k"), col("v")).smvIsAllIn("a", "b", "c").alias("isFound") )
output DF:
isFound
true
false
false
Returns: result of smvIsAllIn
Return type: (DataFrame)
smvIsAnyIn(*vals)[source]¶
Returns true if ANY one of the Array column's elements are in the given parameter sequence
Parameters: vals (*any) – vals must be of the same type as the Array content
Example
input DF:
k v
a b
c d
>>> df.select( array(col("k"), col("v")).smvIsAnyIn("a", "b", "c").alias("isFound") )
output DF:
isFound
true
true
false
Returns: result of smvIsAnyIn
Return type: (DataFrame)
smvMonth()[source]¶
Extract the month component from a timestamp
Example
>>> df.select(col("dob").smvMonth())
Returns: month component as integer, or null if the input column is null
Return type: (integer)
smvMonth70()[source]¶
Convert a Timestamp to the number of months from 1970-01-01
Example
>>> df.select(col("dob").smvMonth70())
Returns: number of months from 1970-01-01 (starting from 0)
Return type: (integer)
smvPlusDays(delta)[source]¶
Add N days to a Timestamp column
Parameters: delta (integer) – the number of days to add
Example
>>> df.select(col("dob").smvPlusDays(3))
Returns: the incremented Timestamp, or null if the input is null
Return type: (Timestamp)
smvPlusMonths(delta)[source]¶
Add N months to a Timestamp column
Parameters: delta (integer) – the number of months to add
Note
The calculation will do its best to only change the month field, retaining the same day of month. However, in certain circumstances, it may be necessary to alter smaller fields. For example, 2007-03-31 plus one month cannot result in 2007-04-31, so the day of month is adjusted to 2007-04-30.
Example
>>> df.select(col("dob").smvPlusMonths(3))
Returns: the incremented Timestamp, or null if the input is null
Return type: (Timestamp)
smvPlusWeeks(delta)[source]¶
Add N weeks to a Timestamp column
Parameters: delta (integer) – the number of weeks to add
Example
>>> df.select(col("dob").smvPlusWeeks(3))
Returns: the incremented Timestamp, or null if the input is null
Return type: (Timestamp)
smvPlusYears(delta)[source]¶
Add N years to a Timestamp column
Parameters: delta (integer) – the number of years to add
Example
>>> df.select(col("dob").smvPlusYears(3))
Returns: the incremented Timestamp, or null if the input is null
Return type: (Timestamp)
smvQuarter()[source]¶
Extract the quarter component from a timestamp
Example
>>> df.select(col("dob").smvQuarter())
Returns: quarter component as integer (1-based), or null if the input column is null
Return type: (integer)
class smv.helpers.DataFrameHelper(df)[source]¶
Bases: object
peek(pos=1, colRegex='.*')[source]¶
Display a DataFrame row in transposed view
Parameters: - pos (integer) – the n-th row to display, default as 1
- colRegex (string) – show the columns with name matching the regex, default as ”.*”
Returns: (None)
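For instance, to inspect the second row restricted to columns whose names start with "amt" (a minimal sketch; the column names are hypothetical):
>>> df.peek()
>>> df.peek(pos=2, colRegex="amt.*")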
peekSave(path, pos=1, colRegex='.*')[source]¶
Write the peek result to a file
Parameters: - path (string) – local file name to write to
- pos (integer) – the n-th row to display, default as 1
- colRegex (string) – show the columns with name matching the regex, default as ”.*”
Returns: (None)
smvBinHist(*colWithBin)[source]¶
Print distributions of numerical columns with the specified bin sizes applied
Parameters: colWithBin (*tuple) – each tuple must be of size 2, where the first element is the column name, and the second is the bin size for that column
Example
>>> df.smvBinHist(("col_a", 1))
Returns: (None)
smvConcatHist(*cols)[source]¶
Display an EDD histogram of a group of columns (joint distribution)
Parameters: cols (*string) – the columns on which to perform the EDD histogram
Example
>>> df.smvConcatHist("a", "b")
Returns: (None)
smvCountHist(keys, binSize)[source]¶
Print the distribution of the value frequency on specific columns
Parameters: - keys (list(string)) – the column names on which the EDD histogram is performed
- binSize (integer) – the bin size for the histogram
Example
>>> df.smvCountHist(["k"], 1)
Returns: (None)
smvDedupByKey(*keys)[source]¶
Remove duplicate records from the DataFrame by arbitrarily selecting the first record from each set of records sharing the same primary key or key combination.
Parameters: keys (*string or *Column) – the column names or Columns on which to apply the dedup
Example
input DataFrame:
id product Company
1 A C1
1 C C2
2 B C3
2 B C4
>>> df.smvDedupByKey("id")
output DataFrame:
id product Company
1 A C1
2 B C3
>>> df.smvDedupByKey("id", "product")
output DataFrame:
id product Company
1 A C1
1 C C2
2 B C3
Returns: a DataFrame without duplicates for the specified keys
Return type: (DataFrame)
smvDedupByKeyWithOrder(*keys)[source]¶
Remove duplicated records by selecting the first record relative to a given ordering
The order is specified in another set of parentheses, as follows:
>>> def smvDedupByKeyWithOrder(self, *keys)(*orderCols)
Note
As with the smvDedupByKey method, we use RDD groupBy in the implementation of this method to make sure we can handle a large key space.
Parameters: keys (*string or *Column) – the column names or Columns on which to apply the dedup
Example
input DataFrame:
id product Company
1 A C1
1 C C2
2 B C3
2 B C4
>>> df.smvDedupByKeyWithOrder(col("id"))(col("product").desc())
output DataFrame:
id product Company
1 C C2
2 B C3
Returns: a DataFrame without duplicates for the specified keys / order
Return type: (DataFrame)
smvDesc(*colDescs)[source]¶
Adds column descriptions
Parameters: colDescs (*tuple) – tuples of strings where the first is the column name, and the second is the description to add
Example
>>> df.smvDesc(("a", "description of col a"), ("b", "description of col b"))
Returns: the DataFrame with column descriptions added
Return type: (DataFrame)
smvDescFromDF(descDF)[source]¶
Adds column descriptions
Parameters: descDF (DataFrame) – a companion 2-column descriptionDF that has variable names as column 1 and the corresponding variable descriptions as column 2
Example
>>> df.smvDescFromDF(descriptionDF)
Returns: the DataFrame with column descriptions added
Return type: (DataFrame)
smvDiscoverPK(n=10000)[source]¶
Find a column combination which uniquely identifies a row in the data
Note
The algorithm only looks for one set of keys which uniquely identifies the rows. There could be more key combinations which could also serve as the primary key.
Parameters: n (integer) – number of rows the PK discovery algorithm will run on, defaults to 10000
Example
>>> df.smvDiscoverPK(5000)
Returns: (None)
smvDumpDF()[source]¶
Dump the schema and data of the given df to screen for debugging purposes
Similar to the show() method of DF from Spark 1.3, though the format is slightly different. This function's format is more convenient for us and hence has remained.
Example
>>> df.smvDumpDF()
Returns: (None)
smvEdd(*cols)[source]¶
Display an EDD summary
Parameters: cols (*string) – column names on which to perform the EDD summary
Example
>>> df.smvEdd("a", "b")
Returns: (None)
smvEddCompare(df2, ignoreColName)[source]¶
Compare 2 DFs by comparing their EDD summary results
The result of the comparison is printed out.
Parameters: - df2 (DataFrame) – the DataFrame to compare with
- ignoreColName (boolean) – specifies whether to ignore column names, default is false
Example
>>> df.smvEddCompare(df2)
>>> df.smvEddCompare(df2, True)
Returns: (None)
smvExpandStruct(*cols)[source]¶
Expand a struct-type column into a group of columns
Parameters: cols (*string) – column names to expand
Example
input DF:
[id: string, address: struct<state:string, zip:string, street:string>]
>>> df.smvExpandStruct("address")
output DF:
[id: string, state: string, zip: string, street: string]
Returns: DF with expanded columns
Return type: (DataFrame)
smvExportCsv(path, n=None)[source]¶
Export a DataFrame to the local file system
Parameters: - path (string) – relative path to the app running directory on local file system (instead of HDFS)
- n (integer) – optional. number of records to export. default is all records
Note
Since we have to collect the DF and then call Java file operations, the job has to be launched in local or yarn-client mode. It is also the user's responsibility to make sure the DF is small enough to fit into memory.
Example
>>> df.smvExportCsv("./target/python-test-export-csv.csv")
Returns: (None)
smvFreqHist(*cols)[source]¶
Print an EDD histogram with frequency sorting
Parameters: cols (*string) – the columns on which to perform the EDD histogram
Example
>>> df.smvFreqHist("a")
Returns: (None)
smvGetDesc(colName=None)[source]¶
Returns column description(s)
Parameters: colName (string) – optional column name for which to get the description
Example
>>> df.smvGetDesc("col_a")
>>> df.smvGetDesc()
Returns: the description string of colName if specified; otherwise a list of (colName, description) pairs for all columns
Return type: (string or list(tuple))
smvGroupBy(*cols)[source]¶
Similar to groupBy; instead of creating GroupedData, creates an SmvGroupedData object.
See [[org.tresamigos.smv.SmvGroupedDataFunc]] for the list of functions that can be applied to the grouped data.
Parameters: cols (*string or *Column) – column names or Column objects to group on
Note
This is going away shortly, and users will be able to use the standard Spark groupBy method directly.
Example
>>> df.smvGroupBy(col("k"))
>>> df.smvGroupBy("k")
Returns: grouped data object
Return type: (SmvGroupedData)
smvHashSample(key, rate=0.01, seed=23)[source]¶
Sample the df according to the hash of a column
The MurmurHash3 algorithm is used to generate the hash
Parameters: - key (string or Column) – column name or Column to sample on
- rate (double) – sample rate in range (0, 1] with a default of 0.01 (1%)
- seed (int) – random generator integer seed with a default of 23
Example
>>> df.smvHashSample(col("key"), rate=0.1, seed=123)
Returns: sampled DF Return type: (DataFrame)
smvHist(*cols)[source]¶
Display an EDD histogram
Each column's histogram prints separately
Parameters: cols (*string) – the columns on which to perform the EDD histogram
Example
>>> df.smvHist("a")
Returns: (None)
smvJoinByKey(other, keys, joinType)[source]¶
Joins two DataFrames on a key
The Spark DataFrame join operation does not handle duplicate key names. If both the left and right side of the join operation contain the same key, the resulting DataFrame is unusable.
The smvJoinByKey method allows the user to join two DataFrames using the same join key. After the join, only the left side keys will remain. In case of an outer join, coalesce(leftkey, rightkey) will replace the kept left key.
Parameters: - other (DataFrame) – the DataFrame to join with
- keys (list(string)) – a list of column names on which to apply the join
- joinType (string) – choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]
Example
>>> df1.smvJoinByKey(df2, ["k"], "inner")
Returns: result of the join operation Return type: (DataFrame)
smvJoinMultipleByKey(keys, joinType='inner')[source]¶
Create a multiple-DF join builder
It is used in conjunction with joinWith and doJoin
Parameters: - keys (list(string)) – a list of column names on which to apply the join
- joinType (string) – choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]
Example
>>> df.smvJoinMultipleByKey(["k1", "k2"], "inner").joinWith(df2, "_df2").joinWith(df3, "_df3", "leftouter").doJoin()
Returns: the builder object for the multi join operation Return type: (SmvMultiJoin)
smvOverlapCheck(keyColName)[source]¶
For a set of DFs which share the same key column, check the overlap across them
The other DataFrames are specified in another set of parentheses, as follows:
>>> df1.smvOverlapCheck(key)(*df)
Parameters: keyColName (string) – the column name for the key column
Examples
>>> df1.smvOverlapCheck("key")(df2, df3, df4)
The output DF has 2 columns:
- key
- flag: a bit string, e.g. 0110. Each bit represents whether the original DF has this key
It can be used with EDD to summarize on the flag:
>>> df1.smvOverlapCheck("key")(df2, df3).smvHist("flag")
smvRemoveDesc(*colNames)[source]¶
Removes descriptions for the given columns from the DataFrame
Parameters: colNames (*string) – names of columns for which to remove the description
Example
>>> df.smvRemoveDesc("col_a", "col_b")
Returns: the DataFrame with column descriptions removed
Return type: (DataFrame)
smvRenameField(*namePairs)[source]¶
Rename one or more fields of a DataFrame
Parameters: namePairs (*tuple) – tuples of strings where the first is the source column name, and the second is the target column name
Example
>>> df.smvRenameField(("a", "aa"), ("c", "cc"))
Returns: the DataFrame with renamed fields Return type: (DataFrame)
smvSelectMinus(*cols)[source]¶
Remove one or more columns from the current DataFrame
Parameters: cols (*string or *Column) – column names or Columns to remove from the DataFrame
Example
>>> df.smvSelectMinus("col1", "col2")
>>> df.smvSelectMinus(col("col1"), col("col2"))
Returns: the resulting DataFrame after removal of columns Return type: (DataFrame)
smvSelectPlus(*cols)[source]¶
Selects all the current columns in the current DataFrame plus the supplied expressions
The new columns are added to the end of the current column list.
Parameters: cols (*Column) – expressions to add to the DataFrame
Example
>>> df.smvSelectPlus((col("price") * col("count")).alias("amt"))
Returns: the resulting DataFrame after the addition of the columns
Return type: (DataFrame)
smvSkewJoinByKey(other, joinType, skewVals, key)[source]¶
Join that leverages broadcast (map-side) join of rows with skewed (high-frequency) values
Rows keyed by skewed values are joined via broadcast join, while the remaining rows are joined without broadcast join. Occurrences of skewVals in df2 should be infrequent enough that the filtered table is small enough for a broadcast join. The result is the union of the two join results.
Parameters: - other (DataFrame) – DataFrame to join with
- joinType (str) – name of type of join (e.g. “inner”)
- skewVals (list(object)) – list of skewed values
- key (str) – key on which to join (also the Column with the skewed values)
Example
>>> df.smvSkewJoinByKey(df2, "inner", ["9999999"], "cid")
will broadcast join the rows of df and df2 where col("cid") == "9999999", and join the remaining rows of df and df2 without broadcast join.
smvUnion(*dfothers)[source]¶
Unions DataFrames with different numbers of columns by column name and schema
Spark's unionAll ignores column names & schemas, and can only be performed on tables with the same number of columns.
Parameters: dfOthers (*DataFrame) – the DataFrames to union with
Example
>>> df.smvUnion(df2, df3)
Returns: the union of all specified DataFrames Return type: (DataFrame)
smvUnpivot(*cols)[source]¶
Unpivot the selected columns
Given a set of records with value columns, turns the value columns into value rows.
Parameters: cols (*string) – the names of the columns to unpivot
Example
input DF:
id X Y Z
1 A B C
2 D E F
3 G H I
>>> df.smvUnpivot("X", "Y", "Z")
output DF:
id column value
1 X A
1 Y B
1 Z C
... ... ...
3 Y H
3 Z I
Returns: the unpivoted DataFrame
Return type: (DataFrame)
smvUnpivotRegex(cols, colNameFn, indexColName)[source]¶
Unpivot the selected columns using the specified regex
Parameters: - cols (*string) – the names of the columns to unpivot
- colNameFn (string) – a regex representing the function to be applied when unpivoting
- indexColName (string) – the name of the index column to be created
Example
input DF:
id A_1 A_2 B_1 B_2
1 1_a_1 1_a_2 1_b_1 1_b_2
2 2_a_1 2_a_2 2_b_1 2_b_2
>>> df.smvUnpivotRegex( ["A_1", "A_2", "B_1", "B_2"], "(.*)_(.*)", "index" )
output DF:
id index A B
1 1 1_a_1 1_b_1
1 2 1_a_2 1_b_2
2 1 2_a_1 2_b_1
2 2 2_a_2 2_b_2
Returns: the unpivoted DataFrame
Return type: (DataFrame)
topNValsByFreq(n, col)[source]¶
Get the top N most frequent values in Column col
Parameters: - n (int) – maximum number of values
- col (Column) – which column to get values from
Examples
>>> df.topNValsByFreq(1, col("cid"))
will return the single most frequent value in the cid column
Returns: most frequent values (type depends on the schema)
Return type: (list(object))
class smv.helpers.SmvGroupedData(df, sgd)[source]¶
Bases: object
The result of running smvGroupBy on a DataFrame. Implements special SMV aggregations.
smvFillNullWithPrevValue(*orderCols)[source]¶
Fill in null values with the "previous" value according to an ordering
Examples
Given DataFrame df representing the table
K, T, V
a, 1, null
a, 2, a
a, 3, b
a, 4, null
we can use
>>> df.smvGroupBy("K").smvFillNullWithPrevValue(col("T").asc())("V")
to produce the result
K, T, V
a, 1, null
a, 2, a
a, 3, b
a, 4, b
Returns: result of filling nulls with the previous value
Return type: (DataFrame)
smvPivotCoalesce(pivotCols, valueCols, baseOutput)[source]¶
Perform SmvPivot, then coalesce the output
Parameters: - pivotCols (list(list(str))) – lists of column names to pivot
- valueCols (list(string)) – names of value columns to coalesce
- baseOutput (list(str)) – expected names of the pivoted columns
Returns: result of the pivot coalesce
Return type: (DataFrame)
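A usage sketch mirroring the smvPivotSum example below, with hypothetical column names:
>>> df.smvGroupBy("id").smvPivotCoalesce([["month", "product"]], ["count"], ["5_14_A", "5_14_B"])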
smvPivotSum(pivotCols, valueCols, baseOutput)[source]¶
Perform SmvPivot, then sum the results
The user is required to supply the list of expected pivot column output names to avoid an extra action on the input DataFrame. If an empty sequence is provided, the base output columns will be extracted from the values in the pivot columns (which will cause an action on the entire DataFrame!)
Parameters: - pivotCols (list(list(str))) – lists of column names to pivot
- valueCols (list(string)) – names of value columns to sum
- baseOutput (list(str)) – expected names of the pivoted columns
Examples
For example, given a DataFrame df that represents the table
id month product count
1 5/14 A 100
1 6/14 B 200
1 5/14 B 300
we can use
>>> df.smvGroupBy("id").smvPivotSum([["month", "product"]], ["count"], ["5_14_A", "5_14_B", "6_14_A", "6_14_B"])
to produce the following output
id count_5_14_A count_5_14_B count_6_14_A count_6_14_B
1 100 300 NULL 200
Returns: result of pivot sum
Return type: (DataFrame)
smvTopNRecs(maxElems, *cols)[source]¶
For each group, return the top N records according to a given ordering
Example
# This will keep the 3 largest amt records for each id
>>> df.smvGroupBy("id").smvTopNRecs(3, col("amt").desc())
Parameters: - maxElems (int) – maximum number of records per group
- cols (*str) – columns defining the ordering
Returns: result of taking the top records from each group
Return type: (DataFrame)
class smv.helpers.SmvMultiJoin(sqlContext, mj)[source]¶
Bases: object
Wrapper around Scala's SmvMultiJoin
doJoin(dropextra=False)[source]¶
Trigger the join operation
Parameters: dropExtra (boolean) – default False, which keeps all duplicated-name columns with their postfix; when True, the duplicated columns are dropped
Example
>>> joindf.doJoin()
Returns: result of executing the join operation Return type: (DataFrame)
joinWith(df, postfix, jointype=None)[source]¶
Append to the SmvMultiJoin chain
Parameters: - df (DataFrame) – the DataFrame to join with
- postfix (string) – postfix to use when renaming join columns
- jointype (string) – optional jointype. If not specified, conf.defaultJoinType is used. Choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]
Example
>>> joindf = df1.smvJoinMultipleByKey(['a'], 'inner').joinWith(df2, '_df2').joinWith(df3, '_df3', 'outer')
Returns: the formula of the join; doJoin() needs to be called on it to execute the join
Return type: (SmvMultiJoin)
smv.matcher module¶
smv.matcher.ExactLogic(colName, expr)[source]¶
Level match with exact logic
Parameters: - colName (string) – level name used in the output DF
- expr (Column) – match logic condition Column
Example
>>> ExactLogic("First_Name_Match", col("first_name") == col("_first_name"))
Returns: (ExactLogic)
smv.matcher.ExactMatchPreFilter(colName, expr)[source]¶
Specify the top-level exact match
Parameters: - colName (string) – level name used in the output DF
- expr (Column) – match logic condition Column
Example
>>> ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name"))
Returns: (ExactMatchPreFilter)
smv.matcher.FuzzyLogic(colName, predicate, valueExpr, threshold)[source]¶
Level match with fuzzy logic
Parameters: - colName (string) – level name used in the output DF
- predicate (Column) – a condition column; no match if this condition evaluates to false
- valueExpr (Column) – a value column, which typically returns a score. A higher score means a higher chance of matching
- threshold (float) – no match if the evaluated valueExpr < threshold
Example
>>> FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"),col("_city")), 0.9)
Returns: (FuzzyLogic)
smv.matcher.GroupCondition(expr)[source]¶
Specify the shared matching condition of all the levels (except the top-level exact match)
Parameters: expr (Column) – shared matching condition
Note
expr should be in "left == right" form so that the matcher can optimize the process by reducing the search space
Example
>>> GroupCondition(soundex("first_name") == soundex("_first_name"))
Returns: (GroupCondition)
class smv.matcher.SmvEntityMatcher(leftId, rightId, exactMatchFilter, groupCondition, levelLogics)[source]¶
Bases: object
Perform multiple-level entity matching with exact and/or fuzzy logic
Parameters: - leftId (string) – id column name of the left DF (df1)
- rightId (string) – id column name of the right DF (df2)
- exactMatchFilter (ExactMatchPreFilter) – exact match condition; if records match, no further tests will be performed
- groupCondition (GroupCondition) – for exact match leftovers, a deterministic condition to narrow down the search space
- levelLogics (list(ExactLogic or FuzzyLogic)) – a list of level match conditions (always weaker than exactMatchFilter); all of them will be tested
Example
>>> SmvEntityMatcher("id", "_id",
...     ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name")),
...     GroupCondition(soundex("first_name") == soundex("_first_name")),
...     [
...         ExactLogic("First_Name_Match", col("first_name") == col("_first_name")),
...         FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"), col("_city")), 0.9)
...     ]
... )
doMatch(df1, df2, keepOriginalCols=True)[source]¶
Apply the SmvEntityMatcher to the 2 DataFrames
Parameters: - df1 (DataFrame) – DataFrame 1 with an id column with name “id”
- df2 (DataFrame) – DataFrame 2 with an id column with name “id”
- keepOriginalCols (boolean) – whether to keep all input columns of df1 and df2, defaults to true
Example
>>> SmvEntityMatcher("id", "_id",
...     ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name")),
...     GroupCondition(soundex("first_name") == soundex("_first_name")),
...     [
...         ExactLogic("First_Name_Match", col("first_name") == col("_first_name")),
...         FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"), col("_city")), 0.9)
...     ]
... ).doMatch(df1, df2, False)
Returns: a DataFrame with df1's id, df2's id, and the match flags of all the levels. For levels with fuzzy logic, the matching score is also provided. A column named "MatchBitmap" is also provided to summarize all the matching flags. When keepOriginalCols is true, the input columns are also kept
Return type: (DataFrame)
smv.runconfig module¶
SMV User Run Configuration Parameters
This module defines the SmvRunConfig class, which can be mixed into an SmvModule to get user configuration parameters at run-time.
smv.smvapp module¶
SmvPy entry class and the singleton smvApp
This module provides the main SMV Python entry point, the SmvPy class, and a singleton smvApp. It is equivalent to SmvApp on the Scala side.
class smv.smvapp.SmvApp(arglist, _sc=None, _sqlContext=None)[source]¶
Bases: object
The Python representation of SMV.
Its singleton instance is created later in the containing module and is named smvApp.
Adds java_imports to the namespace in the JVM gateway in SparkContext (in pyspark). It also creates an instance of SmvPyClient.
classmethod createInstance(arglist, _sc=None, _sqlContext=None)[source]¶
Create the singleton instance. Also returns the instance.
smv.smvpydataset module¶
SMV DataSet Framework interface
This module defines the abstract classes which form the SmvDataSet Framework for clients' projects
class smv.smvpydataset.SmvCsvFile(smvApp)[source]¶
Bases: smv.smvpydataset.SmvPyInput, smv.smvpydataset.WithParser
Input from a file in CSV format
path¶
User-specified path to the input csv file
Override this to specify the path to the csv file.
Returns: path
Return type: (str)
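For example, a minimal input module (the file path is hypothetical):

class Employment(SmvCsvFile):
    def path(self):
        return "input/employment/CB1200CZ11.csv"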
class smv.smvpydataset.SmvCsvStringData(smvApp)[source]¶
Bases: smv.smvpydataset.SmvPyInput
Input data defined by a schema string and a data string
dataStr¶
Smv data string.
E.g. "212,2016-10-03;119,2015-01-07"
Returns: data
Return type: (str)
schemaStr¶
Smv schema string.
E.g. "id:String; dt:Timestamp"
Returns: schema
Return type: (str)
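Putting the two together, a minimal test-data module might look like this (a sketch reusing the strings above):

class MyTestData(SmvCsvStringData):
    def schemaStr(self):
        return "id:String; dt:Timestamp"
    def dataStr(self):
        return "212,2016-10-03;119,2015-01-07"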
smv.smvpydataset.SmvExtDataSet(refname)[source]¶
Creates an SmvDataSet representing an external (Scala) SmvDataSet
E.g. MyExtMod = SmvExtDataSet("the.scala.mod")
Parameters: fqn (str) – fqn of the Scala SmvDataSet
Returns: external dataset with the given fqn
Return type: (SmvExtDataSet)
smv.smvpydataset.SmvExtModuleLink(refname)[source]¶
Creates a link to an external (Scala) SmvDataSet
SmvExtModuleLink(fqn) is equivalent to SmvModuleLink(SmvExtDataSet(fqn))
Parameters: fqn (str) – fqn of the Scala SmvDataSet
Returns: link to the Scala SmvDataSet
Return type: (SmvModuleLink)
class smv.smvpydataset.SmvHiveTable(smvApp)[source]¶
Bases: smv.smvpydataset.SmvPyInput
Input from a Hive table
tableName¶
User-specified name of the Hive table to extract input from
Override this to specify your own table name.
Returns: table name
Return type: (str)
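For example (the table name is hypothetical):

class FooTable(SmvHiveTable):
    def tableName(self):
        return "myschema.foo"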
class smv.smvpydataset.SmvModule(smvApp)[source]¶
Bases: smv.smvpydataset.SmvPyDataSet
Base class for SmvModules written in Python
IsSmvModule = True¶
class RunParams(urn2df)[source]¶
Bases: object
Map from SmvDataSet to resulting DataFrame
We need to simulate a dict from ds to df where the same object can be keyed by different datasets with the same urn. For example, in the module

class X(SmvModule):
    def requiresDS(self): return [SmvModuleLink("foo")]
    def run(self, i): return i[SmvModuleLink("foo")]

the i argument of the run method should map SmvModuleLink("foo") to the correct DataFrame.
Parameters: urn2df (dict) – a map from urn to DataFrame
run(i)[source]¶
User-specified definition of the operations of this SmvModule
Override this method to define the output of this module, given a map 'i' from input SmvDataSet to resulting DataFrame. 'i' will have a mapping for each SmvDataSet listed in requiresDS. E.g.

def requiresDS(self):
    return [MyDependency]

def run(self, i):
    return i[MyDependency].select("importantColumn")

Parameters: i (RunParams) – mapping from input SmvDataSet to DataFrame
Returns: output of this SmvModule
Return type: (DataFrame)
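A complete module sketch combining requiresDS and run (the module and column names are hypothetical; col and sum come from pyspark.sql.functions):

class EmploymentByState(SmvModule):
    def requiresDS(self):
        return [Employment]

    def run(self, i):
        df = i[Employment]
        return df.groupBy(col("ST")).agg(sum(col("EMP")).alias("EMP"))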
smv.smvpydataset.SmvModuleLink(target)[source]¶
Creates a link to an SmvDataSet
When a module X in one stage depends on a module Y in a different stage, it must do so through an SmvModuleLink (listing Y directly as a dependency will lead to a runtime error). For example:

# In stage s1
class Y(SmvModule):
    ...

# In stage s2
class X(SmvModule):
    def requiresDS(self):
        return [SmvModuleLink(Y)]
    ...

Parameters: ds (SmvDataSet) – dataset to link to
Returns: link to ds
Return type: (SmvModuleLink)
class smv.smvpydataset.SmvModuleLinkTemplate(smvApp)[source]¶
Bases: smv.smvpydataset.SmvModule
A module link provides access to data generated by modules from another stage
IsSmvModuleLink = True¶
class smv.smvpydataset.SmvMultiCsvFiles(smvApp)[source]¶
Bases: smv.smvpydataset.SmvPyInput, smv.smvpydataset.WithParser
Raw input from multiple csv files sharing a single schema
Instead of a single input file, specify a data directory containing files which share the same schema.
dir¶
Path to the directory containing the csv files and their schema
Returns: path
Return type: (str)
class smv.smvpydataset.SmvOutput[source]¶
Bases: object
Mixin which marks an SmvModule as one of the outputs of its stage
SmvOutputs are distinct from other SmvDataSets in that:
- SmvModuleLinks can only link to SmvOutputs
- The -s and --run-app options of smv-pyrun only run SmvOutputs and their dependencies.
IsSmvOutput = True¶
smv.smvpydataset.SmvPyCsvFile¶
Deprecated alias of SmvCsvFile
smv.smvpydataset.SmvPyCsvStringData¶
Deprecated alias of SmvCsvStringData
class smv.smvpydataset.SmvPyDataSet(smvApp)[source]¶
Bases: object
Abstract base class for all SmvDataSets
IsSmvPyDataSet = True¶
dqm()[source]¶
DQM policy
Override this method to define your own DQM policy (optional). The default is an empty policy.
Returns: a DQM policy
Return type: (SmvDQM)
isEphemeral()[source]¶
Should this SmvDataSet skip persisting its data?
Returns: True if this SmvDataSet should not persist its data, False otherwise
Return type: (bool)
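For example, to mark a dataset as ephemeral (a minimal sketch):

def isEphemeral(self):
    return True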
publishHiveSql()[source]¶
An optional SQL query to run to publish the results of this module when the --publish-hive command line option is used. The DataFrame result of running this module will be available to the query as the "dftable" table.
Example:
>>> return "insert overwrite table mytable select * from dftable"
Note:
If this method is not specified, the default is to create the table specified by tableName() with the results of the module.
Returns: the query to run
Return type: (string)
smv.smvpydataset.SmvPyExtDataSet(refname)¶
Deprecated alias of SmvExtDataSet
smv.smvpydataset.SmvPyExtModuleLink(refname)¶
Deprecated alias of SmvExtModuleLink
smv.smvpydataset.SmvPyHiveTable¶
Deprecated alias of SmvHiveTable
class smv.smvpydataset.SmvPyInput(smvApp)[source]¶
Bases: smv.smvpydataset.SmvPyDataSet
SmvDataSet representing external input
smv.smvpydataset.SmvPyModuleLink(target)¶
Deprecated alias of SmvModuleLink
smv.smvpydataset.SmvPyMultiCsvFiles¶
Deprecated alias of SmvMultiCsvFiles