smv package

Submodules

smv.csv_attributes module

CSV Attributes

This module defines the CsvAttributes helper, which describes how a CSV file should be parsed: the field delimiter, the quote character, and whether the file has a header row.

smv.csv_attributes.CsvAttributes(delimiter=', ', quotechar='"', hasHeader=False)[source]
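
A usage sketch: a pipe-delimited file with a header row might be described as follows (the resulting attribute object is typically passed to an SMV CSV input definition):

>>> ca = CsvAttributes(delimiter='|', quotechar='"', hasHeader=True)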

smv.datasetmgr module

DataSetMgr entry class. This module provides the Python entry point to the DataSetMgr on the Scala side.

class smv.datasetmgr.DataSetMgr(_jvm, smvconfig)[source]

Bases: object

The Python representation of DataSetMgr.

allDataSets()[source]

Return all the SmvGenericModules in the app

inferDS(*partial_names)[source]

Return DSs from a list of partial names

Parameters:*partial_names (str) – list of partial names
Returns:list of SmvGenericModules
Return type:list(SmvGenericModules)
inferFqn(partial_name)[source]

Return FQN string from partial name

load(*fqns)[source]

Load SmvGenericModules for specified FQNs

Parameters:*fqns (str) – list of FQNs as strings
Returns:list of Scala SmvGenericModules (j_ds)
Return type:list(SmvGenericModules)
modulesToRun(modPartialNames, stageNames, allMods)[source]

Return the modules that need to run. Combines the explicitly specified modules (-m), stages (-s), and, if --run-app is specified, all output modules.

register(repo_factory)[source]

Register a Python repo factory

stages()[source]
tx()[source]

Create a TXContext for use in multiple places, avoiding repetition of the long TXContext construction line

class smv.datasetmgr.TX(_jvm, resourceFactories, stages)[source]

Bases: object

Abstraction of the transaction boundary for loading SmvGenericModules. A TX object will instantiate a set of repos when it is itself instantiated and will reuse the same repos for all queries. This means that each new TX object will reload the SmvGenericModules from source once during its lifetime.

NOTE: Once a new TX is created, the well-formedness of the SmvGenericModules provided by the previous TX is not guaranteed. Particularly it may become impossible to run modules from the previous TX.

allDataSets()[source]
allOutputModules()[source]
inferDS(partial_names)[source]
load(fqns)[source]
outputModulesForStage(stageNames)[source]
class smv.datasetmgr.TXContext(_jvm, resourceFactories, stages)[source]

Bases: object

Create a TX context for “with tx() as tx” syntax
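
A hedged sketch of the intended usage, assuming dsm is a DataSetMgr instance and fqns is a list of module FQN strings:

>>> with dsm.tx() as tx:
...     mods = tx.load(fqns)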

smv.datasetrepo module

class smv.datasetrepo.DataSetRepo(smvApp)[source]

Bases: object

all_project_pymodules
dataSetsForStage(stageName)[source]
loadDataSet(fqn)[source]
load_pymodule(fqn)[source]
class smv.datasetrepo.DataSetRepoFactory(smvApp)[source]

Bases: object

createRepo()[source]

smv.datasetresolver module

class smv.datasetresolver.DataSetResolver(repo)[source]

Bases: object

DataSetResolver (DSR) is the entry point through which the DataSetMgr acquires SmvGenericModules. A DSR object represents a single transaction. Each DSR creates a set of DataSetRepos at instantiation. When asked for an SmvGenericModule, the DSR queries the repos for that SmvGenericModule and resolves it. The SmvGenericModule is responsible for resolving itself, given access to the DSR to load/resolve the SmvGenericModule’s dependencies. The DSR caches the SmvGenericModules it has already resolved to ensure that any SmvGenericModule is resolved only once.

loadDataSet(fqns)[source]

Given a list of FQNs, return the cached resolved versions of the SmvGenericModules if they exist; otherwise load the unresolved versions from source and resolve them.

resolveDataSet(ds)[source]

Return cached resolved version of given SmvGenericModule if it exists, or resolve it otherwise.

smv.dqm module

SMV DQM Framework interface

This module defines the DQM rules, fixes, and policies which form the data quality management framework applied to SmvGenericModules

smv.dqm.DQMFix(condition, fix, name=None, taskPolicy=None)[source]

DQMFix will fix a column with a default value

Example

If “age” is greater than 100, cap it at 100

>>> DQMFix(col('age') > 100, lit(100).alias('age'), 'age_cap100', FailNone)
Parameters:
  • condition (Column) – boolean condition that determines when the fix should occur on the records of a DF
  • fix (Column) – the fix to use when replacing a value that does not pass the condition
  • name (String) – optional parameter for naming the DQMFix. if not specified, defaults to the condition text
  • taskPolicy (DQMTaskPolicy) – optional parameter for the DQM policy. if not specified, defaults to FailNone()
Returns:

a DQMFix object

Return type:

(DQMFix)

smv.dqm.DQMRule(rule, name=None, taskPolicy=None)[source]

DQMRule defines a requirement on the records of a DF

Example

Require that the sum of columns “a” and “b” be less than 100

>>> DQMRule(col('a') + col('b') < 100.0, 'a_b_sum_lt100', FailPercent(0.01))
Parameters:
  • rule (Column) – boolean condition that defines the requirement on the records of a DF
  • name (string) – optional parameter for naming the DQMRule. if not specified, defaults to the rule text
  • taskPolicy (DQMTaskPolicy) – optional parameter for the DQM policy. if not specified, defaults to FailNone()
Returns:

a DQMRule object

Return type:

(DQMRule)

smv.dqm.FailAny()[source]

Any failure of a rule or fix with the FailAny policy will cause the entire DF to fail

Returns:policy for DQM Task
Return type:(DQMTaskPolicy)
smv.dqm.FailCount(threshold)[source]

Tasks with FailCount(n) will fail the DF if the task is triggered >= n times

Parameters:threshold (int) – the threshold after which the DF fails
Returns:policy for DQM Task
Return type:(DQMTaskPolicy)
smv.dqm.FailNone()[source]

Tasks with FailNone will not trigger any DF level policy

Returns:policy for DQM Task
Return type:(DQMTaskPolicy)
smv.dqm.FailParserCountPolicy(threshold)[source]

If the total number of parser failures is >= threshold, fail the DF

Parameters:threshold (int) – the threshold after which the DF fails
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.FailPercent(threshold)[source]

Tasks with FailPercent(r) will fail the DF if the task is triggered >= r percent of the total number of records in the DF

Parameters:threshold (double) – the threshold after which the DF fails. value is between 0.0 and 1.0
Returns:policy for DQM Task
Return type:(DQMTaskPolicy)
smv.dqm.FailTotalFixCountPolicy(threshold)[source]

For all the fixes in a DQM, if the total number of times they are triggered is >= threshold, fail the DF

Parameters:threshold (int) – the threshold after which the DF fails
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.FailTotalFixPercentPolicy(threshold)[source]

For all the fixes in a DQM, if the total number of times they are triggered is >= threshold * total Records, fail the DF

Parameters:threshold (double) – the threshold after which the DF fails. value is between 0.0 and 1.0
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.FailTotalRuleCountPolicy(threshold)[source]

For all the rules in a DQM, if the total number of times they are triggered is >= threshold, fail the DF

Parameters:threshold (int) – the threshold after which the DF fails
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.FailTotalRulePercentPolicy(threshold)[source]

For all the rules in a DQM, if the total number of times they are triggered is >= threshold * total Records, fail the DF

Parameters:threshold (double) – the threshold after which the DF fails. value is between 0.0 and 1.0
Returns:policy for DQM
Return type:(DQMPolicy)
smv.dqm.SmvDQM()[source]

Factory method for Scala SmvDQM
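
The returned builder is typically used in a module’s DQM definition to register rules, fixes, and policies; a minimal sketch, assuming the builder exposes a chainable add method:

>>> SmvDQM().add(
...     DQMRule(col("b") < 100.0, "b_lt_100")).add(
...     DQMFix(col("a") < 10, lit(10).alias("a"), "a_lt_10_fix")).add(
...     FailTotalRuleCountPolicy(30))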

smv.error module

Errors thrown by SMV

exception smv.error.SmvDqmValidationError(dqmValidationResult)[source]

Bases: smv.error.SmvRuntimeError

This class has an instance of dqmValidationResult(dict)

exception smv.error.SmvMetadataValidationError(msg)[source]

Bases: smv.error.SmvRuntimeError

exception smv.error.SmvRuntimeError(msg)[source]

Bases: RuntimeError
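
Since all three derive from RuntimeError, client code can catch them individually or as a group; a hypothetical sketch (run_my_module is illustrative):

>>> from smv.error import SmvDqmValidationError, SmvRuntimeError
>>> try:
...     run_my_module()  # hypothetical module run
... except SmvDqmValidationError as e:
...     print(e.dqmValidationResult)  # dict describing the DQM failure
... except SmvRuntimeError as e:
...     print(str(e))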

smv.functions module

smv.functions.diceSorensen(c1, c2)[source]

2-gram UDF with formula (2 * number of overlapped gramCnt) / (s1.gramCnt + s2.gramCnt)

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

2-gram

Return type:

(Column)
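
All of the string-similarity UDFs in this module share the same two-column calling pattern; for example (the same form applies to jaroWinkler, nGram2, nGram3, and normlevenshtein below):

>>> df.select(diceSorensen(col("name1"), col("name2")).alias("score"))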

smv.functions.jaroWinkler(c1, c2)[source]

Jaro-Winkler edit distance metric UDF

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

distances

Return type:

(Column)

smv.functions.nGram2(c1, c2)[source]

2-gram UDF with formula (number of overlapped gramCnt) / max(c1.gramCnt, c2.gramCnt)

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

2-gram

Return type:

(Column)

smv.functions.nGram3(c1, c2)[source]

3-gram UDF with formula (number of overlapped gramCnt) / max(s1.gramCnt, s2.gramCnt)

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

3-gram

Return type:

(Column)

smv.functions.normlevenshtein(c1, c2)[source]

Levenshtein edit distance metric UDF

Parameters:
  • c1 (Column) – first column
  • c2 (Column) – second column
Returns:

distances

Return type:

(Column)

smv.functions.smvArrayCat(sep, col)[source]

For an array-typed column, concatenate the elements into a string using the given separator.

Parameters:
  • sep – a Python string to separate the fields
  • col – a Column with ArrayType
Returns:

a Column in StringType with array elements concatenated

Return type:

(col)
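
A usage sketch, assuming df has an ArrayType column named tags:

>>> df.select(smvArrayCat("_", col("tags")).alias("tags_str"))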

smv.functions.smvCollectSet(col, datatype)[source]

An aggregate function which collects all the values of the given column and creates a set as an array-typed column. Spark 1.6 introduced the collect_set function, so once migrated to Spark 1.6 or later, smvCollectSet will be deprecated.

Parameters:
  • col (Column) – column to be aggregated on
  • datatype (DataType) – datatype of the input column
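
A usage sketch within an aggregation, assuming a DataFrame df with a grouping column k and a string column v:

>>> from pyspark.sql.types import StringType
>>> df.groupBy("k").agg(smvCollectSet(col("v"), StringType()).alias("v_set"))
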
smv.functions.smvCreateLookUp(m, default, outputType)[source]

Return a Python UDF which will perform a dictionary lookup on a column

Parameters:
  • m (dictionary) – a Python dictionary to be applied
  • default (any) – default value if dictionary lookup failed
  • outputType (DataType) – output value’s data type
Returns:

a udf which can be applied to a column to perform the lookup

Return type:

(udf)
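
A usage sketch, assuming df has a string column country_code (the dictionary contents are illustrative):

>>> from pyspark.sql.types import StringType
>>> lookup = smvCreateLookUp({"US": "United States", "CA": "Canada"}, "Unknown", StringType())
>>> df.select(lookup(col("country_code")).alias("country_name"))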

smv.functions.smvFirst(c, nonNull=False)[source]

Variation of Spark “first” which can also return null values

Since Spark’s “first” returns the first non-null value, we created our own version, smvFirst, which returns the true first value even if it is null. Alternatively, it can return the first non-null value.

Parameters:
  • c (Column) – column to extract the first value from
  • nonNull (bool) – If false, return first value even if null. If true, return first non-null value. Defaults to false.
Returns:

first value

Return type:

(object)
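
A usage sketch within a grouped aggregation, assuming df has a grouping column k and a value column v:

>>> df.groupBy("k").agg(smvFirst(col("v")).alias("first_v"))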

smv.functions.smvHashKey(head, *others)[source]

Create an MD5 hash on concatenated columns. Returns “Prefix” + MD5 hex string (a 32-character string) as the unique key

MD5’s collision rate on real data records can be ignored, based on the following discussion:

https://marc-stevens.nl/research/md5-1block-collision/ The shortest known messages with the same MD5 are the 512-bit (64-byte) messages below

4dc968ff0ee35c209572d4777b721587d36fa7b21bdc56b74a3dc0783e7b9518afbfa200a8284bf36e8e4b55b35f427593d849676da0d1555d8360fb5f07fea2 and the (different by two bits) 4dc968ff0ee35c209572d4777b721587d36fa7b21bdc56b74a3dc0783e7b9518afbfa202a8284bf36e8e4b55b35f427593d849676da0d1d55d8360fb5f07fea2 both have MD5 hash 008ee33a9d58b51cfeb425b0959121c9

There are other such pairs, but all are carefully constructed. Theoretically, random collisions only become likely as the data size approaches 2^64 (since MD5 is 128-bit), which is much larger than the number of records we deal with (a billion is about 2^30). Therefore, using MD5 to hash the primary key columns is good enough for creating a unique key.

This function can take 2 forms:
  • smvHashKey(prefix, col1, col2, …)
  • smvHashKey(col1, col2, …)

Parameters:
  • prefix (String) – return string’s prefix
  • cols (*Column) – columns to be part of the hash
Returns:

a StringType column as Prefix + MD5 Hex string

Return type:

(col)
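
Usage sketches of both forms, assuming string columns first_name and last_name:

>>> df.select(smvHashKey("id_", col("first_name"), col("last_name")).alias("pk"))
>>> df.select(smvHashKey(col("first_name"), col("last_name")).alias("pk"))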

smv.functions.smvStrCat(head, *others)[source]

Concatenate multiple columns to a single string. Similar to concat and concat_ws functions in Spark but behaves differently when some columns are nulls. The Spark version will return null if any of the inputs is null. smvStrCat will return null if all of the inputs are nulls, otherwise it will coalesce null cols to blank.

This function can take 2 forms:
  • smvStrCat(sep, col1, col2, …)
  • smvStrCat(col1, col2, …)

Parameters:
  • sep (String) – separator for the concatenation
  • cols (*Column) – columns to be concatenated
Returns:

a StringType column

Return type:

(col)
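
A usage sketch, assuming string columns area and number:

>>> df.select(smvStrCat("-", col("area"), col("number")).alias("phone"))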

smv.helpers module

SMV DataFrame Helpers and Column Helpers

This module provides the helper functions on DataFrame objects and Column objects

class smv.helpers.ColumnHelper(col)[source]

Bases: object

smvArrayFlatten(elemType)[source]

smvArrayFlatten helper applies flatten operation on an Array of Array column.

Example

>>> df.select(col('arrayOfArrayOfStr').smvArrayFlatten(StringType()))
Parameters:elemType (DataType or DataFrame) – the array element’s data type, given either as a DataType object or as a DataFrame from which to infer the element data type
smvDay70()[source]

Convert a Timestamp to the number of days from 1970-01-01

Example

>>> df.select(col("dob").smvDay70())
Returns:IntegerType. Number of days from 1970-01-01 (starting from 0)
Return type:(Column)
smvDayOfMonth()[source]

Extract day of month component from a timestamp

Example

>>> df.select(col("dob").smvDayOfMonth())
Returns:IntegerType. Day of month component as integer (range 1-31), or null if input column is null
Return type:(Column)
smvDayOfWeek()[source]

Extract day of week component from a timestamp

Example

>>> df.select(col("dob").smvDayOfWeek())
Returns:IntegerType. Day of week component as integer (range 1-7, 1 being Monday), or null if input column is null
Return type:(Column)
smvGetColName()[source]

Returns the name of a Column as a string

Example

>>> df.a.smvGetColName()

Returns:(str)
smvHour()[source]

Extract hour component from a timestamp

Example

>>> df.select(col("dob").smvHour())
Returns:IntegerType. Hour component as integer, or null if input column is null
Return type:(Column)
smvIsAllIn(*vals)[source]

Returns true if ALL of the Array column’s elements are in the given parameter sequence

Parameters:vals (*any) – vals must be of the same type as the Array content

Example

input DF:

k v
a b
c d

>>> df.select(array(col("k"), col("v")).smvIsAllIn("a", "b", "c").alias("isFound"))

output DF:

isFound
true
false
false
Returns:BooleanType
Return type:(Column)
smvIsAnyIn(*vals)[source]

Returns true if ANY of the Array column’s elements are in the given parameter sequence

Parameters:vals (*any) – vals must be of the same type as the Array content

Example

input DF:

k v
a b
c d

>>> df.select(array(col("k"), col("v")).smvIsAnyIn("a", "b", "c").alias("isFound"))

output DF:

isFound
true
true
false
Returns:BooleanType
Return type:(Column)
smvMonth()[source]

Extract month component from a timestamp

Example

>>> df.select(col("dob").smvMonth())
Returns:IntegerType. Month component as integer, or null if input column is null
Return type:(Column)
smvMonth70()[source]

Convert a Timestamp to the number of months from 1970-01-01

Example

>>> df.select(col("dob").smvMonth70())
Returns:IntegerType. Number of months from 1970-01-01 (starting from 0)
Return type:(Column)
smvPlusDays(delta)[source]

Add N days to Timestamp or Date column

Parameters:delta (int or Column) – the number of days to add

Example

>>> df.select(col("dob").smvPlusDays(3))
Returns:
TimestampType. The incremented Timestamp, or null if input is null.
Note even if the input is DateType, the output is TimestampType
Return type:(Column)

Please note that although Spark’s date_add function does a similar thing, they are actually different.

  • Both can act on both Timestamp and Date types
  • smvPlusDays always returns Timestamp, while F.date_add always returns Date
smvPlusMonths(delta)[source]

Add N months to Timestamp or Date column

Parameters:delta (int or Column) – the number of months to add

Note

The calculation will do its best to only change the month field retaining the same day of month. However, in certain circumstances, it may be necessary to alter smaller fields. For example, 2007-03-31 plus one month cannot result in 2007-04-31, so the day of month is adjusted to 2007-04-30.

Example

>>> df.select(col("dob").smvPlusMonths(3))
Returns:
TimestampType. The incremented Timestamp, or null if input is null.
Note even if the input is DateType, the output is TimestampType
Return type:(Column)

Please note that although Spark’s add_months function does a similar thing, they are actually different.

  • Both can act on both Timestamp and Date types
  • smvPlusMonths always returns Timestamp, while F.add_months always returns Date
smvPlusWeeks(delta)[source]

Add N weeks to Timestamp or Date column

Parameters:delta (int or Column) – the number of weeks to add

Example

>>> df.select(col("dob").smvPlusWeeks(3))
Returns:
TimestampType. The incremented Timestamp, or null if input is null.
Note even if the input is DateType, the output is TimestampType
Return type:(Column)
smvPlusYears(delta)[source]

Add N years to Timestamp or Date column

Parameters:delta (int or Column) – the number of years to add

Example

>>> df.select(col("dob").smvPlusYears(3))
Returns:
TimestampType. The incremented Timestamp, or null if input is null.
Note even if the input is DateType, the output is TimestampType
Return type:(Column)
smvQuarter()[source]

Extract quarter component from a timestamp

Example

>>> df.select(col("dob").smvQuarter())
Returns:IntegerType. Quarter component as integer (1-based), or null if input column is null
Return type:(Column)
smvStrToTimestamp(fmt)[source]

Build a timestamp from a string

Parameters:fmt (string) – the format is the same as the Java Date format

Example

>>> df.select(col("dob").smvStrToTimestamp("yyyy-MM-dd"))
Returns:TimestampType. The converted Timestamp
Return type:(Column)
smvTimeToIndex()[source]

smvTime helper to convert smvTime column to time index integer

Example smvTime values (as String): “Q201301”, “M201512”, “D20141201”. Example output: 172, 551, 16405 (the number of quarters, months, and days since 1970-01-01).

smvTimeToLabel()[source]

smvTime helper to convert smvTime column to human readable form

Example smvTime values (as String): “Q201301”, “M201512”, “D20141201”. Example output: “2013-Q1”, “2015-12”, “2014-12-01”.

smvTimeToTimestamp()[source]

smvTime helper to convert smvTime column to a timestamp at the beginning of the given time period.

Example smvTime values (as String): “Q201301”, “M201512”, “D20141201”. Example output: “2013-01-01 00:00:00.0”, “2015-12-01 00:00:00.0”, “2014-12-01 00:00:00.0”.
smvTimeToType()[source]

smvTime helper to convert smvTime column to time type string

Example smvTime values (as String): “Q201301”, “M201512”, “D20141201”. Example output types: “quarter”, “month”, “day”.

smvTimestampToStr(timezone, fmt)[source]

Build a string from a timestamp and timezone

Parameters:
  • timezone (string) – the time zone ID, e.g. “America/Los_Angeles”
  • fmt (string) – the output format, same as the Java Date format

Example

>>> df.select(col("ts").smvTimestampToStr("America/Los_Angeles","yyyy-MM-dd HH:mm:ss"))
Returns:StringType. The converted String with given format
Return type:(Column)
smvYear()[source]

Extract year component from a timestamp

Example

>>> df.select(col("dob").smvYear())
Returns:IntegerType. Year component as integer, or null if input column is null
Return type:(Column)
class smv.helpers.DataFrameHelper(df)[source]

Bases: object

peek(pos=1, colRegex='.*')[source]

Display a DataFrame row in transposed view

Parameters:
  • pos (integer) – the n-th row to display, default as 1
  • colRegex (string) – show the columns with name matching the regex, default as “.*”
Returns:

(None)

peekSave(path, pos=1, colRegex='.*')[source]

Write peek result to a file

Parameters:
  • path (string) – local file name to write
  • pos (integer) – the n-th row to display, default as 1
  • colRegex (string) – show the columns with name matching the regex, default as “.*”
Returns:

(None)

selectByLabel(labels=None)[source]

Select columns whose metadata contains the specified labels

If labels is empty, returns a DataFrame with all the unlabeled columns. Will throw if no columns satisfy the condition.

Parameters:labels – (list(string)) a list of label strings for the columns to match

Example

>>> df.selectByLabel(["tag_1"])
Returns:the DataFrame with the selected columns
Return type:(DataFrame)
smvBinHist(*colWithBin)[source]

Print distributions of numerical columns, applying the specified bin sizes

Parameters:colWithBin (*tuple) – each tuple must be of size 2, where the first element is the column name, and the second is the bin size for that column

Example

>>> df.smvBinHist(("col_a", 1))
Returns:(None)
smvConcatHist(*cols)[source]

Display EDD histogram of a group of columns (joint distribution)

Parameters:cols (*string) – The columns on which to perform EDD histogram

Example

>>> df.smvConcatHist("a", "b")
Returns:(None)
smvCountHist(keys, binSize)[source]

Print the distribution of the value frequency on specific columns

Parameters:
  • keys (list(string)) – the column names on which the EDD histogram is performed
  • binSize (integer) – the bin size for the histogram

Example

>>> df.smvCountHist(["k"], 1)
Returns:(None)
smvDedupByKey(*keys)[source]

Remove duplicate records from the DataFrame by arbitrarily selecting the first record from each set of records that share the same primary key or key combination.

Parameters:keys (*string or *Column) – the column names or Columns on which to apply dedup

Example

input DataFrame:

id product Company
1 A C1
1 C C2
2 B C3
2 B C4
>>> df.smvDedupByKey("id")

output DataFrame:

id product Company
1 A C1
2 B C3
>>> df.smvDedupByKey("id", "product")

output DataFrame:

id product Company
1 A C1
1 C C2
2 B C3
Returns:a DataFrame without duplicates for the specified keys
Return type:(DataFrame)
smvDedupByKeyWithOrder(*keys)[source]

Remove duplicated records by selecting the first record relative to a given ordering

The order is specified in another set of parentheses, as follows:

>>> def smvDedupByKeyWithOrder(self, *keys)(*orderCols)

Note

As with the smvDedupByKey method, we use RDD groupBy in the implementation of this method to make sure we can handle a large key space.

Parameters:keys (*string or *Column) – the column names or Columns on which to apply dedup

Example

input DataFrame:

id product Company
1 A C1
1 C C2
2 B C3
2 B C4
>>> df.smvDedupByKeyWithOrder(col("id"))(col("product").desc())

output DataFrame:

id product Company
1 C C2
2 B C3
Returns:a DataFrame without duplicates for the specified keys / order
Return type:(DataFrame)
smvDesc(*colDescs)[source]

Adds column descriptions

Parameters:colDescs (*tuple) – tuples of strings where the first is the column name, and the second is the description to add

Example

>>> df.smvDesc(("a", "description of col a"), ("b", "description of col b"))
Returns:the DataFrame with column descriptions added
Return type:(DataFrame)
smvDescFromDF(descDF)[source]

Adds column descriptions

Parameters:descDF (DataFrame) – a companion 2-column description DF that has variable names in column 1 and the corresponding variable descriptions in column 2

Example

>>> df.smvDescFromDF(descriptionDF)
Returns:the DataFrame with column descriptions added
Return type:(DataFrame)
smvDiscoverPK(n=10000)[source]

Find a column combination which uniquely identifies a row from the data

The resulting output is printed out

Note

The algorithm only looks for a set of keys which uniquely identifies the row. There could be more key combinations which can also be the primary key.

Parameters:n (integer) – number of rows the PK discovery algorithm will run on, defaults to 10000

Example

>>> df.smvDiscoverPK(5000)
Returns:(None)
smvDumpDF()[source]

Dump the schema and data of given df to screen for debugging purposes

Similar to show() method of DF from Spark 1.3, although the format is slightly different. This function’s format is more convenient for us and hence has remained.

Example

>>> df.smvDumpDF()
Returns:(None)
smvDupeCheck(keys, n=10000)[source]

For a given list of potential keys, check for duplicated records, reporting the number of duplications along with all the columns.

Null values are allowed in the potential keys, so duplications on null-valued keys will also be reported.

Parameters:
  • keys (list(string)) – the key column list which the duplicate check applied
  • n (integer) – number of rows from input data for checking duplications, defaults to 10000
Returns:

the key columns, plus an “_N” column, plus the remaining columns, for records whose key values are duplicated,

where “_N” holds the count of duplications of that record’s key values

Return type:

(DataFrame)

smvEdd(*cols)[source]

Display EDD summary

Parameters:cols (*string) – column names on which to perform EDD summary

Example

>>> df.smvEdd("a", "b")
Returns:(None)
smvEddCompare(df2, ignoreColName)[source]

Compare 2 DFs by comparing their Edd Summary result

The result of the comparison is printed out.

Parameters:
  • df2 (DataFrame) – the DataFrame to compare with
  • ignoreColName (boolean) – specifies whether to ignore column names, default is false

Example

>>> df.smvEddCompare(df2)
>>> df.smvEddCompare(df2, True)
Returns:(None)
smvExpandStruct(*cols)[source]

Expand structure type column to a group of columns

Parameters:cols (*string) – column names to expand

Example

input DF:
[id: string, address: struct<state:string, zip:string, street:string>]
>>> df.smvExpandStruct("address")
output DF:
[id: string, state: string, zip: string, street: string]
Returns:DF with expanded columns
Return type:(DataFrame)
smvExportCsv(path, n=None)[source]

Export DataFrame to local file system

Parameters:
  • path (string) – relative path to the app running directory on local file system (instead of HDFS)
  • n (integer) – optional. number of records to export. default is all records

Note

Since we have to collect the DF and then call Java file operations, the job has to be launched in either local or yarn-client mode. Also, it is the user’s responsibility to make sure that the DF is small enough to fit into the local file system.

Example

>>> df.smvExportCsv("./target/python-test-export-csv.csv")
Returns:(None)
smvFreqHist(*cols)[source]

Print EDD histogram with frequency sorting

Parameters:cols (*string) – The columns on which to perform EDD histogram

Example

>>> df.smvFreqHist("a")
Returns:(None)
smvGetDesc(colName=None)[source]

Returns column description(s)

Parameters:colName (string) – optional column name for which to get the description.

Example

>>> df.smvGetDesc("col_a")
>>> df.smvGetDesc()
Returns:description string of colName, if specified
Return type:(string)

or:

Returns:a list of (colName, description) pairs for all columns
Return type:(list(tuple))
smvGetLabel(colName=None)[source]

Returns a list of column label(s)

Parameters:colName (string) – optional column name for which to get the label.

Example

>>> df.smvGetLabel("col_a")
>>> df.smvGetLabel()
Returns:a list of label strings of colName, if specified
Return type:(list(string))

or:

Returns:a list of (colName, list(labels)) pairs for all columns
Return type:(list(tuple))
smvGroupBy(*cols)[source]

Similar to groupBy, instead of creating GroupedData, create an SmvGroupedData object.

See [[org.tresamigos.smv.SmvGroupedDataFunc]] for a list of functions that can be applied to the grouped data.

Parameters:cols (*string or *Column) – column names or Column objects to group on

Note

This is going away shortly, and users will be able to use the standard Spark groupBy method directly.

Example

>>> df.smvGroupBy(col("k"))
>>> df.smvGroupBy("k")
Returns:grouped data object
Return type:(SmvGroupedData)
smvHashSample(key, rate=0.01, seed=23)[source]

Sample the df according to the hash of a column

MurmurHash3 algorithm is used for generating the hash

Parameters:
  • key (string or Column) – column name or Column to sample on
  • rate (double) – sample rate in range (0, 1] with a default of 0.01 (1%)
  • seed (int) – random generator integer seed with a default of 23

Example

>>> df.smvHashSample(col("key"), rate=0.1, seed=123)
Returns:sampled DF
Return type:(DataFrame)
smvHist(*cols)[source]

Display EDD histogram

Each column’s histogram prints separately

Parameters:cols (*string) – The columns on which to perform EDD histogram

Example

>>> df.smvHist("a")
Returns:(None)
smvJoinByKey(other, keys, joinType, isNullSafe=False)[source]

joins two DataFrames on a key

The Spark DataFrame join operation does not handle duplicate key names. If both left and right side of the join operation contain the same key, the result DataFrame is unusable.

The smvJoinByKey method allows the user to join two DataFrames using the same join key. After the join, only the left side keys will remain. In the case of an outer join, coalesce(leftkey, rightkey) replaces the left key that is kept.

Parameters:
  • other (DataFrame) – the DataFrame to join with
  • keys (list(string)) – a list of column names on which to apply the join
  • joinType (string) – choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]
  • isNullSafe (boolean) – if true, null keys in the left and right tables are matched and kept in the output. Default False.

Example

>>> df1.smvJoinByKey(df2, ["k"], "inner")
>>> df_with_null_key.smvJoinByKey(df2, ["k"], "inner", True)
Returns:result of the join operation
Return type:(DataFrame)
smvJoinMultipleByKey(keys, joinType='inner')[source]

Create multiple DF join builder

It is used in conjunction with joinWith and doJoin

Parameters:
  • keys (list(string)) – a list of column names on which to apply the join
  • joinType (string) – choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]

Example

>>> df.smvJoinMultipleByKey(["k1", "k2"], "inner").joinWith(df2, "_df2").joinWith(df3, "_df3", "leftouter").doJoin()
Returns:the builder object for the multi join operation
Return type:(SmvMultiJoin)
smvLabel(colNames, labels)[source]

Adds labels to the specified columns

A column may have multiple labels. Adding the same label twice to a column has the same effect as adding that label once.

For multiple colNames, the same set of labels will be added to all of them. When colNames is empty, the set of labels will be added to all columns of the df. The labels parameter must be non-empty.

Parameters:
  • labels – (list(string)) a list of label strings to add
  • colNames – (list(string)) list of names of columns for which to add labels

Example

>>> df.smvLabel(["col_a", "col_b", "col_c"], ["tag_1", "tag_2"])
>>> df.smvLabel([], ["tag_1", "tag_2"])
Returns:the DataFrame with labels added to the specified columns
Return type:(DataFrame)

or:

Returns:the DataFrame with labels added to all columns
Return type:(DataFrame)
smvOverlapCheck(keyColName)[source]

For a set of DFs which share the same key column, check the overlap across them

The other DataFrames are specified in another set of parentheses, as follows:

>>> df1.smvOverlapCheck(key)(*df)
Parameters:keyColName (string) – the column name for the key column

Examples

>>> df1.smvOverlapCheck("key")(df2, df3, df4)
output DF has 2 columns:
  • key
  • flag: a bit-string, e.g. 0110. Each bit represents whether the original DF has this key

It can be used with EDD to summarize on the flag:

>>> df1.smvOverlapCheck("key")(df2, df3).smvHist("flag")
Returns:the DataFrame with the key and flag columns
Return type:(DataFrame)
smvPrefixFieldNames(prefix)[source]

Apply a prefix to all column names in the given DataFrame.

Parameters:prefix (string) – prefix string to be added to all the column names

Example

>>> df.smvPrefixFieldNames("x_")

The above adds x_ to the beginning of every column name in the DataFrame. Please note that if the renamed column names overlap with existing columns, the method will error out.

Returns:(DataFrame)
smvRemoveDesc(*colNames)[source]

Removes description for the given columns from the Dataframe

Parameters:colNames (*string) – names of columns for which to remove the description

Example

>>> df.smvRemoveDesc("col_a", "col_b")
>>> df.smvRemoveDesc()
Returns:the DataFrame with column descriptions removed
Return type:(DataFrame)

or:

Returns:the DataFrame with all column descriptions removed
Return type:(DataFrame)
smvRemoveLabel(colNames=None, labels=None)[source]

Removes labels from the specified columns

For multiple colNames, the same set of labels will be removed from all of them. When colNames is empty, the set of labels will be removed from all columns of the df. When labels is empty, all labels will be removed from the given columns.

If neither columns nor labels are specified, i.e. both parameter lists are empty, then all labels are removed from all columns in the data frame, essentially clearing the label meta data.

Parameters:
  • labels – (list(string)) a list of label strings to remove
  • colNames – (list(string)) list of names of columns for which to remove labels

Example

>>> df.smvRemoveLabel(["col_a"], ["tag_1"])
>>> df.smvRemoveLabel()
Returns:the DataFrame with specified labels removed from the specified columns
Return type:(DataFrame)

or:

Returns:the DataFrame with all label meta data cleared
Return type:(DataFrame)
smvRenameField(*namePairs)[source]

Rename one or more fields of a DataFrame

Parameters:namePairs (*tuple) – tuples of strings where the first is the source column name, and the second is the target column name

Example

>>> df.smvRenameField(("a", "aa"), ("c", "cc"))
Returns:the DataFrame with renamed fields
Return type:(DataFrame)
smvSelectMinus(*cols)[source]

Remove one or more columns from current DataFrame

Parameters:cols (*string or *Column) – column names or Columns to remove from the DataFrame

Example

>>> df.smvSelectMinus("col1", "col2")
>>> df.smvSelectMinus(col("col1"), col("col2"))
Returns:the resulting DataFrame after removal of columns
Return type:(DataFrame)
smvSelectPlus(*cols)[source]

Selects all the current columns in current DataFrame plus the supplied expressions

The new columns are added to the end of the current column list.

Parameters:cols (*Column) – expressions to add to the DataFrame

Example

>>> df.smvSelectPlus((col("price") * col("count")).alias("amt"))
Returns:the resulting DataFrame after the addition of columns
Return type:(DataFrame)
smvSkewJoinByKey(other, joinType, skewVals, key)[source]

Join that leverages broadcast (map-side) join of rows with skewed (high-frequency) values

Rows keyed by skewed values are joined via broadcast join, while the remaining rows are joined without broadcast join. Occurrences of skewVals in df2 should be infrequent enough that the filtered table is small enough for a broadcast join. The result is the union of the two join results.

Parameters:
  • other (DataFrame) – DataFrame to join with
  • joinType (str) – name of type of join (e.g. “inner”)
  • skewVals (list(object)) – list of skewed values
  • key (str) – key on which to join (also the Column with the skewed values)

Example

>>> df.smvSkewJoinByKey(df2, "inner", [4], "cid")

will broadcast join the rows of df and df2 where col(“cid”) == 4 and join the remaining rows of df and df2 without broadcast join.

Returns:the result of the join operation
Return type:(DataFrame)
smvUnion(*dfothers)[source]

Unions DataFrames with different numbers of columns by column name and schema

Spark unionAll ignores column names & schema, and can only be performed on tables with the same number of columns.

Parameters:dfothers (*DataFrame) – the DataFrames to union with

Example

>>> df.smvUnion(df2, df3)
Returns:the union of all specified DataFrames
Return type:(DataFrame)
smvUnpivot(*cols)[source]

Unpivot the selected columns

Given a set of records with value columns, turns the value columns into value rows.

Parameters:cols (*string) – the names of the columns to unpivot

Example

input DF:

id X Y Z
1 A B C
2 D E F
3 G H I
>>> df.smvUnpivot("X", "Y", "Z")

output DF:

id column value
1 X A
1 Y B
1 Z C
…
3 Y H
3 Z I
Returns:the unpivoted DataFrame
Return type:(DataFrame)
smvUnpivotRegex(cols, colNameFn, indexColName)[source]

Unpivot the selected columns using the specified regex

Parameters:
  • cols (*string) – the names of the columns to unpivot
  • colNameFn (string) – a regex representing the function to be applied when unpivoting
  • indexColName (string) – the name of the index column to be created

Example

input DF:

id A_1 A_2 B_1 B_2
1 1_a_1 1_a_2 1_b_1 1_b_2
2 2_a_1 2_a_2 2_b_1 2_b_2
>>> df.smvUnpivotRegex( ["A_1", "A_2", "B_1", "B_2"], "(.*)_(.*)", "index" )

output DF:

id index A B
1 1 1_a_1 1_b_1
1 2 1_a_2 1_b_2
2 1 2_a_1 2_b_1
2 2 2_a_2 2_b_2
Returns:the unpivoted DataFrame
Return type:(DataFrame)
smvWithLabel(labels=None)[source]

Returns all column names in the data frame that contain all the specified labels

If labels is empty, returns all unlabeled columns in the data frame. Will throw if there are no columns that satisfy the condition.

Parameters:labels – (list(string)) a list of label strings for the columns to match

Example

>>> df.smvWithLabel(["tag_1", "tag_2"])
Returns:a list of column name strings that match the specified labels
Return type:(list(string))
topNValsByFreq(n, col)[source]

Get top N most frequent values in Column col

Parameters:
  • n (int) – maximum number of values
  • col (Column) – which column to get values from

Example

>>> df.topNValsByFreq(1, col("cid"))

will return the single most frequent value in the cid column

Returns:most frequent values (type depends on schema)
Return type:(list(object))
class smv.helpers.SmvGroupedData(df, keys, sgd)[source]

Bases: object

The result of running smvGroupBy on a DataFrame. Implements special SMV aggregations.

smvDecile(value_cols, ignoreNull=True)[source]

Compute deciles of some columns on grouped data

Simply an alias to smvQuantile(value_cols, 10, ignoreNull)

Parameters:
  • value_cols (list(str)) – columns to calculate deciles on
  • ignoreNull (boolean) – if true, null values’ percent ranks will be null; otherwise, since Spark’s sort considers null smaller than any value, nulls’ percent ranks will be zero. Default true.
smvFillNullWithPrevValue(*orderCols)[source]

Fill in Null values with “previous” value according to an ordering

Examples

Given DataFrame df representing the table

K T V
a 1 null
a 2 a
a 3 b
a 4 null

we can use

>>> df.smvGroupBy("K").smvFillNullWithPrevValue(col("T").asc())("V")

to produce the result

K T V
a 1 null
a 2 a
a 3 b
a 4 b
Returns:result of fill nulls with previous value
Return type:(Dataframe)
smvPercentRank(value_cols, ignoreNull=True)[source]

Compute the percent rank of a sequence of columns within a group in a given DataFrame.

Uses Spark’s percentRank window function. The percent rank is defined as R/(N-1), where R is the 0-based rank and N is the population size. Under this definition, the min value (R=0) has percent rank 0.0, and the max value has percent rank 1.0.

For each column for which the percent rank is computed (e.g. “v”), an additional column is added to the output, v_pctrnk

All other columns in the input are untouched and propagated to the output.

Parameters:
  • value_cols (list(str)) – columns to calculate percentRank on
  • ignoreNull (boolean) – if true, null values’ percent ranks will be null; otherwise, since Spark’s sort considers null smaller than any value, nulls’ percent ranks will be zero. Default true.

Example

>>> df.smvGroupBy("g", "g2").smvPercentRank(["v1", "v2", "v3"])
smvPivot(pivotCols, valueCols, baseOutput)[source]

smvPivot adds the pivoted columns without additional aggregation. In other words: N records in, N records out

SmvPivot for Pivot Operations:

A pivot operation on a DataFrame transforms multiple rows per key into columns, so that they can be aggregated into a single row for a given key while preserving all the data variance.

Example

For input data below:

id month product count
1 5/14 A 100
1 6/14 B 200
1 5/14 B 300

We would like to generate a data set to be ready for aggregations. The desired output is:

id count_5_14_A count_5_14_B count_6_14_A count_6_14_B
1 100 NULL NULL NULL
1 NULL NULL NULL 200
1 NULL 300 NULL NULL
The raw input is divided into three parts.
  1. key column: part of the primary key that is preserved in the output.
    That would be the id column in the above example.
  2. pivot columns: the columns whose row values will become the new column names.
    The cross product of all unique values for each column is used to generate the output column names.
  3. value column: the value that will be copied/aggregated to corresponding output
    column. count in our example.

Example code:

>>> df.smvGroupBy("id").smvPivot([["month", "product"]], ["count"], ["5_14_A", "5_14_B", "6_14_A", "6_14_B"])
Parameters:
  • pivotCols (list(list(str))) – specifies the pivot columns; in the above example, it is [["month", "product"]]. If [["month"], ["month", "product"]] is used, the output will have “count_5_14” and “count_6_14” columns in addition to those in the example.
  • valueCols (list(string)) – names of value columns which will be prepared for aggregation
  • baseOutput (list(str)) – expected names of the pivoted columns. In the above example, it is [“5_14_A”, “5_14_B”, “6_14_A”, “6_14_B”]. The user is required to supply the list of expected pivot column output names to avoid an extra action on the input DataFrame just to extract the possible pivot columns. If an empty sequence is provided, the base output columns will be extracted from the values in the pivot columns (which will cause an action on the entire DataFrame!)
Returns:

result of pivot operation

Return type:

(DataFrame)

smvPivotCoalesce(pivotCols, valueCols, baseOutput)[source]

Perform SmvPivot, then coalesce the output. Please refer to smvPivot’s documentation for context and details of the SmvPivot operation.

Parameters:
  • pivotCols (list(list(str))) – list of lists of column names to pivot
  • valueCols (list(string)) – names of value columns to coalesce
  • baseOutput (list(str)) – expected names of the pivoted columns

Examples

For example, given a DataFrame df that represents the table

k p v
a c 1
a d 2
a e null
a f 5

we can use

>>> df.smvGroupBy("k").smvPivotCoalesce([['p']], ['v'], ['c', 'd', 'e', 'f'])

to produce the following output

k v_c v_d v_e v_f
a 1 2 null 5
Returns:result of pivot coalesce
Return type:(Dataframe)
smvPivotSum(pivotCols, valueCols, baseOutput)[source]

Perform SmvPivot, then sum the results. Please refer to smvPivot’s documentation for context and details of the SmvPivot operation.

Parameters:
  • pivotCols (list(list(str))) – list of lists of column names to pivot
  • valueCols (list(string)) – names of value columns to sum
  • baseOutput (list(str)) – expected names of the pivoted columns

Examples

For example, given a DataFrame df that represents the table

id month product count
1 5/14 A 100
1 6/14 B 200
1 5/14 B 300

we can use

>>> df.smvGroupBy("id").smvPivotSum([["month", "product"]], ["count"], ["5_14_A", "5_14_B", "6_14_A", "6_14_B"])

to produce the following output

id count_5_14_A count_5_14_B count_6_14_A count_6_14_B
1 100 300 NULL 200
Returns:result of pivot sum
Return type:(DataFrame)
smvQuantile(value_cols, bin_num, ignoreNull=True)[source]

Compute the quantile bin numbers within a group in a given DataFrame.

Estimating quantiles and quantile groups for data with an unknown distribution is quite arbitrary; there are multiple ‘definitions’ used in different software packages. Please refer to https://en.wikipedia.org/wiki/Quantile#Estimating_quantiles_from_a_sample for details.

smvQuantile is calculated from Spark’s percentRank. The algorithm is equivalent to the one labeled R-7, Excel, SciPy-(1,1), Maple-6 on the above Wikipedia page. Please note it is slightly different from SAS’s default algorithm (labeled SAS-5).

Returned quantile bin numbers are 1-based. For example, when bin_num=10, the returned values are integers from 1 to 10, inclusive.

For each column for which the quantile is computed (e.g. “v”), an additional column is added to the output, “v_quantile”.

All other columns in the input are untouched and propagated to the output.

Parameters:
  • value_cols (list(str)) – columns to calculate quantiles on
  • bin_num (int) – number of quantiles, e.g. 4 for quartile, 10 for decile
  • ignoreNull (boolean) – if true, null values’ percent ranks will be null; otherwise, since Spark’s sort considers null smaller than any value, nulls’ percent ranks will be zero. Default true.

Example

>>> df.smvGroupBy("g", "g2").smvQuantile(["v"], 100)
smvRePartition(numParts)[source]

Repartition the SmvGroupedData using the specified partitioner on the keys. A HashPartitioner with the specified number of partitions will be used.

This method is used when the key space is very large. In the current Spark DF groupBy method, the entire key space is actually loaded into executor memory, which is very dangerous when the key space is big. The regular DF repartition function doesn’t solve this issue, since a random repartition is not guaranteed to reduce the key space on each executor. In that case we need this function to linearly reduce the key space.

Example:

>>> df.smvGroupBy("k1", "k2").smvRePartition(32).agg(sum("v").alias("v"))
smvTimePanelAgg(time_col, start, end)[source]

Apply aggregation on given keys and specified time panel period

Parameters:
  • time_col (str) – the column name in the data as the event timestamp
  • start (panel.PartialTime) – could be Day, Month, Week, or Quarter; refer to the panel package for details
  • end (panel.PartialTime) – should be the same time type as the “start”
  • addMissingTimeWithNull (boolean) – default true; whether to fill missing time periods with null records

Both start and end PartialTime are inclusive.

Example:

>>> df.smvGroupBy("K").smvTimePanelAgg("TS", Week(2012, 1, 1), Week(2012, 12, 31))(
        sum("V").alias("V")
    )
Returns:
a result DataFrame with the keys, a column named smvTime, and the
aggregated values. Refer to the panel package for the potential forms of different PartialTimes
Return type:(DataFrame)

When addMissingTimeWithNull is true, the aggregation should always be on variables rather than literals (it should NOT be count(lit(1))).

Example with data:

Given DataFrame df as

K TS V
1 2012-01-01 1.5
1 2012-03-01 4.5
1 2012-07-01 7.5
1 2012-05-01 2.45

after applying

>>> import smv.panel as P
>>> df.smvGroupBy("K").smvTimePanelAgg("TS", P.Quarter(2012, 1), P.Quarter(2012, 2))(
        sum("V").alias("V")
    )

the result is

K smvTime V
1 Q201201 6.0
1 Q201202 2.45
smvTopNRecs(maxElems, *cols)[source]

For each group, return the top N records according to a given ordering

Example

>>> df.smvGroupBy("id").smvTopNRecs(3, col("amt").desc())

This will keep the 3 largest amt records for each id

Parameters:
  • maxElems (int) – maximum number of records per group
  • cols (*str) – columns defining the ordering
Returns:

result of taking top records from groups

Return type:

(DataFrame)

smvWithTimePanel(time_col, start, end)[source]

Add a specified time panel period to the DF

Parameters:
  • time_col (str) – the column name in the data as the event timestamp
  • start (panel.PartialTime) – could be Day, Month, Week, or Quarter; refer to the panel package for details
  • end (panel.PartialTime) – should be the same time type as the “start”
  • addMissingTimeWithNull (boolean) – default True; whether to fill null records when some PartialTime is missing
Returns:

a result DataFrame with the keys, a column named smvTime, and the

other input columns. Refer to the panel package for the potential forms of different PartialTimes.

Return type:

(DataFrame)

Note

Since a TimePanel defines a period of time, if some group in the data is missing months (or quarters) and addMissingTimeWithNull is true, this function will add records with the non-null keys, every possible smvTime value, and all other columns null-valued.

Example

Given DataFrame df as

K TS V
1 2014-01-01 1.2
1 2014-03-01 4.5
1 2014-03-25 10.3

after applying

>>> import smv.panel as P
>>> df.smvGroupBy("k").smvWithTimePanel("TS", P.Month(2014,1), P.Month(2014, 3))

the result is

K TS V smvTime
1 2014-01-01 1.2 M201401
1 2014-03-01 4.5 M201403
1 2014-03-25 10.3 M201403
1 None None M201401
1 None None M201402
1 None None M201403
class smv.helpers.SmvMultiJoin(sqlContext, mj)[source]

Bases: object

Wrapper around Scala’s SmvMultiJoin

doJoin(dropextra=False)[source]

Trigger the join operation

Parameters:dropextra (boolean) – default false, which keeps all duplicate-named columns with their postfixes. When true, the duplicated columns will be dropped.

Example

>>> joindf.doJoin()
Returns:result of executing the join operation
Return type:(DataFrame)
joinWith(df, postfix, jointype=None)[source]

Append to the SmvMultiJoin chain

Parameters:
  • df (DataFrame) – the DataFrame to join with
  • postfix (string) – postfix to use when renaming join columns
  • jointype (string) – optional jointype. if not specified, conf.defaultJoinType is used. choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]

Example

>>> joindf = df1.smvJoinMultipleByKey(['a'], 'inner').joinWith(df2, '_df2').joinWith(df3, '_df3', 'outer')
Returns:formula of the join. need to call doJoin() on it to execute
Return type:(SmvMultiJoin)
smv.helpers.init_helpers()[source]

smv.historical_validators module

class smv.historical_validators.SmvHistoricalValidator(*args)[source]

Bases: object

Base of all user defined historical validator rules.

Derived classes must override the metadata and validateMetadata methods.

metadata(df)[source]

identical interface to standard dataset metadata method

validateMetadata(cur, hist)[source]

identical interface to standard dataset validateMetadata method

smv.historical_validators.SmvHistoricalValidators(*validators)[source]

Decorator to specify set of validators to apply to an SmvGenericModule.

Each validator should implement the standard metadata and validateMetadata methods. In addition, each validator defines a unique key that determines how it is stored in the final metadata structure. This decorator takes care of composing the union of all metadata from the validators and decomposing the full metadata structure into individual pieces for each validator (hint: the _key() method of each validator is used to key into the final metadata structure).
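
A hypothetical sketch, assuming MyCountValidator is a user-defined SmvHistoricalValidator subclass and SmvModule is in scope:

>>> @SmvHistoricalValidators(MyCountValidator(threshold=0.1))
... class MyModule(SmvModule):
...     ...  # the usual requiresDS/run definitions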

smv.jprops module

smv.jprops.iter_properties(fh, comments=False)[source]

Incrementally read properties from a Java .properties file.

Yields tuples of key/value pairs.

If comments is True, comments will be included with jprops.COMMENT in place of the key.

Parameters:
  • fh – a readable file-like object
  • comments – whether to include comments (default: False)
smv.jprops.load_properties(fh, mapping=<class 'dict'>)[source]

Reads properties from a Java .properties file.

Returns a dict (or provided mapping) of properties.

Parameters:
  • fh – a readable file-like object
  • mapping – mapping type to load properties into
smv.jprops.store_properties(fh, props, comment=None, timestamp=True)[source]

Writes properties to the file in Java properties format.

Parameters:
  • fh – a writable file-like object
  • props – a mapping (dict) or iterable of key/value pairs
  • comment – comment to write to the beginning of the file
  • timestamp – boolean indicating whether to write a timestamp comment
smv.jprops.write_comment(fh, comment)[source]

Writes a comment to the file in Java properties format.

Newlines in the comment text are automatically turned into a continuation of the comment by adding a “#” to the beginning of each line.

Parameters:
  • fh – a writable file-like object
  • comment – comment string to write
smv.jprops.write_property(fh, key, value)[source]

Write a single property to the file in Java properties format.

Parameters:
  • fh – a writable file-like object
  • key – the key to write
  • value – the value to write
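
A round-trip sketch using the functions above (file names are illustrative):

>>> import smv.jprops as jprops
>>> with open("app.properties") as fh:
...     props = jprops.load_properties(fh)
>>> with open("out.properties", "w") as fh:
...     jprops.store_properties(fh, props, comment="copied")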

smv.matcher module

smv.matcher.ExactLogic(colName, expr)[source]

Level match with exact logic

Parameters:
  • colName (string) – level name used in the output DF
  • expr (Column) – match logic condition Column

Example

>>> ExactLogic("First_Name_Match", col("first_name") == col("_first_name"))
Returns:(ExactLogic)
smv.matcher.ExactMatchPreFilter(colName, expr)[source]

Specify the top-level exact match

Parameters:
  • colName (string) – level name used in the output DF
  • expr (Column) – match logic condition Column

Example

>>> ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name"))
Returns:(ExactMatchPreFilter)
smv.matcher.FuzzyLogic(colName, predicate, valueExpr, threshold)[source]

Level match with fuzzy logic

Parameters:
  • colName (string) – level name used in the output DF
  • predicate (Column) – a condition column, no match if this condition evaluates as false
  • valueExpr (Column) – a value column, which typically returns a score; a higher score means a higher chance of matching
  • threshold (float) – No match if the evaluated valueExpr < threshold

Example

>>> FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"),col("_city")), 0.9)
Returns:(FuzzyLogic)
smv.matcher.GroupCondition(expr)[source]

Specify the shared matching condition of all the levels (except the top-level exact match)

Parameters:expr (Column) – shared matching condition

Note

expr should be in “left == right” form so that the process can be optimized by reducing the search space

Example

>>> GroupCondition(soundex("first_name") == soundex("_first_name"))
Returns:(GroupCondition)
smv.matcher.NoOpGroupCondition()[source]

Always returns None

Returns:(None)
smv.matcher.NoOpPreFilter()[source]

Always returns None

Returns:(None)
class smv.matcher.SmvEntityMatcher(leftId, rightId, exactMatchFilter, groupCondition, levelLogics)[source]

Bases: object

Perform multiple level entity matching with exact and/or fuzzy logic

Parameters:
  • leftId (string) – id column name of left DF (df1)
  • rightId (string) – id column name of right DF (df2)
  • exactMatchFilter (ExactMatchPreFilter) – exact match condition; if records match, no further tests will be performed
  • groupCondition (GroupCondition) – for exact match leftovers, a deterministic condition to narrow down the search space
  • levelLogics (list(ExactLogic or FuzzyLogic)) – A list of level match conditions (always weaker than exactMatchFilter), all of them will be tested

Example

code:

SmvEntityMatcher("id", "_id",
    ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name")),
    GroupCondition(soundex("first_name") == soundex("_first_name")),
    [
        ExactLogic("First_Name_Match", col("first_name") == col("_first_name")),
        FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"),col("_city")), 0.9)
    ]
)
doMatch(df1, df2, keepOriginalCols=True)[source]

Apply SmvEntityMatcher to the 2 DataFrames

Parameters:
  • df1 (DataFrame) – DataFrame 1 with an id column with name “id”
  • df2 (DataFrame) – DataFrame 2 with an id column with name “id”
  • keepOriginalCols (boolean) – whether to keep all input columns of df1 and df2, defaults to true

Example

code:

SmvEntityMatcher("id", "_id",
    ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name")),
    GroupCondition(soundex("first_name") == soundex("_first_name")),
    [
        ExactLogic("First_Name_Match", col("first_name") == col("_first_name")),
        FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"),col("_city")), 0.9)
    ]
).doMatch(df1, df2, False)
Returns:a DataFrame with df1’s id, df2’s id, and the match flags of all the levels. For levels with fuzzy logic, the matching score is also provided. A column named “MatchBitmap” is also provided to summarize all the matching flags. When keepOriginalCols is true, the input columns are also kept
Return type:(DataFrame)

smv.modulesvisitor module

class smv.modulesvisitor.ModulesVisitor(roots)[source]

Bases: object

Provides a way to perform depth-first and breadth-first visits of the sub-graph of modules reachable from a given set of roots

bfs_visit(action, state, need_to_run_only=False)[source]

Breadth first visit

dfs_visit(action, state, need_to_run_only=False)[source]

Depth first visit

modules_needed_for_run
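
A hedged usage sketch of the visitors above, assuming the action receives (module, state) and that modules expose an fqn() accessor (both assumptions should be checked against your SMV version):

>>> visitor = ModulesVisitor([root_module])  # root_module is illustrative
>>> fqns = []
>>> visitor.dfs_visit(lambda mod, state: state.append(mod.fqn()), fqns)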

smv.panel module

SMV PartialTime and TimePanel Framework

This module defines the most commonly used PartialTimes, including Day, Month, Week, and Quarter, as well as the TimePanel

PartialTime is the base class of
  • Day
  • Month
  • Week
  • Quarter
PartialTime has the following methods
  • smvTime(): returns (str), will be the value of the smvTime column if added to a DF
  • timeIndex(): returns (int), an integer that increments by one as the PartialTime increases
    by one unit
  • timeLabel(): returns (str), a human readable string
  • timeType(): returns (str), the type of the PartialTime
smv.panel.Day(year, month, day)[source]

Define an smv.panel.Day

Day extends smv.panel.PartialTime base class

Parameters:
  • year (int) –
  • month (int) –
  • day (int) –

Example

>>> d = Day(2012, 5, 31)
>>> d.smvTime()
u'D20120531'
>>> d.timeIndex()
15491
>>> d.timeLabel()
u'2012-05-31'
>>> d.timeType()
u'day'
Returns:(java object smv.panel.Day)
smv.panel.Month(year, month)[source]

Define an smv.panel.Month

Month extends smv.panel.PartialTime base class

Parameters:
  • year (int) –
  • month (int) –

Example

>>> m = Month(2012, 5)
>>> m.smvTime()
u'M201205'
>>> m.timeIndex()
508
>>> m.timeLabel()
u'2012-05'
>>> m.timeType()
u'month'
Returns:(java object smv.panel.Month)
smv.panel.Quarter(year, quarter)[source]

Define an smv.panel.Quarter

Quarter extends smv.panel.PartialTime base class

Parameters:
  • year (int) –
  • quarter (int) –

Example

>>> q = Quarter(2012, 1)
>>> q.smvTime()
u'Q201201'
>>> q.timeIndex()
168
>>> q.timeLabel()
u'2012-Q1'
>>> q.timeType()
u'quarter'
Returns:(java object smv.panel.Quarter)
smv.panel.TimePanel(start, end)[source]

Define an smv.panel.TimePanel

TimePanel is a consecutive range of PartialTimes. It has a “start” PartialTime and an “end” PartialTime, both inclusive. “start” and “end” must have the same timeType
Parameters:
  • start (java object smv.PartialTime) – Quarter, Month, Day etc.
  • end (java object smv.PartialTime) – Quarter, Month, Day etc.

Example

>>> tp = TimePanel(Day(2012, 1, 1), Day(2013, 12, 31))
Returns:(java object smv.panel.TimePanel)
smv.panel.Week(year, month, day, start_on='Monday')[source]

Define an smv.panel.Week

Week extends smv.panel.PartialTime base class

Parameters:
  • year (int) –
  • month (int) –
  • day (int) –
  • start_on (str) – day the week starts on; valid values: Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday. Default value is Monday

Example

>>> w = Week(2012, 3, 4)
>>> w.smvTime()
u'W20120227'
>>> w.timeIndex()
2200
>>> w.timeLabel()
u'Week of 2012-02-27'
>>> w.timeType()
u'week'
>>> w = Week(2012, 3, 4, "Sunday")
>>> w.timeType()
u'week_start_on_Sunday'
>>> w.smvTime()
u'W(7)20120304'
>>> w.timeIndex()
2201
>>> w.timeLabel()
u'Week of 2012-03-04'
Returns:(java object smv.panel.Week)

smv.provider module

SMV Provider API

This module allows users to declare/register providers.

class smv.provider.SmvProvider[source]

Bases: object

Base class of all provider classes.

Each provider must inherit from this class and also define a provider_type() static method that returns the type as a string.

Note: there is no version-agnostic way to force derived classes to implement a static provider_type(), so the check is done dynamically (at runtime)

IS_PROVIDER = True
static provider_type()[source]
classmethod provider_type_fqn()[source]

Create a hierarchical provider type FQN for a given provider class, based on the provider class hierarchy.

Example (assume provider_type() for each class X returns “X”):

class A(SmvProvider): ...
class B(A): ...
class C(B): ...

In the above example, C’s provider type is just “C”, but C’s provider_type_fqn is “A.B.C”
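
A minimal runnable sketch of the hierarchy above (assuming each class implements the static provider_type() required by SmvProvider):

from smv.provider import SmvProvider

class A(SmvProvider):
    @staticmethod
    def provider_type():
        return "A"

class B(A):
    @staticmethod
    def provider_type():
        return "B"

class C(B):
    @staticmethod
    def provider_type():
        return "C"

print(C.provider_type())      # "C"
print(C.provider_type_fqn())  # "A.B.C"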

smv.runconfig module

SMV User Run Configuration Parameters

This module defines the SmvRunConfig class, which can be mixed into an SmvModule to get user configuration parameters at run-time.

class smv.runconfig.SmvRunConfig[source]

Bases: object

DEPRECATED

Run config accessor methods have been absorbed by SmvGenericModule, so SmvRunConfig is maintained only to support existing projects. SmvRunConfig’s influence on the dataset hash is preserved so that modules do not have to transition overnight to SmvGenericModule.requiresConfig in order for the config to influence the dataset hash.

smv.runinfo module

Easy Python access to SmvRunInfoCollector and related Scala classes.

Todo

  • document example use
class smv.runinfo.SmvRunInfo(meta, metahist)[source]

Bases: object

Collection of a module’s run info, with:

  • metadata
  • metahistory
class smv.runinfo.SmvRunInfoCollector[source]

Bases: object

A list of SmvRunInfos from a run transaction, and methods to help report on them

add_runinfo(fqn, meta, meta_hist)[source]
dqm_state(ds_name)[source]

Returns the DQM state for a given dataset

Returns:A dictionary representation of the dqm state
Raises:py4j.protocol.Py4JError – if there is a Java call error or there is no validation result or DQM state for the specified dataset (e.g. caused by a typo in the name)
dqm_validation(ds_name)[source]

Returns the DQM validation result for a given dataset

Returns:A dictionary representation of the dqm validation result
Raises:py4j.protocol.Py4JError – if there is a Java call error or there is no validation result for the specified dataset (e.g. caused by a typo in the name)
fqns()[source]

Returns a list of FQNs for all datasets that ran

metadata(ds_name)[source]

Returns the metadata for a given dataset as a dict

metadata_history(ds_name)[source]

Returns the metadata history for a given dataset as a list(dict)

show_report(ds_name=None, show_history=False)[source]

Print detailed report of information collected

Parameters:
  • ds_name (str) – report only on the named ds if not None
  • show_history (bool) – include metadata history in report if True (default False)
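
Example

A hedged usage sketch (the module name is hypothetical; a collector can be obtained, for example, from smv.smvshell.get_run_info, documented below):

collector = get_run_info("MyModule")
print(collector.fqns())                # FQNs of all datasets that ran
print(collector.metadata("MyModule"))  # metadata dict for one dataset
collector.show_report(show_history=True)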

smv.schema_meta_ops module

SMV Schema Meta Operations

Provides helper functions for SmvDesc and SmvLabel operations

class smv.schema_meta_ops.SchemaMetaOps(df)[source]

Bases: object

addDesc(*colDescs)[source]

Adds column descriptions

addLabel(colNames, labels)[source]

Adds labels to the specified columns

If colNames are empty, adds the same set of labels to all columns

colsWithLabel(labels=None)[source]

Returns all column names in the data frame that contain all the specified labels

If labels are empty, returns names of unlabeled columns

getDesc(colName)[source]

Returns column description(s)

If colName is empty, returns descriptions for all columns

getLabel(colName)[source]

Returns a list of column label(s)

If colName is empty, returns labels for all columns

removeDesc(*colNames)[source]

Removes description for the given columns from the Dataframe

If colNames are empty, removes descriptions of all columns

removeLabel(colNames=None, labels=None)[source]

Removes labels from the specified columns

If colNames is empty, removes the same set of labels from all columns. If labels is empty, removes all labels from the given columns. If both are empty, removes all labels from all columns.
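
Example

A hedged usage sketch (assuming colDescs are (columnName, description) pairs and df is an existing DataFrame; these details are not confirmed by this reference):

ops = SchemaMetaOps(df)
ops.addDesc(("amount", "transaction amount in USD"))
ops.addLabel(["amount", "id"], ["core"])
print(ops.getDesc("amount"))
print(ops.colsWithLabel(["core"]))   # column names labeled "core"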

smv.smvapp module

SmvApp entry class. This module provides the main SMV Python entry point SmvPy class and a singleton smvApp. It is equivalent to SmvApp on the Scala side

class smv.smvapp.SmvApp(arglist, _sparkSession, py_module_hotload=True)[source]

Bases: object

The Python representation of SMV.

Its singleton instance is created later in the containing module and is named smvApp

Adds java_imports to the namespace in the JVM gateway in SparkContext (in pyspark). It also creates an instance of SmvPyClient.

SEMI_PRIVATE_LIB_PREFIX = 'dsalib'
SRC_LIB_PATH = 'library'
SRC_PROJECT_PATH = 'src/main/python'
abs_path_for_project_path(project_path)[source]
all_data_dirs()[source]

All the config data dirs as an object. Could be dynamic, so it is recalculated on each use

appDir()[source]
appId()[source]
appName()[source]
copyToHdfs(fileobj, destination)[source]

Copies the content of a file object to an HDFS location.

Parameters:
  • fileobj (file object) – a file-like object whose content is to be copied, such as one returned by open(), or StringIO
  • destination (str) – specifies the destination path in the hadoop file system

The file object is expected to have been opened in binary read mode.

The file object is closed when this function completes.
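
Example

A minimal sketch (paths are hypothetical):

f = open("data/input.csv", "rb")            # binary read mode, as required
smvApp.copyToHdfs(f, "/user/me/input.csv")  # f is closed by copyToHdfs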

createDF(schema, data=None)[source]
createDFWithLogger(schema, data, readerLogger)[source]
classmethod createInstance(arglist, _sparkSession, py_module_hotload=True)[source]

Create singleton instance. Also returns the instance.

defaultCsvWithHeader()[source]
defaultTsv()[source]
defaultTsvWithHeader()[source]
discoverSchemaAsSmvSchema(path, csvAttributes, n=100000)[source]

Discovers the schema of a .csv file and returns a Scala SmvSchema instance

Parameters:
  • path – path to the csv file
  • csvAttributes – Scala CsvAttributes instance (optional)
  • n – number of records used to discover the schema (optional)

exception_handling()[source]

Decorator function to catch Py4JJavaError and raise SmvDqmValidationError if applicable; otherwise the original exception is passed through

getConf(key)[source]
getCurrentProperties()[source]

Python dict of the current merged props: defaultProps ++ appConfProps ++ homeConfProps ++ usrConfProps ++ cmdLineProps ++ dynamicRunConfig, where the right side wins in the map merge.

getDsHash(name)[source]

The current hashOfHash for the named module as a hex string

Parameters:
  • name (str) – The unique name of a module. Does not have to be the FQN.
  • runConfig (dict) – runConfig to apply when collecting info. If module was run with a config, the same config needs to be specified here to retrieve the correct hash.
Returns:

The hashOfHash of the named module

Return type:

(str)

getFileNamesByType(ftype)[source]

Return a list of file names which have the postfix ftype

classmethod getInstance()[source]
getMetadataHistoryJson(fqn)[source]

Returns the metadata history for a given fqn

getMetadataJson(fqn)[source]

Returns the metadata for a given fqn

getModuleResult(fqn, forceRun=False)[source]

Run module and get its result, which may not be a DataFrame

getRunInfo(fqn)[source]

Returns the run information of a module and all its dependencies from the last run.

Unlike the runModule() method, which returns the run information just for that run, this method returns the run information from the last run.

If no module was run (e.g. the code did not change, so the data is read from persistent storage), the SmvRunInfoCollector returned from the runModule() method would be empty. But the SmvRunInfoCollector returned from this method would contain all the latest run information about all dependent modules.

Parameters:
  • fqn (str) – fqn of target module
  • runConfig (dict) – runConfig to apply when collecting info. If module was run with a config, the same config needs to be specified here to retrieve the info.
Returns:

SmvRunInfoCollector

getRunInfoByPartialName(name)[source]

Returns the run information of a module and all its dependencies from the last run.

Unlike the runModule() method, which returns the run information just for that run, this method returns the run information from the last run.

If no module was run (e.g. the code did not change, so the data is read from persistent storage), the SmvRunInfoCollector returned from the runModule() method would be empty. But the SmvRunInfoCollector returned from this method would contain all the latest run information about all dependent modules.

Parameters:name (str) – unique suffix to fqn of target module
Returns:SmvRunInfoCollector
getSchemaByDataFileAsSmvSchema(data_file_name, path=None)[source]

Get the schema of a data file from its path and return a Scala SmvSchema instance. The result will be None if the corresponding schema file does not exist or is invalid.

getStageFromModuleFqn(fqn)[source]

Returns the stage name for a given fqn

get_all_connection_names()[source]

Get all connections defined in the app; return a list of names

get_connection_by_name(name)[source]

Get connection instance from name

get_graph_json()[source]

Generate a json string representing the dependency graph.

get_module_state_json(fqns)[source]

Generate a json string for modules’ needToRun state of the app

Parameters:fqns (list(string)) – module fqn list to get state for
Returns:json string. E.g. {“stage.mymod”: {“needsToRun” : True}}
Return type:(string)
get_need_to_run(roots, keep_roots=False)[source]

Given a list of target modules to run, return the list of modules that will run and be persisted, in the order in which they should run. This is a subset of modules_needed_for_run, keeping only the non-ephemeral, not-yet-persisted modules. Please note that some of the roots may not be in this list; to include all roots, set keep_roots to True

get_provider_by_fqn(fqn)[source]

Return provider class from provider name fqn

get_providers_by_prefix(fqn_prefix)[source]

Discover all providers in the user lib and smv lib that have a provider type FQN starting with the given prefix.

inputDir()[source]
input_connection()[source]
jdbcDriver()[source]
jdbcUrl()[source]
load_single_ds(fqn)[source]

Return ds from fqn

outputDir()[source]
prependDefaultDirs()[source]

Ensure that mods in src/main/python and library/ are discoverable. If we add more default dirs, we’ll make this a set

prepend_source(project_path)[source]
publishModuleToHiveByName(**kwargs)[source]
quickRunModule(**kwargs)[source]
refresh_provider_cache()[source]

Re-discover providers and set provider_cache

removeDefaultDirs()[source]

The cleanup version of prependDefaultDirs

remove_source(project_path)[source]
run()[source]
runModule(**kwargs)[source]
runModuleByName(**kwargs)[source]
scalaNone()[source]

Returns a Scala None value

scalaOption(val)[source]

Returns a Scala Option containing the value

semiLibs()[source]

Use introspection to determine the list of available semi-private libs.

setAppDir(appDir)[source]

SMV’s equivalent of ‘cd’ for app dirs.

setDynamicRunConfig(runConfig)[source]
classmethod setInstance(app)[source]

Set the singleton instance.

smvLibs()[source]

Use introspection to determine the list of available smv builtin libs.

smvVersion()[source]
stages()[source]

stages is a method because stages can be set dynamically on an SmvApp instance

userLibs()[source]

Use introspection to determine the list of available user libs.

smv.smvappinfo module

class smv.smvappinfo.SmvAppInfo(smvApp)[source]

Bases: object

Provides the SmvApp module list, dependency graph, etc. for shell and plotting. This class is mainly for the CLI and GUI; its functions are not core to SmvApp.

create_graph_dot()[source]

Create graphviz dot graph string for the whole app

create_graph_json()[source]

Create the dependency graph JSON string. The dependency graph does not include modules’ state info

create_module_state_json(fqns)[source]

Create a JSON string of all modules’ needToRun state

ls(stage=None)[source]

List all modules, under their stages

ls_ancestors(mname)[source]

List given module’s ancestors, under their stages

ls_dead(stage=None)[source]

List all dead modules, under their stages

ls_descendants(mname)[source]

List given module’s descendants, under their stages

ls_stage()[source]

List all stage names

smv.smvconfig module

Handle command line args and props files

class smv.smvconfig.SmvConfig(arglist, _jvm)[source]

Bases: object

SMV configurations, including:

  • command line parsing
  • read in property files
  • dynamic configuration handling
all_data_dirs()[source]

Create all the data dir configs

app_id()[source]
app_name()[source]
df_persist_format()[source]

Spark DF’s default persisted format. Available values:

  • smvcsv_on_hdfs
  • parquet_on_hdfs (default)
force_edd()[source]
get_run_config(key)[source]

Run config is accessed within client modules. Return the run-config value for the given key.

There are 2 possible sources of run-config:
  • dynamic_props (passed in by client code)
  • props files/command-line parameters
get_run_config_keys()[source]

Return all the run-config keys

infer_stage_full_name(part_name)[source]

For a given partial stage name, infer full stage name

merged_props()[source]

All the props (static + dynamic)

read_props_from_app_dir(_app_dir)[source]

For a given app dir, read in the prop files

set_app_dir(new_app_dir)[source]

Dynamically reset the app dir, which changes the location of the app and user conf files, then re-read the props files

set_dynamic_props(new_d_props)[source]

Reset the dynamic props. Each reset fully overwrites the entire dynamic props. The reset is ignored if new_d_props is None

spark_sql_props()[source]
stage_names()[source]
use_lock()[source]

smv.smvdriver module

class smv.smvdriver.SmvDriver[source]

Bases: object

Driver for an SMV application

SmvDriver handles the boilerplate around parsing driver args, constructing an SmvApp, and running an application. To use SmvDriver, override main; then, in the main block of your driver script, construct your driver and call run.

create_smv_app(smv_args, driver_args)[source]

Override this to define how this driver’s SmvApp is created

Default is just SmvApp.createInstance(smv_args). Note that it’s important to use createInstance to ensure that the singleton app is set.

SmvDriver will parse the full CLI args to distinguish the SMV args from the args to your driver.

Parameters:
  • smv_args (list(str)) – CLI args for SMV; should be passed to SmvApp
  • driver_args (list(str)) – CLI args for the driver
main(app, driver_args)[source]

Override this to define the driver logic

Default is to just call run on the SmvApp.

Parameters:
  • app (SmvApp) – app which was constructed
  • driver_args (list(str)) – CLI args for the driver
run()[source]

Run the driver
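
Example

A hedged sketch of a driver script following the description above (the driver logic is hypothetical):

from smv.smvdriver import SmvDriver

class MyDriver(SmvDriver):
    def main(self, app, driver_args):
        # Custom driver logic; `app` is the SmvApp built by create_smv_app
        app.run()

if __name__ == "__main__":
    MyDriver().run()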

smv.smvgenericmodule module

class smv.smvgenericmodule.SmvGenericModule(smvApp)[source]

Bases: abc.ABC

Abstract base class for all SMV modules, including dataset and task modules

IsSmvDataSet = True
class RunParams(fqn2df)[source]

Bases: object

Map from SmvGenericModule to resulting DataFrame

We need to simulate a dict from ds to df where the same object can be keyed by different datasets with the same fqn. For example, in the module

class X(SmvModule):
    def requiresDS(self):
        return [Foo]
    def run(self, i):
        return i[Foo]

the i argument of the run method should map Foo to the correct DataFrame.

Parameters:(dict) – a map from fqn to DataFrame
description()[source]
doRun(known)[source]

Do the real data calculation or the task of this module

dsType()[source]

Return SmvGenericModule’s type

classmethod fqn()[source]

Returns the fully qualified name

instanceValHash()[source]

Hash computed based on instance values of the dataset, such as the timestamp of an input file

Returns:(int)
isEphemeral()[source]

Should this SmvGenericModule skip persisting its data?

Returns:True if this SmvGenericModule should not persist its data, False otherwise
Return type:(bool)
isSmvOutput()[source]
metaStrategy()[source]

Return an SmvIoStrategy for metadata persisting

metadata(df)[source]

User-defined metadata

Override this method to define metadata that will be logged with your module’s results. Defaults to empty dictionary.

Parameters:df (DataFrame) – result of running the module, used to generate metadata
Returns:dictionary of serializable metadata
Return type:(dict)
metadataHistorySize()[source]

Override to define the maximum size of the metadata history for this module

Returns:size
Return type:(int)
needsToRun()[source]

A non-ephemeral module does not need to run when it is already persisted. An ephemeral module does not need to run when none of its requiresDS dependencies needs to run.

persistStrategy()[source]

Return an SmvIoStrategy for data persisting

requiresDS()[source]

User-specified list of dependencies

Override this method to specify the SmvGenericModule needed as inputs.

Returns:a list of dependencies
Return type:(list(SmvGenericModule))
validateMetadata(current, history)[source]

User-defined metadata validation

Override this method to define validation rules for metadata given the current metadata and historical metadata.

Parameters:
  • current (dict) – current metadata kv
  • history (list(dict)) – list of historical metadata kv’s
Returns:

Validation failure message. Return None (or omit a return statement) if successful.

Return type:

(str)

version()[source]

Version number. Deprecated!

Returns:version number of this SmvGenericModule
Return type:(str)
versioned_fqn
class smv.smvgenericmodule.SmvProcessModule(smvApp)[source]

Bases: smv.smvgenericmodule.SmvGenericModule

Base class for all intermediate data process modules

This is a sub-class of SmvGenericModule and a sibling class of SmvIoModule.

  • SmvProcessModule: multiple inputs, single output
  • SmvInput: no input, single output
  • SmvOutput: single input, no output

Users need to implement:

  • requiresDS
  • run
doRun(known)[source]

Compute this dataset, and return the dataframe

isEphemeral()[source]

Defaults SmvProcessModule’s ephemeral flag to False, so that when mixed in with SmvOutput the module remains non-ephemeral

requiresConfig()[source]

User-specified list of config keys this module depends on

The given keys and their values will influence the dataset hash

requiresLib()[source]

User-specified list of ‘library’ dependencies. These are code, other than the DataSet’s run method, that impacts its output or behaviour.

Override this method to assist in re-running this module based on changes in other python objects (functions, classes, packages).

Limitations: For python modules and packages, the ‘requiresLib()’ method is limited to registering changes on the main file of the package (for module ‘foo’, that’s ‘foo.py’, for package ‘bar’, that’s ‘bar/__init__.py’). This means that if a module or package imports other modules, the imported module’s changes will not impact DataSet hashes.

Returns:a list of library dependencies
Return type:(list(module))
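
Example

A hedged sketch (the helper module name is hypothetical):

import mylib.cleaning   # hypothetical project library

class MyModule(SmvProcessModule):
    def requiresLib(self):
        # Changes to mylib/cleaning.py now influence this module's hash
        return [mylib.cleaning]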
run(i)[source]

User-specified definition of the operations of this SmvModule

Override this method to define the output of this module, given a map ‘i’ from input SmvGenericModule to resulting DataFrame. ‘i’ will have a mapping for each SmvGenericModule listed in requiresDS. E.g.

def requiresDS(self):
    return [MyDependency]

def run(self, i):
    return i[MyDependency].select("importantColumn")
Parameters:(RunParams) – mapping from input SmvGenericModule to DataFrame
Returns:output of this SmvModule
Return type:(DataFrame)
smvGetRunConfig(key)[source]

Return the current user run configuration value for the given key.

smvGetRunConfigAsBool(key)[source]
smvGetRunConfigAsInt(key)[source]
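
Example

A hedged sketch combining requiresConfig with the run-config accessors (the key name and dependency module are hypothetical):

class MyModule(SmvProcessModule):
    def requiresConfig(self):
        return ["max_rows"]          # keys that influence the dataset hash
    def requiresDS(self):
        return [Input]               # hypothetical dependency module
    def run(self, i):
        n = self.smvGetRunConfigAsInt("max_rows")
        return i[Input].limit(n)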

smv.smvhdfs module

class smv.smvhdfs.SmvHDFS(j_smvHDFS)[source]

Bases: object

writeToFile(py_fileobj, file_name)[source]

smv.smvinput module

SMV DataSet Framework’s SmvInputBase interface

This module defines the abstract classes which form the SmvGenericModule’s input DS Framework for clients’ projects

class smv.smvinput.SmvMultiCsvFiles(smvApp)[source]

Bases: smv.iomod.inputs.SmvMultiCsvInputFiles

Raw input from multiple csv files sharing a single schema

Instead of a single input file, specify a data dir with files which share the same schema.

connectionName()[source]

Name of the connection to read/write

dir()[source]

Path to the directory containing the csv files and their schema

Returns:path
Return type:(str)
dirName()[source]

Path to the directory containing the csv files relative to the path defined in the connection

Returns:(str)
doRun(known)[source]

Do the real data calculation or the task of this module

get_connection()[source]

Get data connection instance from connectionName()

Connection should be configured in the conf file with at least a class FQN

Ex: smv.conn.con_name.class=smv.conn.SmvJdbcConnectionInfo

run(df)[source]
class smv.smvinput.SmvCsvFile(smvApp)[source]

Bases: smv.iomod.inputs.SmvCsvInputFile

Input from a file in CSV format. Base class for CSV file input. Users need to define the path method.

Example:
>>> class MyCsvFile(SmvCsvFile):
...     def path(self):
...         return "path/relative/to/smvInputDir/file.csv"

connectionName()[source]

Name of the connection to read/write

doRun(known)[source]

Do the real data calculation or the task of this module

fileName()[source]

User-specified file name relative to the path defined in the connection

Returns:(string)
get_connection()[source]

Get data connection instance from connectionName()

Connection should be configured in the conf file with at least a class FQN

Ex: smv.conn.con_name.class=smv.conn.SmvJdbcConnectionInfo

path()[source]

Relative path to the csv file

run(df)[source]
class smv.smvinput.SmvSqlCsvFile(smvApp)[source]

Bases: smv.smvinput.SmvCsvFile

Input from a file in CSV format and using a SQL query to access it

query()[source]

Query used to extract data from the table which reads the CSV file

Override this to specify your own query (optional). Default is equivalent to ‘select * from ‘ + tableName.

Returns:query
Return type:(str)
run(df)[source]
tableName = 'df'
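
Example

A hedged sketch (the file path and columns are hypothetical):

class MyCsvSql(SmvSqlCsvFile):
    def path(self):
        return "accounts/accounts.csv"
    def query(self):
        # The CSV is registered under tableName, i.e. 'df'
        return "select id, amount from df where amount > 0"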
class smv.smvinput.SmvCsvStringData(smvApp)[source]

Bases: smv.iomod.inputs.SmvCsvStringInputData

class smv.smvinput.SmvHiveTable(smvApp)[source]

Bases: smv.iomod.inputs.SmvHiveInputTable

Input from a Hive table. This is for backward compatibility and will be deprecated. Please use iomod.SmvHiveInputTable instead.

Users need to implement:

  • tableName

Custom queries at read time are no longer supported; please use a downstream module to process the data

connectionName()[source]

Name of the connection to read/write

get_connection()[source]

Get data connection instance from connectionName()

Connection should be configured in the conf file with at least a class FQN

Ex: smv.conn.con_name.class=smv.conn.SmvJdbcConnectionInfo

tableName()[source]

User-specified name of the Hive table to extract input from

Override this to specify your own table name.

Returns:table name
Return type:(str)
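
Example

A minimal sketch (the table name is hypothetical):

class MyHiveInput(SmvHiveTable):
    def tableName(self):
        return "mydb.accounts"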

smv.smviostrategy module

class smv.smviostrategy.SmvCsvOnHdfsIoStrategy(smvApp, path, smvSchema, logger, write_mode='overwrite')[source]

Bases: smv.smviostrategy.SmvIoStrategy

Simple read/write of csv, given a schema. Not for persisting, which should be handled by SmvCsvPersistenceStrategy

read()[source]

Read data from persisted storage

write(raw_data)[source]

Write data to the persist file/db

class smv.smviostrategy.SmvCsvPersistenceStrategy(smvApp, versioned_fqn, file_path=None)[source]

Bases: smv.smviostrategy.SmvFileOnHdfsPersistenceStrategy

Persist strategy for using Smv CSV IO handler

Parameters:
  • smvApp (SmvApp) –
  • versioned_fqn (str) – data/module’s FQN/Name with hash_of_hash
  • file_path (str) – parameter “versioned_fqn” is used to create a data file path; however, if “file_path” is provided, “versioned_fqn” is ignored
isPersisted()[source]

Whether the data was successfully persisted before

remove()[source]

Remove persisted file(s)

class smv.smviostrategy.SmvFileOnHdfsPersistenceStrategy(smvApp, versioned_fqn=None, postfix=None, file_path=None)[source]

Bases: smv.smviostrategy.SmvPersistenceStrategy

Abstract class for persisting data to the Hdfs file system, handling general tasks such as file name creation, locking on write, etc.

Parameters:
  • smvApp (SmvApp) –
  • versioned_fqn (str) – data/module’s FQN/Name with hash_of_hash
  • postfix (str) – persisted file’s postfix
  • file_path (str) – parameters “versioned_fqn” and “postfix” are used to create a data file path; however, if “file_path” is provided, “versioned_fqn” and “postfix” are ignored
isPersisted()[source]

Whether the data was successfully persisted before

read()[source]

Read data from persisted storage

remove()[source]

Remove persisted file(s)

write(dataframe)[source]

Write data to the persist file/db

class smv.smviostrategy.SmvHiveIoStrategy(smvApp, conn_info, table_name, write_mode='errorifexists')[source]

Bases: smv.smviostrategy.SmvIoStrategy

Persist strategy for spark Hive IO

Parameters:
  • smvApp (SmvApp) –
  • conn_info (SmvConnectionInfo) – Hive connection info
  • table_name (str) – the table to read from/write to
  • write_mode (str) – spark df writer’s SaveMode
read()[source]

Read data from persisted storage

write(raw_data)[source]

Write data to the persist file/db

class smv.smviostrategy.SmvIoStrategy[source]

Bases: abc.ABC

Base class for all module I/O, including read, write and persistence

read()[source]

Read data from persisted storage

write(raw_data)[source]

Write data to the persist file/db

class smv.smviostrategy.SmvJdbcIoStrategy(smvApp, conn_info, table_name, write_mode='errorifexists')[source]

Bases: smv.smviostrategy.SmvIoStrategy

Persist strategy for spark JDBC IO

Parameters:
  • smvApp (SmvApp) –
  • conn_info (SmvConnectionInfo) – Jdbc connection info
  • table_name (str) – the table to read from/write to
  • write_mode (str) – spark df writer’s SaveMode
read()[source]

Read data from persisted storage

write(raw_data)[source]

Write data to the persist file/db

class smv.smviostrategy.SmvJsonOnHdfsPersistenceStrategy(smvApp, path)[source]

Bases: smv.smviostrategy.SmvFileOnHdfsPersistenceStrategy

class smv.smviostrategy.SmvNonOpPersistenceStrategy[source]

Bases: smv.smviostrategy.SmvPersistenceStrategy

Never persists; isPersisted always returns False

isPersisted()[source]

Whether the data was successfully persisted before

read()[source]

Read data from persisted storage

remove()[source]

Remove persisted file(s)

write(raw_data)[source]

Write data to the persist file/db

class smv.smviostrategy.SmvParquetPersistenceStrategy(smvApp, versioned_fqn, file_path=None)[source]

Bases: smv.smviostrategy.SmvFileOnHdfsPersistenceStrategy

Persist strategy for using Spark native parquet

Parameters:
  • smvApp (SmvApp) –
  • versioned_fqn (str) – data/module’s FQN/Name with hash_of_hash
  • file_path (str) – parameter “versioned_fqn” is used to create a data file path; however, if “file_path” is provided, “versioned_fqn” is ignored
isPersisted()[source]

Whether the data was successfully persisted before

remove()[source]

Remove persisted file(s)

class smv.smviostrategy.SmvPersistenceStrategy[source]

Bases: smv.smviostrategy.SmvIoStrategy

Base class for IO strategies used for persisting data

isPersisted()[source]

Whether the data was successfully persisted before

remove()[source]

Remove persisted file(s)

class smv.smviostrategy.SmvPicklablePersistenceStrategy(smvApp, versioned_fqn, file_path=None)[source]

Bases: smv.smviostrategy.SmvFileOnHdfsPersistenceStrategy

class smv.smviostrategy.SmvSchemaOnHdfsIoStrategy(smvApp, path, write_mode='overwrite')[source]

Bases: smv.smviostrategy.SmvIoStrategy

Read/write of an SmvSchema file on Hdfs

read()[source]

Read data from persisted storage

write(smvSchema)[source]

Write data to the persist file/db

class smv.smviostrategy.SmvTextOnHdfsIoStrategy(smvApp, path)[source]

Bases: smv.smviostrategy.SmvIoStrategy

Simple read/write of a small text file on Hdfs

read()[source]

Read data from persisted storage

write(rawdata)[source]

Write data to the persist file/db

class smv.smviostrategy.SmvXmlOnHdfsIoStrategy(smvApp, path, rowTag, schema=None)[source]

Bases: smv.smviostrategy.SmvIoStrategy

Read/write Xml file on Hdfs using Spark DF reader/writer

read()[source]

Read data from persisted storage

write(rawdata)[source]

Write data to the persist file/db

smv.smvlock module

class smv.smvlock.NonOpLock[source]

Bases: object

class smv.smvlock.SmvLock(_jvm, _lock_path)[source]

Bases: object

Create a lock context

smv.smvmetadata module

class smv.smvmetadata.SmvMetaData[source]

Bases: object

addApplicationContext(smvApp)[source]
addDependencyMetadata(deps)[source]
addDqmValidationResult(result_json)[source]
addDuration(name, duration)[source]
addEddResult(edd_result_json_list)[source]
addFQN(fqn)[source]
addSchemaMetadata(df)[source]
addSystemMeta(mod)[source]
addTimestamp(dt)[source]
addUserMeta(user_meta)[source]
addVerHex(ver_hex)[source]
fromJson(meta_json)[source]
getEddResult()[source]
toJson()[source]
class smv.smvmetadata.SmvMetaHistory[source]

Bases: object

fromJson(hist_json)[source]
toJson()[source]
update(new_meta, max_size)[source]

smv.smvmodule module

SMV DataSet Framework interface

This module defines the abstract classes which form the SmvModule Framework for clients’ projects

class smv.smvmodule.SmvOutput[source]

Bases: object

Mixin which marks an SmvModule as one of the outputs of its stage. SmvOutputs are distinct from other SmvModules in that

  • The -s and --run-app options of smv-run only run SmvOutputs and their dependencies.

Deprecated. Will be replaced by sub-classes of smv.io.SmvOutput.

IsSmvOutput = True
tableName()[source]

The user-specified table name used when exporting data to Hive (optional)

Returns:(string)

class smv.smvmodule.SmvModule(smvApp)[source]

Bases: smv.smvmodule.SmvSparkDfModule

SmvModule is the same as SmvSparkDfModule. Since it has been used in all SMV projects, we will not change its name, for backward compatibility. It may be deprecated when the full SmvGenericModule framework gets adopted

class smv.smvmodule.SmvSqlModule(smvApp)[source]

Bases: smv.smvmodule.SmvModule

An SMV module which executes a SQL query in place of a run method

query()[source]

User-specified SQL query defining the behavior of this module

Before the query is executed, all dependencies will be registered as tables with the names specified in the tables method.

requiresDS()[source]

User-specified list of dependencies

Override this method to specify the SmvGenericModule needed as inputs.

Returns:a list of dependencies
Return type:(list(SmvGenericModule))
run(i)[source]

User-specified definition of the operations of this SmvModule

Override this method to define the output of this module, given a map ‘i’ from input SmvGenericModule to resulting DataFrame. ‘i’ will have a mapping for each SmvGenericModule listed in requiresDS. E.g.

def requiresDS(self):
    return [MyDependency]

def run(self, i):
    return i[MyDependency].select("importantColumn")
Parameters:(RunParams) – mapping from input SmvGenericModule to DataFrame
Returns:output of this SmvModule
Return type:(DataFrame)
tables()[source]

Dict of dependencies by table name.
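
Example

A hedged sketch (the dependency modules and columns are hypothetical):

class JoinedBySql(SmvSqlModule):
    def tables(self):
        return {"a": ModA, "b": ModB}
    def query(self):
        # ModA and ModB are registered as tables 'a' and 'b' before this runs
        return "select a.id, b.value from a join b on a.id = b.id"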

class smv.smvmodule.SmvModel(smvApp)[source]

Bases: smv.smvgenericmodule.SmvProcessModule

SmvModule whose result is a data model

The result must be picklable - see https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled.

dsType()[source]

Return SmvGenericModule’s type

persistStrategy()[source]

Return an SmvIoStrategy for data persisting

class smv.smvmodule.SmvModelExec(smvApp)[source]

Bases: smv.smvmodule.SmvModule

SmvModule that runs a model produced by an SmvModel

doRun(known)[source]

Compute this dataset, and return the dataframe

dsType()[source]

Return SmvGenericModule’s type

requiresModel()[source]

User-specified SmvModel module

Returns:the SmvModel this module depends on
Return type:(SmvModel)
run(i, model)[source]

User-specified definition of the operations of this SmvModule

Override this method to define the output of this module, given a map ‘i’ from input SmvGenericModule to resulting DataFrame. ‘i’ will have a mapping for each SmvGenericModule listed in requiresDS. E.g.

def requiresDS(self):
    return [MyDependency]

def run(self, i):
    return i[MyDependency].select("importantColumn")
Parameters:
  • i (RunParams) – mapping from input SmvGenericModule to DataFrame
  • model (SmvModel) – the model this module depends on
Returns:

picklable output of this SmvModule

Return type:

(object)
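
Example

A hedged sketch pairing an SmvModel with an SmvModelExec (the input modules and the fit/score helpers are hypothetical):

class MyModel(SmvModel):
    def requiresDS(self):
        return [TrainingData]
    def run(self, i):
        return fit_model(i[TrainingData])    # must return a picklable object

class MyModelExec(SmvModelExec):
    def requiresDS(self):
        return [ScoringData]
    def requiresModel(self):
        return MyModel
    def run(self, i, model):
        return score(model, i[ScoringData])  # hypothetical scoring helper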

smv.smvmodulerunner module

class smv.smvmodulerunner.SmvModuleRunner(modules, smvApp, runMonitorCallback=None)[source]

Bases: object

Represents the run transaction. Provides the single entry point to run a group of modules

get_runinfo()[source]
publish(publish_dir=None)[source]
publish_local(local_dir)[source]
publish_to_hive()[source]
publish_to_jdbc()[source]
purge_persisted()[source]
quick_run(forceRun=False)[source]
run(forceRun=False)[source]

smv.smvschema module

class smv.smvschema.SmvSchema(j_smv_schema)[source]

Bases: object

The Python representation of the SmvSchema scala class.

Most of the work is still being done on the scala side; this is just a pass-through.

static discover(csv_path, csvAttributes, n=100000)[source]

Discover schema from CSV file with given csv attributes

static fromFile(schema_file)[source]
static fromString(schema_str)[source]
saveToLocalFile(schema_file)[source]

Save schema to local (driver) file

toValue(i, str_val)[source]

Convert a string value to a native value based on the type defined in the schema.

For example, if the first column is of type Int, then a call of toValue(0, “55”) would return the integer 55.

smv.smvshell module

Helper functions available in SMV’s Python shell

smv.smvshell.quickRun(name)[source]

Run the module and return the result. Does not persist, but uses existing persisted data if possible. No DQM

smv.smvshell.fullRun(name)[source]

Run the module and return the result. Persists the result and runs DQM if given

smv.smvshell.df(name, forceRun=False, quickRun=True)[source]

The DataFrame result of running the named module

Parameters:
  • name (str) – The unique name of a module. Does not have to be the FQN.
  • forceRun (bool) – True if the module should be forced to run even if it has persisted output. False otherwise.
  • quickRun (bool) – skip computing dqm+metadata and persisting csv
Returns:

The result of running the named module.

Return type:

(DataFrame)
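
Example (the module name is hypothetical):

>>> d = df("EmploymentByState")
>>> d2 = df("EmploymentByState", forceRun=True, quickRun=False)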

smv.smvshell.dshash(name)[source]

The current hashOfHash for the named module as a hex string

Parameters:
  • name (str) – The unique name of a module. Does not have to be the FQN.
  • runConfig (dict) – runConfig to apply when collecting info. If module was run with a config, the same config needs to be specified here to retrieve the correct hash.
Returns:

The hashOfHash of the named module

Return type:

(str)

smv.smvshell.getModel(name, forceRun=False)[source]

Get the result of running the named SmvModel module

Parameters:
  • name (str) – The name of a module. Does not have to be the FQN.
  • forceRun (bool) – True if the module should be forced to run even if it has persisted output. False otherwise.
  • version (str) – The name of the published version to load from
Returns:

The result of running the named module

Return type:

(object)

smv.smvshell.openHive(tableName)[source]

Read in a Hive table as a DataFrame

Parameters:tableName (str) – The name of the Hive table
Returns:The resulting DataFrame
Return type:(DataFrame)
smv.smvshell.openCsv(path, validate=False)[source]

Read in a CSV file as a DataFrame

Parameters:
  • path (str) – The path of the CSV file
  • validate (bool) – If True, validate the CSV before returning the DataFrame (raise an error if malformed)
Returns:

The resulting DataFrame

Return type:

(DataFrame)

smv.smvshell.help()[source]

Print a list of the SMV helper functions available in the shell

smv.smvshell.lsStage()[source]

List all the stages

smv.smvshell.ls(stageName=None)[source]

List all datasets in a stage

Parameters:stageName (str) – The name of the stage. Defaults to None, in which case all datasets in all stages will be listed.
smv.smvshell.lsDead(stageName=None)[source]

List dead datasets in a stage

Parameters:stageName (str) – The name of the stage. Defaults to None, in which case all dead datasets in all stages will be listed.
smv.smvshell.props()[source]

The current app properties used by SMV after the app, user, command-line, and dynamic props are merged.

Returns:The ‘mergedProps’ or final props used by SMV
Return type:(dict)
smv.smvshell.exportToHive(dsname)[source]

Export dataset’s running result to a Hive table

Parameters:dsname (str) – The name of an SmvModule
smv.smvshell.ancestors(dsname)[source]

List all ancestors of a dataset

Ancestors of a dataset are the datasets it depends on, directly or indirectly, including datasets from other stages.

Parameters:dsname (str) – The name of an SmvGenericModule
smv.smvshell.descendants(dsname)[source]

List all descendants of a dataset

Descendants of a dataset are the datasets which depend on it, directly or indirectly, including datasets from other stages

Parameters:dsname (str) – The name of an SmvGenericModule
smv.smvshell.now()[source]

Print current time

smv.smvshell.smvDiscoverSchemaToFile(path, n=100000, ca=None)[source]

Try its best to discover the schema from a raw CSV file

Will save a schema file with the postfix “.toBeReviewed” in the local directory.

Parameters:
  • path (str) – Path to the CSV file
  • n (int) – Number of records to check for schema discovery, default 100k
  • ca (CsvAttributes) – Defaults to CsvWithHeader
smv.smvshell.run_test(test_name)[source]

Run a test with the given name without creating new Spark context

First reloads SMV and the test from source, then runs the test.

Parameters:test_name (str) – Name of the test to run
smv.smvshell.show_run_info(collector)[source]

Inspects the SmvRunInfoCollector object returned by smvApp.runModule

smv.smvshell.get_run_info(name, runConfig=None)[source]

Get the SmvRunInfoCollector with full information about a module and its dependencies

Parameters:
  • name (str) – name of the module whose information to collect
  • runConfig (dict) – runConfig to apply when collecting info. If module was run with a config, the same config needs to be specified here to retrieve the info.

smv.utils module

smv.utils.check_socket(port)[source]

Check whether the given port is open to bind

smv.utils.infer_full_name_from_part(full_names, part_name)[source]

For a given partial name (postfix), infer full name from a list

smv.utils.is_string(obj)[source]

Check whether object is a string type with Python 2 and Python 3 compatibility

smv.utils.lazy_property(fn)[source]

Decorator that makes a property lazy-evaluated.

smv.utils.list_distinct(l)[source]

Return the distinct version of the input list, preserving order

smv.utils.scala_seq_to_list(_jvm, j_seq)[source]

Convert Scala Seq to Python list

smv.utils.smv_copy_array(sc, *cols)[source]

Copy Python list to appropriate Java array

smv.utils.smvhash(text)[source]

Python’s hash function returns different numbers from run to run starting with Python 3. This provides a deterministic hash function for use in calculating sourceCodeHash.

Module contents