smv package

Submodules

smv.csv_attributes module

SMV User Run Configuration Parameters

This module defined the SmvRunConfig class which can be mixed-in into an SmvModule to get user configuration parameters at run-time.

smv.csv_attributes.CsvAttributes(delimiter=', ', quotechar='"', hasHeader=False)[source]

smv.dqm module

SMV DataSet Framework interface

This module defines the abstract classes which formed the SmvDataSet Framework for clients’ projects

smv.dqm.DQMFix(condition, fix, name=None, taskPolicy=None)[source]

DQMFix will fix a column with a default value

Example

# If “age” greater than 100, make it 100 val f = DQMFix($”age” > 100, lit(100) as “age”, “age_cap100”, FailNone) DQMFix(col(‘age’) > 100, lit(100).alias(‘age’), ‘age_cap100’, FailNone)

Parameters:
  • condition (Column) – boolean condition that determines when the fix should occur on the records of a DF
  • fix (Column) – the fix to use when replacing a value that does not pass the condition
  • name (String) – optional parameter for naming the DQMFix. if not specified, defaults to the condition text
  • taskPolicy (DQMTaskPolicy) – optional parameter for the DQM policy. if not specified, defaults to FailNone()
Returns:

a DQMFix object

Return type:

(DQMFix)

smv.dqm.DQMRule(rule, name=None, taskPolicy=None)[source]

DQMRule defines a requirement on the records of a DF

Example

# Require the sum of “a” and “b” columns less than 100 DQMRule(col(‘a’) + col(‘b’) < 100.0, ‘a_b_sum_lt100’, FailPercent(0.01))

Parameters:
  • rule (Column) – boolean condition that defines the requirement on the records of a DF
  • name (string) – optional parameter for naming the DQMRule. if not specified, defaults to the rule text
  • taskPolicy (DQMTaskPolicy) – optional parameter for the DQM policy. if not specified, defaults to FailNone()
Returns:

a DQMRule object

Return type:

(DQMRule)

smv.dqm.FailAny()[source]

Any rule fail or fix with FailAny will cause the entire DF to fail

Returns:policy for DQM Task
Return type:(DQMTaskPolicy)
smv.dqm.FailCount(threshold)[source]

Tasks with FailCount(n) will fail the DF if the task is triggered >= n times

Parameters:threshold (int) – the threshold after which the DF fails
Returns:policy for DQM Task
Return type:(DQMTaskPolicy)
smv.dqm.FailNone()[source]

Tasks with FailNone will not trigger any DF level policy

Returns:policy for DQM Task
Return type:(DQMTaskPolicy)
smv.dqm.FailParserCountPolicy(threshold)[source]

If the total time of parser fails >= threshold, fail the DF

Parameters:threshold (int) – the threshold after which the DF fails
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.FailPercent(threshold)[source]

Tasks with FailPercent(r) will fail the DF if the task is triggered >= r percent of the total number of records in the DF

Parameters:threshold (double) – the threshold after which the DF fails. value is between 0.0 and 1.0
Returns:policy for DQM Task
Return type:(DQMTaskPolicy)
smv.dqm.FailTotalFixCountPolicy(threshold)[source]

For all the fixes in a DQM, if the total number of times they are triggered is >= threshold, fail the DF

Parameters:threshold (int) – the threshold after which the DF fails
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.FailTotalFixPercentPolicy(threshold)[source]

For all the fixes in a DQM, if the total number of times they are triggered is >= threshold * total Records, fail the DF

Parameters:threshold (double) – the threshold after which the DF fails. value is between 0.0 and 1.0
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.FailTotalRuleCountPolicy(threshold)[source]

For all the rules in a DQM, if the total number of times they are triggered is >= threshold, fail the DF

Parameters:threshold (int) – the threshold after which the DF fails
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.FailTotalRulePercentPolicy(threshold)[source]

For all the rules in a DQM, if the total number of times they are triggered is >= threshold * total Records, fail the DF

Parameters:threshold (double) – the threshold after which the DF fails. value is between 0.0 and 1.0
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.SmvDQM()[source]

Factory method for Scala SmvDQM

smv.error module

Errors thrown by SMV

exception smv.error.SmvRuntimeError(msg)[source]

Bases: exceptions.RuntimeError

smv.functions module

smv.functions.diceSorensen(c1, c2)[source]

2-gram UDF with formula (2 * number of overlaped gramCnt)/(s1.gramCnt + s2.gramCnt)

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

2-gram

Return type:

(Column)

smv.functions.jaroWinkler(c1, c2)[source]

Jaro-Winkler edit distance metric UDF

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

distances

Return type:

(Column)

smv.functions.nGram2(c1, c2)[source]

2-gram UDF with formula (number of overlaped gramCnt)/max(c1.gramCnt, c2.gramCnt)

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

2-gram

Return type:

(Column)

smv.functions.nGram3(c1, c2)[source]

3-gram UDF with formula (number of overlaped gramCnt)/max(s1.gramCnt, s2.gramCnt)

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

3-gram

Return type:

(Column)

smv.functions.normlevenshtein(c1, c2)[source]

Levenshtein edit distance metric UDF

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

distances

Return type:

(Column)

smv.functions.smvArrayCat(sep, col)[source]

For an array typed column, concat the elements to a string with the given separater.

Parameters:
  • sep – a Python string to separate the fields
  • col – a Column with ArrayType
Returns:

a Column in StringType with array elements concatenated

Return type:

(col)

smv.functions.smvCollectSet(col, datatype)[source]

An aggregate function, which will collect all the values of the given column and create a set as an array typed column. Since Spark 1.6, a spark function collect_set was introduced, so as migrate to Spark 1.6 and later, this smvCollectSet will be depricated.

Parameters:
  • col (Column) – column to be aggregated on
  • datatype (DataType) – datatype of the input column
smv.functions.smvCreateLookUp(m, default, outputType)[source]

Return a Python UDF which will perform a dictionary lookup on a column

Parameters:
  • m (dictionary) – a Python dictionary to be applied
  • default (any) – default value if dictionary lookup failed
  • outputType (DataType) – output value’s data type
Returns:

an udf which can apply to a column and apply the lookup

Return type:

(udf)

smv.functions.smvFirst(c, nonNull=False)[source]

Variation of Spark “first” which also returns null values

Since Spark “first” will return the first non-null value, we have to create our version smvFirst which to retune the real first value, even if it’s null. Alternatively can return the first non-null value.

Parameters:
  • (Column (c) – column to extract first value from
  • nonNull (bool) – If false, return first value even if null. If true, return first non-null value. Defaults to false.
Returns:

first value

Return type:

(object)

smv.functions.smvHashKey(head, *others)[source]

Create MD5 on concatenated columns. Return “Prefix” + MD5 Hex string(size 32 string) as the unique key

MD5’s collisions rate on real data records could be ignored based on the following discussion.

https://marc-stevens.nl/research/md5-1block-collision/ The shortest messages have the same MD5 are 512-bit (64-byte) messages as below

4dc968ff0ee35c209572d4777b721587d36fa7b21bdc56b74a3dc0783e7b9518afbfa200a8284bf36e8e4b55b35f427593d849676da0d1555d8360fb5f07fea2 and the (different by two bits) 4dc968ff0ee35c209572d4777b721587d36fa7b21bdc56b74a3dc0783e7b9518afbfa202a8284bf36e8e4b55b35f427593d849676da0d1d55d8360fb5f07fea2 both have MD5 hash 008ee33a9d58b51cfeb425b0959121c9

There are other those pairs, but all carefully constructed. Theoretically the random collisions will happen on data size approaching 2^64 (since MD5 has 128-bit), which is much larger than the number of records we deal with (a billion is about 2^30) There for using MD5 to hash primary key columns is good enough for creating an unique key

This function can take 2 forms: - smvHashKey(prefix, col1, col2, ...) - smvHashKey(col1, col2, ...)

Parameters:
  • prefix (String) – return string’s prefix
  • col. (Column) – columns to be part of hash
Returns:

a StringType column as Prefix + MD5 Hex string

Return type:

(col)

smv.functions.smvStrCat(head, *others)[source]

Concatenate multiple columns to a single string. Similar to concat and concat_ws functions in Spark but behaves differently when some columns are nulls. The Spark version will return null if any of the inputs is null. smvStrCat will return null if all of the inputs are nulls, otherwise it will coalesce null cols to blank.

This function can take 2 forms: - smvStrCat(sep, col1, col2, ...) - smvStrCat(col1, col2, ...)

Parameters:
  • sep (String) – separater for the concats
  • col. (Column) – columns to be concatenated
Returns:

a StringType column

Return type:

(col)

smv.graph module

Provides dependency graphing of SMV modules.

class smv.graph.SmvDependencyGraph(smvApp, stageNames=None)[source]

Bases: graphviz.files.Source

smv.graph.svg_graph(*stageNames)[source]

smv.helpers module

SMV DataFrame Helpers and Column Helpers

This module provides the helper functions on DataFrame objects and Column objects

class smv.helpers.ColumnHelper(col)[source]

Bases: object

smvDay70()[source]

Convert a Timestamp to the number of days from 1970-01-01

Example

>>> df.select(col("dob")).smvDay70
Returns:number of days from 1970-01-01 (start from 0)
Return type:(integer)
smvDayOfMonth()[source]

Extract day of month component from a timestamp

Example

>>> df.select(col("dob")).smvDayOfMonth()
Returns:day of month component as integer (range 1-31), or null if input column is null
Return type:(integer)
smvDayOfWeek()[source]

Extract day of week component from a timestamp

Example

>>> df.select(col("dob")).smvDayOfWeek()
Returns:day of week component as integer (range 1-7, 1 being Sunday), or null if input column is null
Return type:(integer)
smvHour()[source]

Extract hour component from a timestamp

Example

>>> df.select(col("dob")).smvHour()
Returns:hour component as integer, or null if input column is null
Return type:(integer)
smvIsAllIn(*vals)[source]

Returns true if ALL of the Array columns’ elements are in the given parameter sequence

Parameters:vals (*any) – vals must be of the same type as the Array content

Example

input DF:

k v
a b
c d
   
>>> df.select( array(col("k"), col("v")) ).smvIsAllIn("a", "b", "c").alias("isFound"))

output DF:

isFound
true
false
false
Returns:result of smvIsAllIn
Return type:(DataFrame)
smvIsAnyIn(*vals)[source]

Returns true if ANY one of the Array columns’ elements are in the given parameter sequence

Parameters:vals (*any) – vals must be of the same type as the Array content

Example

input DF:

k v
a b
c d
   
>>> df.select( array(col("k"), col("v")) ).smvIsAnyIn("a", "b", "c").alias("isFound"))

output DF:

isFound
true
true
false
Returns:result of smvIsAnyIn
Return type:(DataFrame)
smvMonth()[source]

Extract month component from a timestamp

Example

>>> df.select(col("dob")).smvMonth()
Returns:month component as integer, or null if input column is null
Return type:(integer)
smvMonth70()[source]

Convert a Timestamp to the number of months from 1970-01-01

Example

>>> df.select(col("dob")).smvMonth70
Returns:number of months from 1970-01-01 (start from 0)
Return type:(integer)
smvPlusDays(delta)[source]

Add N days to Timestamp column

Parameters:delta (integer) – the number of days to add

Example

>>> df.select(col("dob")).smvPlusDays(3)
Returns:the incremented Timestamp, or null if input is null
Return type:(Timestamp)
smvPlusMonths(delta)[source]

Add N months to Timestamp column

Parameters:delta (integer) – the number of months to add

Note

The calculation will do its best to only change the month field retaining the same day of month. However, in certain circumstances, it may be necessary to alter smaller fields. For example, 2007-03-31 plus one month cannot result in 2007-04-31, so the day of month is adjusted to 2007-04-30.

Example

>>> df.select(col("dob")).smvPlusMonths(3)
Returns:the incremented Timestamp, or null if input is null
Return type:(Timestamp)
smvPlusWeeks(delta)[source]

Add N weeks to Timestamp column

Parameters:delta (integer) – the number of weeks to add

Example

>>> df.select(col("dob")).smvPlusWeeks(3)
Returns:the incremented Timestamp, or null if input is null
Return type:(Timestamp)
smvPlusYears(delta)[source]

Add N years to Timestamp column

Parameters:delta (integer) – the number of years to add

Example

>>> df.select(col("dob")).smvPlusYears(3)
Returns:the incremented Timestamp, or null if input is null
Return type:(Timestamp)
smvQuarter()[source]

Extract quarter component from a timestamp

Example

>>> df.select(col("dob")).smvQuarter()
Returns:quarter component as integer (1-based), or null if input column is null
Return type:(integer)
smvStrToTimestamp(fmt)[source]

Build a timestamp from a string

Parameters:fmt (string) – the format is the same as the Java Date format

Example

>>> df.select(col("dob")).smvStrToTimestamp("yyyy-MM-dd")
Returns:the converted Timestamp
Return type:(Timestamp)
smvYear()[source]

Extract year component from a timestamp

Example

>>> df.select(col("dob")).smvYear()
Returns:year component as integer, or null if input column is null
Return type:(integer)
class smv.helpers.DataFrameHelper(df)[source]

Bases: object

peek(pos=1, colRegex='.*')[source]

Display a DataFrame row in transposed view

Parameters:
  • pos (integer) – the n-th row to display, default as 1
  • colRegex (string) – show the columns with name matching the regex, default as ”.*”
Returns:

(None)

peekSave(path, pos=1, colRegex='.*')[source]

Write peek result to a file

Parameters:
  • path (string) – local file name to Write
  • pos (integer) – the n-th row to display, default as 1
  • colRegex (string) – show the columns with name matching the regex, default as ”.*”
Returns:

(None)

smvBinHist(*colWithBin)[source]

Print distributions on numerical columns with applying the specified bin size

Parameters:colWithBin (*tuple) – each tuple must be of size 2, where the first element is the column name, and the second is the bin size for that column

Example

>>> df.smvBinHist(("col_a", 1))
Returns:(None)
smvConcatHist(*cols)[source]

Display EDD histogram of a group of columns (joint distribution)

Parameters:cols (*string) – The columns on which to perform EDD histogram

Example

>>> df.smvConcatHist("a", "b")
Returns:(None)
smvCountHist(keys, binSize)[source]

Print the distribution of the value frequency on specific columns

Parameters:
  • keys (list(string)) – the column names on which the EDD histogram is performed
  • binSize (integer) – the bin size for the histogram

Example

>>> df.smvCountHist(["k"], 1)
Returns:(None)
smvDedupByKey(*keys)[source]

Remove duplicate records from the DataFrame by arbitrarly selecting the first record from a set of records with same primary key or key combo.

Parameters:keys (*string or *Column) – the column names or Columns on which to apply dedup

Example

input DataFrame:

id product Company
1 A C1
1 C C2
2 B C3
2 B C4
>>> df.dedupByKey("id")

output DataFrame:

id product Company
1 A C1
2 B C3
>>> df.dedupByKey("id", "product")

output DataFrame:

id product Company
1 A C1
1 C C2
2 B C3
Returns:a DataFrame without duplicates for the specified keys
Return type:(DataFrame)
smvDedupByKeyWithOrder(*keys)[source]

Remove duplicated records by selecting the first record relative to a given ordering

The order is specified in another set of parentheses, as follows:

>>> def smvDedupByKeyWithOrder(self, *keys)(*orderCols)

Note

Same as the dedupByKey method, we use RDD groupBy in the implementation of this method to make sure we can handle large key space.

Parameters:keys (*string or *Column) – the column names or Columns on which to apply dedup

Example

input DataFrame:

id product Company
1 A C1
1 C C2
2 B C3
2 B C4
>>> df.dedupByKeyWithOrder(col("id"))(col("product").desc())

output DataFrame:

id product Company
1 C C2
2 B C3
Returns:a DataFrame without duplicates for the specified keys / order
Return type:(DataFrame)
smvDesc(*colDescs)[source]

Adds column descriptions

Parameters:colDescs (*tuple) – tuples of strings where the first is the column name, and the second is the description to add

Example

>>> df.smvDesc(("a", "description of col a"), ("b", "description of col b"))
Returns:the DataFrame with column descriptions added
Return type:(DataFrame)
smvDescFromDF(descDF)[source]

Adds column descriptions

Parameters:descDF (DataFrame) – a companion 2-column desciptionDF that has variable names as column 1 and corresponding variable descriptions as column 2

Example

>>> df.smvDescFromDF(desciptionDF)
Returns:the DataFrame with column descriptions added
Return type:(DataFrame)
smvDiscoverPK(n=10000)[source]

Find a column combination which uniquely identifies a row from the data

Note

The algorithm only looks for a set of keys which uniquely identifies the row. There could be more key combinations which can also be the primary key.

Parameters:n (integer) – number of rows the PK discovery algorithm will run on, defaults to 10000

Example

>>> df.smvDiscoverPK(5000)
Returns:(None)
smvDumpDF()[source]

Dump the schema and data of given df to screen for debugging purposes

Similar to show() method of DF from Spark 1.3, although the format is slightly different. This function’s format is more convenient for us and hence has remained.

Example

>>> df.smvDumpDF()
Returns:(None)
smvEdd(*cols)[source]

Display EDD summary

Parameters:cols (*string) – column names on which to perform EDD summary

Example

>>> df.smvEdd("a", "b")
Returns:(None)
smvEddCompare(df2, ignoreColName)[source]

Compare 2 DFs by comparing their Edd Summary result

The result of the comparison is printed out.

Parameters:
  • df2 (DataFrame) – the DataFrame to compare with
  • ignoreColName (boolean) – specifies whether to ignore column names, default is false

Example

>>> df.smvEddCompare(df2)
>>> df.smvEddCompare(df2, true)
Returns:(None)
smvExpandStruct(*cols)[source]

Expand structure type column to a group of columns

Parameters:cols (*string) – column names to expand

Example

input DF:
[id: string, address: struct<state:string, zip:string, street:string>]
>>> df.smvExpandStruct("address")
output DF:
[id: string, state: string, zip: string, street: string]
Returns:DF with expanded columns
Return type:(DataFrame)
smvExportCsv(path, n=None)[source]

Export DataFrame to local file system

Parameters:
  • path (string) – relative path to the app running directory on local file system (instead of HDFS)
  • n (integer) – optional. number of records to export. default is all records

Note

Since we have to collect the DF and then call JAVA file operations, the job have to be launched as either local or yar-client mode. Also it is user’s responsibility to make sure that the DF is small enought to fit into memory.

Example

>>> df.smvExportCsv("./target/python-test-export-csv.csv")
Returns:(None)
smvFreqHist(*cols)[source]

Print EDD histogram with frequency sorting

Parameters:cols (*string) – The columns on which to perform EDD histogram

Example

>>> df.smvFreqHist("a")
Returns:(None)
smvGetDesc(colName=None)[source]

Returns column description(s)

Parameters:colName (string) – optional column name for which to get the description.

Example

>>> df.smvGetDesc("col_a")
>>> df.smvGetDesc()
Returns:description string of colName, if specified (list(tuple)): a list of (colName, description) pairs for all columns
Return type:(string)
smvGroupBy(*cols)[source]

Similar to groupBy, instead of creating GroupedData, create an SmvGroupedData object.

See [[org.tresamigos.smv.SmvGroupedDataFunc]] for list of functions that can be applied to the grouped data.

Parameters:cols (*string or *Column) – column names or Column objects to group on

Note

This is going away shortly and user will be able to use standard Spark groupBy method directly.

Example

>>> df.smvGroupBy(col("k"))
>>> df.smvGroupBy("k")
Returns:grouped data object
Return type:(SmvGroupedData)
smvHashSample(key, rate=0.01, seed=23)[source]

Sample the df according to the hash of a column

MurmurHash3 algorithm is used for generating the hash

Parameters:
  • key (string or Column) – column name or Column to sample on
  • rate (double) – sample rate in range (0, 1] with a default of 0.01 (1%)
  • seed (int) – random generator integer seed with a default of 23

Example

>>> df.smvHashSample(col("key"), rate=0.1, seed=123)
Returns:sampled DF
Return type:(DataFrame)
smvHist(*cols)[source]

Display EDD histogram

Each column’s histogram prints separately

Parameters:cols (*string) – The columns on which to perform EDD histogram

Example

>>> df.smvHist("a")
Returns:(None)
smvJoinByKey(other, keys, joinType)[source]

joins two DataFrames on a key

The Spark DataFrame join operation does not handle duplicate key names. If both left and right side of the join operation contain the same key, the result DataFrame is unusable.

The smvJoinByKey method will allow the user to join two DataFrames using the same join key. Post join, only the left side keys will remain. In case of outer-join, the coalesce(leftkey, rightkey) will replace the left key to be kept.

Parameters:
  • other (DataFrame) – the DataFrame to join with
  • keys (list(string)) – a list of column names on which to apply the join
  • joinType (string) – choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]

Example

>>> df1.smvJoinByKey(df2, ["k"], "inner")
Returns:result of the join operation
Return type:(DataFrame)
smvJoinMultipleByKey(keys, joinType='inner')[source]

Create multiple DF join builder

It is used in conjunction with joinWith and doJoin

Parameters:
  • keys (list(string)) – a list of column names on which to apply the join
  • joinType (string) – choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]

Example

>>> df.joinMultipleByKey(["k1", "k2"], "inner").joinWith(df2, "_df2").joinWith(df3, "_df3", "leftouter").doJoin()
Returns:the builder object for the multi join operation
Return type:(SmvMultiJoin)
smvOverlapCheck(keyColName)[source]

For a set of DFs, which share the same key column, check the overlap across them

The other DataFrames are specified in another set of parentheses, as follows:

>>> df1.smvOverlapCheck(key)(*df)
Parameters:keyColName (string) – the column name for the key column

Examples

>>> df1.smvOverlapCheck("key")(df2, df3, df4)
output DF has 2 columns:
  • key
  • flag: a bit-string, e.g. 0110. Each bit represents whether the original DF has this key

It can be used with EDD to summarize on the flag:

>>> df1.smvOverlapCheck("key")(df2, df3).smvHist("flag")
smvRemoveDesc(*colNames)[source]

Removes description for the given columns from the Dataframe

Parameters:colNames (*string) – names of columns for which to remove the description

Example

>>> df.smvRemoveDesc("col_a", "col_b")
Returns:the DataFrame with column descriptions removed
Return type:(DataFrame)
smvRenameField(*namePairs)[source]

Rename one or more fields of a DataFrame

Parameters:namePairs (*tuple) – tuples of strings where the first is the source column name, and the second is the target column name

Example

>>> df.smvRenameField(("a", "aa"), ("c", "cc"))
Returns:the DataFrame with renamed fields
Return type:(DataFrame)
smvSelectMinus(*cols)[source]

Remove one or more columns from current DataFrame

Parameters:cols (*string or *Column) – column names or Columns to remove from the DataFrame

Example

>>> df.smvSelectMinus("col1", "col2")
>>> df.smvSelectMinus(col("col1"), col("col2"))
Returns:the resulting DataFrame after removal of columns
Return type:(DataFrame)
smvSelectPlus(*cols)[source]

Selects all the current columns in current DataFrame plus the supplied expressions

The new columns are added to the end of the current column list.

Parameters:cols (*Column) – expressions to add to the DataFrame

Example

>>> df.smvSelectPlus(col("price") * col("count") as "amt")
Returns:the resulting DataFrame after removal of columns
Return type:(DataFrame)
smvSkewJoinByKey(other, joinType, skewVals, key)[source]

Join that leverages broadcast (map-side) join of rows with skewed (high-frequency) values

Rows keyed by skewed values are joined via broadcast join while remaining rows are joined without broadcast join. Occurrences of skewVals in df2 should be infrequent enough that the filtered table is small enough for a broadcast join. result is the union of the join results.

Parameters:
  • other (DataFrame) – DataFrame to join with
  • joinType (str) – name of type of join (e.g. “inner”)
  • skewVals (list(object)) – list of skewed values
  • key (str) – key on which to join (also the Column with the skewed values)

Example

{{{ df.smvSkewJoinByKey(df2, SmvJoinType.Inner, Seq(“9999999”), “cid”) }}} will broadcast join the rows of df1 and df2 where col(“cid”) == “9999999” and join the remaining rows of df1 and df2 without broadcast join.

smvUnion(*dfothers)[source]

Unions DataFrames with different number of columns by column name and schema

Spark unionAll ignores column names & schema, and can only be performed on tables with the same number of columns.

Parameters:dfOthers (*DataFrame) – the dataframes to union with

Example

>>> df.smvUnion(df2, df3)
Returns:the union of all specified DataFrames
Return type:(DataFrame)
smvUnpivot(*cols)[source]

Unpivot the selected columns

Given a set of records with value columns, turns the value columns into value rows.

Parameters:cols (*string) – the names of the columns to unpivot

Example

input DF:

id X Y Z
1 A B C
2 D E F
3 G H I
>>> df.smvUnpivot("X", "Y", "Z")

output DF:

id column value
1 X A
1 Y B
1 Z C
... ... ...
3 Y H
3 Z I
Returns:the unpivoted DataFrame
Return type:(DataFrame)
smvUnpivotRegex(cols, colNameFn, indexColName)[source]

Unpivot the selected columns using the specified regex

Parameters:
  • cols (*string) – the names of the columns to unpivot
  • colNameFn (string) – a regex representing the function to be applied when unpivoting
  • indexColName (string) – the name of the index column to be created

Example

input DF:

id A_1 A_2 B_1 B_2
1 1_a_1 1_a_2 1_b_1 1_b_2
2 2_a_1 2_a_2 2_b_1 2_b_2
>>> df.smvUnpivotRegex( ["A_1", "A_2", "B_1", "B_2"], "(.*)_(.*)", "index" )

output DF:

id index A B
1 1 1_a_1 1_b_1
1 2 1_a_2 1_b_2
2 1 2_a_1 2_b_1
2 2 2_a_2 2_b_2
Returns:the unpivoted DataFrame
Return type:(DataFrame)
topNValsByFreq(n, col)[source]

Get top N most frequent values in Column c

Parameters:
  • n (int) – maximum number of values
  • col (Column) – which column to get values from

Examples

>>> df.topNValsByFreq(1, col("cid"))
will return the single most frequent value in the cid column
Returns:most frequent values (type depends on schema)
Return type:(list(object))
class smv.helpers.SmvGroupedData(df, sgd)[source]

Bases: object

The result of running smvGroupBy on a DataFrame. Implements special SMV aggregations.

smvFillNullWithPrevValue(*orderCols)[source]

Fill in Null values with “previous” value according to an ordering

Examples

Given DataFrame df representing the table

K, T, V a, 1, null a, 2, a a, 3, b a, 4, null

we can use

>>> df.smvGroupBy("K").smvFillNullWithPrevValue($"T".asc)("V")

to preduce the result

K, T, V a, 1, null a, 2, a a, 3, b a, 4, b

Returns:result of fill nulls with previous value
Return type:(Dataframe)
smvPivotCoalesce(pivotCols, valueCols, baseOutput)[source]

Perform SmvPivot, then coalesce the output

Parameters:
  • pivotCols (list(list(str))) – lists of names of column names to pivot
  • valueCols (list(string)) – names of value columns to coalesce
  • baseOutput (list(str)) – expected names pivoted column
Returns:

result of pivot coalesce

Return type:

(Dataframe)

smvPivotSum(pivotCols, valueCols, baseOutput)[source]

Perform SmvPivot, then sum the results

The user is required to supply the list of expected pivot column output names to avoid extra action on the input DataFrame. If an empty sequence is provided, then the base output columns will be extracted from values in the pivot columns (will cause an action on the entire DataFrame!)

Parameters:
  • pivotCols (list(list(str))) – lists of names of column names to pivot
  • valueCols (list(string)) – names of value columns to sum
  • baseOutput (list(str)) – expected names pivoted column

Examples

For example, given a DataFrame df that represents the table

id month product count
1 5/14 A 100
1 6/14 B 200
1 5/14 B 300

we can use

>>> df.smvGroupBy("id").smvPivotSum(Seq("month", "product"))("count")("5_14_A", "5_14_B", "6_14_A", "6_14_B")

to produce the following output

id count_5_14_A count_5_14_B count_6_14_A count_6_14_B
1 100 300 NULL 200
Returns:result of pivot sum
Return type:(DataFrame)
smvTopNRecs(maxElems, *cols)[source]

For each group, return the top N records according to a given ordering

Example

# This will keep the 3 largest amt records for each id df.smvGroupBy(“id”).smvTopNRecs(3, col(“amt”).desc())

Parameters:
  • maxElems (int) – maximum number of records per group
  • cols (*str) – columns defining the ordering
Returns:

result of taking top records from groups

Return type:

(DataFrame)

class smv.helpers.SmvMultiJoin(sqlContext, mj)[source]

Bases: object

Wrapper around Scala’s SmvMultiJoin

doJoin(dropextra=False)[source]

Trigger the join operation

Parameters:dropExtra (boolean) – default false, which will keep all duplicated name columns with the postfix. when true, the duplicated columns will be dropped

Example

joindf.doJoin()

Returns:result of executing the join operation
Return type:(DataFrame)
joinWith(df, postfix, jointype=None)[source]

Append SmvMultiJoin Chain

Parameters:
  • df (DataFrame) – the DataFrame to join with
  • postfix (string) – postfix to use when renaming join columns
  • jointype (string) – optional jointype. if not specified, conf.defaultJoinType is used. choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]

Example

>>> joindf = df1.smvJoinMultipleByKey(['a'], 'inner').joinWith(df2, '_df2').joinWith(df3, '_df3', 'outer')
Returns:formula of the join. need to call doJoin() on it to execute
Return type:(SmvMultiJoin)

smv.matcher module

smv.matcher.ExactLogic(colName, expr)[source]

Level match with exact logic

Parameters:
  • colName (string) – level name used in the output DF
  • expr (Column) – match logic colName

Example

>>> ExactLogic("First_Name_Match", col("first_name") == col("_first_name"))
Returns:(ExactLogic)
smv.matcher.ExactMatchPreFilter(colName, expr)[source]

Specify the top-level exact match

Parameters:
  • colName (string) – level name used in the output DF
  • expr (Column) – match logic condition Column

Example

>>> ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name"))
Returns:(ExactMatchPreFilter)
smv.matcher.FuzzyLogic(colName, predicate, valueExpr, threshold)[source]

Level match with fuzzy logic

Parameters:
  • colName (string) – level name used in the output DF
  • predicate (Column) – a condition column, no match if this condition evaluates as false
  • valueExpr (Column) – a value column, which typically return a score. Higher score means higher chance of matching
  • threshold (float) – No match if the evaluated valueExpr < threshold

Example

>>> FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"),col("_city")), 0.9)
Returns:(FuzzyLogic)
smv.matcher.GroupCondition(expr)[source]

Specify the shared matching condition of all the levels (except the top-level exact match)

Parameters:expr (Column) – shared matching condition

Note

expr should be in “left == right” form so that it can really optimize the process by reducing searching space

Example

>>> GroupCondition(soundex("first_name") == soundex("_first_name"))
Returns:(GroupCondition)
smv.matcher.NoOpGroupCondition()[source]

Always returns None

Returns:(None)
smv.matcher.NoOpPreFilter()[source]

Always returns None

Returns:(None)
class smv.matcher.SmvEntityMatcher(leftId, rightId, exactMatchFilter, groupCondition, levelLogics)[source]

Bases: object

Perform multiple level entity matching with exact and/or fuzzy logic

Parameters:
  • leftId (string) – id column name of left DF (df1)
  • rightId (string) – id column name of right DF (df2)
  • exactMatchFilter (ExactMatchPreFilter) – exact match condition, if records matched no further tests will be performed
  • groupCondition (GroupCondition) – for exact match leftovers, a deterministic condition to narrow down the search space
  • levelLogics (list(ExactLogic or FuzzyLogic)) – A list of level match conditions (always weaker than exactMatchFilter), all of them will be tested

Example

code:

SmvEntityMatcher("id", "_id",
    ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name")),
    GroupCondition(soundex("first_name") == soundex("_first_name")),
    [
        ExactLogic("First_Name_Match", col("first_name") == col("_first_name")),
        FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"),col("_city")), 0.9)
    ]
)
Returns:(XYZ)
doMatch(df1, df2, keepOriginalCols=True)[source]

Apply SmvEntityMatcher to the 2 DataFrames

Parameters:
  • df1 (DataFrame) – DataFrame 1 with an id column with name “id”
  • df2 (DataFrame) – DataFrame 2 with an id column with name “id”
  • keepOriginalCols (boolean) – whether to keep all input columns of df1 and df2, defaults to true

Example

code:

SmvEntityMatcher("id", "_id",
    ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name")),
    GroupCondition(soundex("first_name") == soundex("_first_name")),
    [
        ExactLogic("First_Name_Match", col("first_name") == col("_first_name")),
        FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"),col("_city")), 0.9)
    ]
).doMatch(df1, df2, False)
Returns:a DataFrame with df1’s id and df2’s id and match flags of all the levels. For levels with fuzzy logic, the matching score is also provided. A column named “MatchBitmap” also provided to summarize all the matching flags. When keepOriginalCols is true, input columns are also kept
Return type:(DataFrame)

smv.runconfig module

SMV User Run Configuration Parameters

This module defined the SmvRunConfig class which can be mixed-in into an SmvModule to get user configuration parameters at run-time.

class smv.runconfig.SmvRunConfig[source]

Bases: object

Mix-in class to SmvModules that enable the module to access user run configuration parameters at run time.

smvGetRunConfig(key)[source]

return the current user run configuration value for the given key.

smvGetRunConfigAsBool(key)[source]
smvGetRunConfigAsInt(key)[source]

smv.smvapp module

SmvPy entry class and singleton``smvApp This module provides the main SMV Python entry point ``SmvPy class and a singleton smvApp. It is equivalent to SmvApp on Scala side

class smv.smvapp.DataSetRepo(smvApp)[source]

Bases: object

class Java[source]
implements = ['org.tresamigos.smv.IDataSetRepoPy4J']
dataSetsForStage(stageName)[source]
loadDataSet(fqn)[source]
notFound(modUrn, msg)[source]
outputModsForStage(stageName)[source]
class smv.smvapp.DataSetRepoFactory(smvApp)[source]

Bases: object

class Java[source]
implements = ['org.tresamigos.smv.IDataSetRepoFactoryPy4J']
createRepo()[source]
class smv.smvapp.SmvApp(arglist, _sc=None, _sqlContext=None)[source]

Bases: object

The Python representation of SMV.

Its singleton instance is created later in the containing module and is named smvApp

Adds java_imports to the namespace in the JVM gateway in SparkContext (in pyspark). It also creates an instance of SmvPyClient.

appName()[source]
createDF(schema, data=None)[source]
classmethod createInstance(arglist, _sc=None, _sqlContext=None)[source]

Create singleton instance. Also returns the instance.

create_smv_pyclient(arglist)[source]

return a smvPyClient instance

defaultCsvWithHeader()[source]
defaultTsv()[source]
defaultTsvWithHeader()[source]
classmethod getInstance()[source]
get_graph_json()[source]

Generate a json string representing the dependency graph. TODO: need to add a stageName parameter to limit it to a single stage.

outputDir()[source]
prepend_source(source_dir)[source]
run()[source]
runModule(urn)[source]

Runs either a Scala or a Python SmvModule by its Fully Qualified Name(fqn)

runModuleByName(name)[source]
scalaOption(val)[source]

Returns a Scala Option containing the value

urn2fqn(urnOrFqn)[source]

Extracts the SMV module FQN portion from its URN; if it’s already an FQN return it unchanged

smv.smvpydataset module

SMV DataSet Framework interface

This module defines the abstract classes which formed the SmvDataSet Framework for clients’ projects

class smv.smvpydataset.SmvCsvFile(smvApp)[source]

Bases: smv.smvpydataset.SmvPyInput, smv.smvpydataset.WithParser

Input from a file in CSV format

description()[source]
doRun(validator, known)[source]
path

User-specified path to the input csv file

Override this to specify the path to the csv file.

Returns:path
Return type:(str)
class smv.smvpydataset.SmvCsvStringData(smvApp)[source]

Bases: smv.smvpydataset.SmvPyInput

Input data defined by a schema string and data string

dataStr

Smv data string.

E.g. “212,2016-10-03;119,2015-01-07”

Returns:data
Return type:(str)
doRun(validator, known)[source]
schemaStr

Smv Schema string.

E.g. “id:String; dt:Timestamp”

Returns:schema
Return type:(str)
smv.smvpydataset.SmvExtDataSet(refname)[source]

Creates an SmvDataSet representing an external (Scala) SmvDataSet

E.g. MyExtMod = SmvExtDataSet(“the.scala.mod”)

Parameters:fqn (str) – fqn of the Scala SmvDataSet
Returns:external dataset with given fqn
Return type:(SmvExtDataSet)

Creates a link to an external (Scala) SmvDataSet

SmvExtModuleLink(fqn) is equivalent to SmvModuleLink(SmvExtDataSet(fqn))

Parameters:fqn (str) – fqn of the the Scala SmvDataSet
Returns:link to the Scala SmvDataSet
Return type:(SmvModuleLink)
class smv.smvpydataset.SmvHiveTable(smvApp)[source]

Bases: smv.smvpydataset.SmvPyInput

Input from a Hive table

description()[source]
doRun(validator, known)[source]
tableName

User-specified name Hive hive table to extract input from

Override this to specify your own table name.

Returns:table name
Return type:(str)
tableQuery()[source]

Query used to extract data from Hive table

Override this to specify your own query (optional). Default is equivalent to ‘select * from ‘ + tableName().

Returns:query
Return type:(str)
class smv.smvpydataset.SmvModule(smvApp)[source]

Bases: smv.smvpydataset.SmvPyDataSet

Base class for SmvModules written in Python

IsSmvModule = True
class RunParams(urn2df)[source]

Bases: object

Map from SmvDataSet to resulting DataFrame

We need to simulate a dict from ds to df where the same object can be keyed by different datasets with the same urn. For example, in the module

class X(SmvModule):
def requiresDS(self): return [SmvModuleLink(“foo”)] def run(self, i): return i[SmvModuleLink(“foo”)]

the i argument of the run method should map SmvModuleLink(“foo”) to the correct DataFrame.

Parameters:(dict) – a map from urn to DataFrame
doRun(validator, known)[source]
dsType()[source]
run(i)[source]

User-specified definition of the operations of this SmvModule

Override this method to define the output of this module, given a map ‘i’ from inputSmvDataSet to resulting DataFrame. ‘i’ will have a mapping for each SmvDataSet listed in requiresDS. E.g.

def requiresDS(self):
return [MyDependency]
def run(self, i):
return i[MyDependency].select(“importantColumn”)
Parameters:(RunParams) – mapping from input SmvDataSet to DataFrame
Returns:ouput of this SmvModule
Return type:(DataFrame)

Creates a link to an SmvDataSet

When a module X in one stage depends on a module Y in a different stage, it must do through through an SmvModuleLink (listing Y directly as a dependency will lead to a runtime error). For example,:

# In stage s1
class Y(SmvModule):
    ...

# In stage s2
class X(SmvModule)
    def requiresDS(self): return [SmvModuleLink(Y)]
    ...
Parameters:ds (SmvDataSet) – dataset to link to
Returns:link to ds
Return type:(SmvModuleLink)
class smv.smvpydataset.SmvModuleLinkTemplate(smvApp)[source]

Bases: smv.smvpydataset.SmvModule

A module link provides access to data generated by modules from another stage

dsType()[source]
isEphemeral()[source]
requiresDS()[source]
classmethod target()[source]

Returns the target SmvModule class from another stage to which this link points

classmethod urn()[source]
class smv.smvpydataset.SmvMultiCsvFiles(smvApp)[source]

Bases: smv.smvpydataset.SmvPyInput, smv.smvpydataset.WithParser

Raw input from multiple csv files sharing single schema

Instead of a single input file, specify a data dir with files which share the same schema.

description()[source]
dir

Path to the directory containing the csv files and their schema

Returns:path
Return type:(str)
doRun(validator, known)[source]
class smv.smvpydataset.SmvOutput[source]

Bases: object

Mixin which marks an SmvModule as one of the output of its stage

SmvOutputs are distinct from other SmvDataSets in that
  • SmvModuleLinks can only link to SmvOutputs
  • The -s and –run-app options of smv-pyrun only run SmvOutputs and their dependencies.
IsSmvOutput = True
tableName()[source]

The user-specified table name used when exporting data to Hive (optional)

Returns:(string)
smv.smvpydataset.SmvPyCsvFile

Deprecated alias of SmvCsvFile

alias of SmvCsvFile

smv.smvpydataset.SmvPyCsvStringData

Deprecated alias of SmvCsvStringData

alias of SmvCsvStringData

class smv.smvpydataset.SmvPyDataSet(smvApp)[source]

Bases: object

Abstract base class for all SmvDataSets

IsSmvPyDataSet = True
class Java[source]
implements = ['org.tresamigos.smv.ISmvModule']
datasetHash()[source]
dependencies()[source]
description()[source]
doRun(validator, known)[source]

Comput this dataset, and return the dataframe

dqm()[source]

DQM policy

Override this method to define your own DQM policy (optional). Default is an empty policy.

Returns:a DQM policy
Return type:(SmvDQM)
dsType()[source]

Return SmvPyDataSet’s type

classmethod fqn()[source]

Returns the fully qualified name

getDataFrame(validator, known)[source]
getDqm()[source]
isEphemeral()[source]

Should this SmvDataSet skip persisting its data?

Returns:True if this SmvDataSet should not persist its data, false otherwise
Return type:(bool)
isOutput()[source]
publishHiveSql()[source]

An optional sql query to run to publish the results of this module when the –publish-hive command line is used. The DataFrame result of running this module will be available to the query as the “dftable” table.

Example:
>>> return "insert overwrite table mytable select * from dftable"
Note:
If this method is not specified, the default is to just create the table specified by tableName() with the results of the module.
Returns:the query to run.
Return type:(string)
requiresDS()[source]

User-specified list of dependencies

Override this method to specify the SmvDataSets needed as inputs.

Returns:a list of dependencies
Return type:(list(SmvDataSet))
classmethod urn()[source]
version()[source]

Version number

Each SmvDataSet is versioned with a numeric string, so it and its result can be tracked together.

Returns:version number of this SmvDataSet
Return type:(str)
smv.smvpydataset.SmvPyExtDataSet(refname)

Deprecated alias of SmvExtDataSet

Deprecated alias of SmvExtModuleLink

smv.smvpydataset.SmvPyHiveTable

Deprecated alias of SmvHiveTable

alias of SmvHiveTable

class smv.smvpydataset.SmvPyInput(smvApp)[source]

Bases: smv.smvpydataset.SmvPyDataSet

SmvDataSet representing external input

dsType()[source]
isEphemeral()[source]
requiresDS()[source]
run(df)[source]

Post-processing for input data

Parameters:df (DataFrame) – input data
Returns:processed data
Return type:(DataFrame)
smv.smvpydataset.SmvPyModule

Deprecated alias of SmvModule

alias of SmvModule

Deprecated alias of SmvModuleLink

smv.smvpydataset.SmvPyMultiCsvFiles

Deprecated alias of SmvMultiCsvFiles

alias of SmvMultiCsvFiles

smv.smvpydataset.SmvPyOutput

Deprecated alias of SmvOutput

alias of SmvOutput

class smv.smvpydataset.WithParser[source]

Bases: object

shared parser funcs

csvAttr()[source]

Specifies the csv file format. Corresponds to the CsvAttributes case class in Scala.

defaultCsvWithHeader()[source]
defaultTsv()[source]
defaultTsvWithHeader()[source]
failAtParsingError()[source]
forceParserCheck()[source]

smv.utils module

smv.utils.check_socket(port)[source]

Check whether the given port is open to bind

smv.utils.for_name(name)[source]

Dynamically load a class by its name.

Equivalent to Java’s Class.forName

smv.utils.smv_copy_array(sc, *cols)[source]

Copy Python list to appropriate Java array

Module contents